
10 Advanced Scripts to Boost Python Model Training Efficiency

July 20, 2025 · Python

Introduction

In machine learning, as datasets keep growing and models become more complex, training times get longer and longer. Python's multithreading offers an effective way to exploit modern multi-core CPUs: many of the heavy routines in NumPy and scikit-learn release the GIL while they run, so threads can genuinely overlap data preprocessing, feature engineering, and model training work. This article presents 10 advanced Python multithreading scripts to help you achieve a performance leap in your machine learning projects.
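As a quick illustration of why this helps at all, here is a minimal sketch (my own addition, assuming only NumPy is installed). It times the same GIL-releasing linear-algebra workload sequentially and with a thread pool; on a multi-core machine the threaded run is usually faster, while pure-Python loops, which hold the GIL, would see little or no gain. Results can be muddied if your BLAS library is itself multithreaded.

import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def heavy_op(seed):
    # NumPy's linear algebra releases the GIL, so threads can overlap this work
    rng = np.random.default_rng(seed)
    m = rng.standard_normal((800, 800))
    return np.linalg.svd(m, compute_uv=False).sum()

start = time.perf_counter()
_ = [heavy_op(i) for i in range(8)]
print(f"Sequential: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    _ = list(executor.map(heavy_op, range(8)))
print(f"Threaded:   {time.perf_counter() - start:.2f}s")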

1. Multithreaded Data Preprocessing Pipeline

import concurrent.futures
import pandas as pd
from sklearn.preprocessing import StandardScaler

def preprocess_chunk(data_chunk):
    # Data cleaning: drop rows with missing values
    data_chunk = data_chunk.dropna()
    # Feature scaling (note: each chunk is standardized with its own statistics)
    scaler = StandardScaler()
    float_cols = data_chunk.select_dtypes(include=['float64']).columns
    data_chunk[float_cols] = scaler.fit_transform(data_chunk[float_cols])
    return data_chunk

def parallel_preprocessing(data, chunk_size=10000, workers=4):
    chunks = [data.iloc[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        processed_chunks = list(executor.map(preprocess_chunk, chunks))
    return pd.concat(processed_chunks)

# Usage example
# data = pd.read_csv('large_dataset.csv')
# processed_data = parallel_preprocessing(data)

Use case: feature scaling, missing-value handling, and other preprocessing steps on large datasets.
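If you want to try the pipeline without a CSV on disk, the following self-contained sketch (synthetic data and hypothetical column names of my own making) exercises it end to end. Keep in mind the per-chunk scaling caveat noted in the code above.

import numpy as np
import pandas as pd

# Synthetic demo data: 50,000 rows, two float columns, ~1% missing values
rng = np.random.default_rng(0)
demo = pd.DataFrame({
    'income': rng.normal(50000, 15000, 50000),
    'age': rng.normal(40, 12, 50000),
})
demo.loc[demo.sample(frac=0.01, random_state=0).index, 'income'] = np.nan

processed = parallel_preprocessing(demo, chunk_size=10000, workers=4)
print(processed.shape)
print(processed.describe())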

2. Parallel Feature Engineering

from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

def generate_feature(args):
    col1, col2, operation = args
    if operation == 'add':
        return col1 + col2
    elif operation == 'mul':
        return col1 * col2
    elif operation == 'sub':
        return col1 - col2
    elif operation == 'div':
        # Guard against division by zero
        return np.where(col2 != 0, col1 / col2, 0)

def parallel_feature_engineering(data, feature_configs, workers=4):
    # Share the input's index so ndarray results align with the source rows
    features = pd.DataFrame(index=data.index)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = executor.map(generate_feature,
                               [(data[config['col1']], data[config['col2']], config['op'])
                                for config in feature_configs])
        for config, result in zip(feature_configs, results):
            features[config['name']] = result
    return pd.concat([data, features], axis=1)

# Usage example
# configs = [
#     {'name': 'feat1', 'col1': 'age', 'col2': 'income', 'op': 'mul'},
#     {'name': 'feat2', 'col1': 'height', 'col2': 'weight', 'op': 'div'}
# ]
# enhanced_data = parallel_feature_engineering(data, configs)

Use case: generating large numbers of interaction or derived features.
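A runnable demo on synthetic data (the column names and configs below are invented for illustration):

import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
demo = pd.DataFrame({
    'age': rng.uniform(18, 80, 10000),
    'income': rng.normal(50000, 15000, 10000),
    'height': rng.normal(170, 10, 10000),
    'weight': rng.normal(70, 12, 10000),
})
configs = [
    {'name': 'age_x_income', 'col1': 'age', 'col2': 'income', 'op': 'mul'},
    {'name': 'weight_per_height', 'col1': 'weight', 'col2': 'height', 'op': 'div'},
]
enhanced = parallel_feature_engineering(demo, configs, workers=2)
print(enhanced.columns.tolist())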

3. Multithreaded Hyperparameter Search

from sklearn.model_selection import ParameterGrid
from sklearn.ensemble import RandomForestClassifier
from concurrent.futures import ThreadPoolExecutor, as_completed
from sklearn.metrics import accuracy_score

def train_model(params, x_train, y_train, x_val, y_val):
    model = RandomForestClassifier(**params)
    model.fit(x_train, y_train)
    preds = model.predict(x_val)
    return accuracy_score(y_val, preds), params

def parallel_param_search(x_train, y_train, x_val, y_val, param_grid, workers=4):
    grid = ParameterGrid(param_grid)
    best_score = -1
    best_params = None

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for params in grid:
            futures.append(executor.submit(
                train_model, params, x_train, y_train, x_val, y_val))

        for future in as_completed(futures):
            score, params = future.result()
            if score > best_score:
                best_score = score
                best_params = params

    return best_params, best_score

# Usage example
# param_grid = {
#     'n_estimators': [50, 100, 200],
#     'max_depth': [None, 10, 20],
#     'min_samples_split': [2, 5, 10]
# }
# best_params, best_score = parallel_param_search(x_train, y_train, x_val, y_val, param_grid)

Use case: speeding up hyperparameter tuning for random forests, gradient-boosted trees, and similar models.
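Below is a self-contained sketch using scikit-learn's synthetic-data helpers. One practical note: RandomForestClassifier has its own n_jobs parameter, so when several fits run in threads it is worth leaving each model single-threaded (the default) to avoid oversubscribing the CPU.

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

x, y = make_classification(n_samples=5000, n_features=20, random_state=42)
x_train, x_val, y_train, y_val = train_test_split(x, y, test_size=0.2, random_state=42)

param_grid = {
    'n_estimators': [50, 100],
    'max_depth': [None, 10],
}
best_params, best_score = parallel_param_search(
    x_train, y_train, x_val, y_val, param_grid, workers=4)
print(f"Best params: {best_params}, accuracy: {best_score:.4f}")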

4. Parallel Model Ensembling

from sklearn.base import clone
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np

class ParallelEnsemble:
    def __init__(self, base_estimator, n_estimators=10, workers=4):
        self.base_estimator = base_estimator
        self.n_estimators = n_estimators
        self.workers = workers
        self.estimators_ = []

    def _fit_one(self, x, y, seed):
        # Train a fresh clone on a bootstrap sample so ensemble members
        # differ (this is what makes it bagging); expects NumPy arrays
        rng = np.random.default_rng(seed)
        idx = rng.integers(0, len(x), len(x))
        estimator = clone(self.base_estimator)
        return estimator.fit(x[idx], y[idx])

    def fit(self, x, y):
        self.estimators_ = []
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            futures = [executor.submit(self._fit_one, x, y, seed)
                       for seed in range(self.n_estimators)]
            for future in as_completed(futures):
                self.estimators_.append(future.result())
        return self

    def predict_proba(self, x):
        probas = []
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            futures = [executor.submit(estimator.predict_proba, x)
                       for estimator in self.estimators_]
            for future in as_completed(futures):
                probas.append(future.result())
        return np.mean(probas, axis=0)

    def predict(self, x):
        proba = self.predict_proba(x)
        return np.argmax(proba, axis=1)

# Usage example
# from sklearn.linear_model import LogisticRegression
# ensemble = ParallelEnsemble(LogisticRegression(), n_estimators=10, workers=4)
# ensemble.fit(x_train, y_train)
# predictions = ensemble.predict(x_test)

Use case: building a parallel bagging ensemble around any base estimator.
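A small end-to-end sketch on synthetic data (it assumes the bootstrap-sampling fit above, which expects x and y to be NumPy arrays):

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

x, y = make_classification(n_samples=2000, n_features=10, random_state=0)
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=0)

ensemble = ParallelEnsemble(LogisticRegression(max_iter=500), n_estimators=8, workers=4)
ensemble.fit(x_train, y_train)
print(f"Ensemble accuracy: {accuracy_score(y_test, ensemble.predict(x_test)):.4f}")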

5. Multithreaded Cross-Validation Evaluation

from sklearn.model_selection import KFold
from sklearn.base import clone
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
from sklearn.metrics import get_scorer

def cross_val_score_parallel(estimator, x, y, cv=5, scoring='accuracy', workers=4):
    kf = KFold(n_splits=cv)
    scorer = get_scorer(scoring)
    scores = []

    def train_eval(train_idx, test_idx):
        x_train, x_test = x[train_idx], x[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        # Clone so each fold fits its own copy; fitting one shared
        # estimator from several threads would be a race condition
        fold_estimator = clone(estimator)
        fold_estimator.fit(x_train, y_train)
        return scorer(fold_estimator, x_test, y_test)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for train_idx, test_idx in kf.split(x):
            futures.append(executor.submit(train_eval, train_idx, test_idx))

        for future in as_completed(futures):
            scores.append(future.result())

    return np.array(scores)

# Usage example
# from sklearn.ensemble import GradientBoostingClassifier
# model = GradientBoostingClassifier()
# scores = cross_val_score_parallel(model, x, y, cv=5, workers=4)
# print(f"Mean accuracy: {scores.mean():.4f}")

Use case: speeding up cross-validation, especially for compute-intensive models.
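A runnable version of the commented-out example above; note that because train_eval uses NumPy fancy indexing, x and y should be NumPy arrays rather than DataFrames:

from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier

x, y = make_classification(n_samples=3000, n_features=15, random_state=7)
scores = cross_val_score_parallel(GradientBoostingClassifier(), x, y, cv=5, workers=4)
print(f"Fold accuracies: {scores}")
print(f"Mean accuracy: {scores.mean():.4f} (+/- {scores.std():.4f})")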

6. Parallel Time-Series Feature Extraction

import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from tsfresh import extract_features

def parallel_ts_feature_extraction(ts_data, column_id='id', column_sort='time', workers=4):
    ids = ts_data[column_id].unique()
    # Guard against a zero chunk size when there are fewer ids than workers
    chunk_size = max(1, len(ids) // workers)
    id_chunks = [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]

    def process_chunk(chunk_ids):
        chunk_data = ts_data[ts_data[column_id].isin(chunk_ids)]
        return extract_features(chunk_data, column_id=column_id, column_sort=column_sort)

    features = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in id_chunks]
        for future in as_completed(futures):
            features.append(future.result())

    return pd.concat(features)

# Usage example
# features = parallel_ts_feature_extraction(time_series_data, workers=4)

Use case: feature extraction on large time-series datasets.
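A synthetic long-format demo (my own construction; note that tsfresh's extract_features also has an internal n_jobs argument, so it is worth benchmarking this thread-chunked approach against tsfresh's built-in parallelism):

import numpy as np
import pandas as pd

# 40 random-walk series, 100 points each, in tsfresh's long format
rng = np.random.default_rng(3)
ts_demo = pd.concat([
    pd.DataFrame({
        'id': series_id,
        'time': np.arange(100),
        'value': np.cumsum(rng.standard_normal(100)),
    })
    for series_id in range(40)
], ignore_index=True)

features = parallel_ts_feature_extraction(ts_demo, column_id='id', column_sort='time', workers=4)
print(features.shape)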

7. Multithreaded Model Prediction Service

import numpy as np
from queue import Queue
from threading import Thread

class PredictionServer:
    def __init__(self, model, max_workers=4, batch_size=32):
        self.model = model
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.workers = []

    def _worker(self):
        while True:
            batch = self.input_queue.get()
            if batch is None:
                break
            ids, data = batch
            preds = self.model.predict(data)
            self.output_queue.put((ids, preds))
            self.input_queue.task_done()

    def start(self):
        self.workers = []
        for _ in range(self.max_workers):
            t = Thread(target=self._worker)
            t.start()
            self.workers.append(t)

    def stop(self):
        # A None sentinel per worker tells each thread to exit
        for _ in range(self.max_workers):
            self.input_queue.put(None)
        for worker in self.workers:
            worker.join()

    def predict(self, x):
        self.start()
        num_samples = len(x)
        predictions = [None] * num_samples

        # Submit prediction tasks in batches
        for i in range(0, num_samples, self.batch_size):
            batch = (list(range(i, min(i + self.batch_size, num_samples))),
                     x[i:i + self.batch_size])
            self.input_queue.put(batch)

        # Collect results
        results_received = 0
        while results_received < num_samples:
            ids, preds = self.output_queue.get()
            for id_, pred in zip(ids, preds):
                predictions[id_] = pred
            results_received += len(ids)
            self.output_queue.task_done()

        self.stop()
        return np.array(predictions)

# Usage example
# server = PredictionServer(trained_model, max_workers=4)
# predictions = server.predict(x_test)

Use case: building a high-throughput prediction service for online or batch scoring.
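A self-contained sketch with a freshly trained stand-in model (scikit-learn's predict only reads the fitted state, so calling it from several threads is generally safe):

from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

x, y = make_classification(n_samples=5000, n_features=20, random_state=0)
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=0)

trained_model = RandomForestClassifier(n_estimators=100, random_state=0).fit(x_train, y_train)
server = PredictionServer(trained_model, max_workers=4, batch_size=64)
predictions = server.predict(x_test)
print(predictions[:10])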

8. Parallel Feature Selection

from sklearn.feature_selection import SelectKBest, mutual_info_classif
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_feature_selection(x, y, k_features=10, workers=4):
    n_features = x.shape[1]
    features_per_worker = n_features // workers
    selected_features = []

    def select_features(feature_indices):
        selector = SelectKBest(mutual_info_classif, k=min(k_features, len(feature_indices)))
        x_subset = x[:, feature_indices]
        selector.fit(x_subset, y)
        return [feature_indices[i] for i in selector.get_support(indices=True)]

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for i in range(workers):
            start = i * features_per_worker
            end = (i + 1) * features_per_worker if i != workers - 1 else n_features
            feature_indices = list(range(start, end))
            futures.append(executor.submit(select_features, feature_indices))

        for future in as_completed(futures):
            selected_features.extend(future.result())

    # Second pass: narrow the pooled candidates down to k_features
    if len(selected_features) > k_features:
        selector = SelectKBest(mutual_info_classif, k=k_features)
        selector.fit(x[:, selected_features], y)
        selected_features = [selected_features[i] for i in selector.get_support(indices=True)]

    return selected_features

# Usage example
# selected = parallel_feature_selection(x_train, y_train, k_features=20, workers=4)
# x_train_selected = x_train[:, selected]
# x_test_selected = x_test[:, selected]

Use case: parallel feature selection on high-dimensional datasets.
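A quick sketch on synthetic high-dimensional data. Since mutual-information scores are univariate, a feature that ranks in the global top k also ranks in the top k of its own block, so the block-then-pool scheme should not lose winners (modulo estimation noise in the MI scores):

from sklearn.datasets import make_classification

x, y = make_classification(n_samples=2000, n_features=100, n_informative=10, random_state=0)
selected = parallel_feature_selection(x, y, k_features=10, workers=4)
print(f"Selected feature indices: {sorted(selected)}")
x_selected = x[:, selected]
print(x_selected.shape)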

9. Multithreaded Model Persistence

import concurrent.futures
import pickle
import gzip
from pathlib import Path

def save_model(model, filepath, compress=True):
    if compress:
        with gzip.open(filepath, 'wb') as f:
            pickle.dump(model, f)
    else:
        with open(filepath, 'wb') as f:
            pickle.dump(model, f)
    return filepath

def parallel_save_models(models_info, workers=4):
    Path("saved_models").mkdir(exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for model_name, model in models_info.items():
            filepath = f"saved_models/{model_name}.pkl.gz"
            futures.append(executor.submit(save_model, model, filepath))

        for future in concurrent.futures.as_completed(futures):
            print(f"Model saved to: {future.result()}")

# Usage example
# models = {
#     'random_forest': rf_model,
#     'gradient_boosting': gb_model,
#     'svm': svm_model
# }
# parallel_save_models(models, workers=4)

Use case: saving several trained models at once to overlap serialization and disk I/O.
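A runnable version with quickly trained stand-in models (the gain here comes mostly from overlapping gzip compression and disk writes across files; pickling itself holds the GIL):

from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression

x, y = make_classification(n_samples=1000, n_features=10, random_state=0)
models = {
    'random_forest': RandomForestClassifier(n_estimators=50, random_state=0).fit(x, y),
    'gradient_boosting': GradientBoostingClassifier(random_state=0).fit(x, y),
    'logistic_regression': LogisticRegression(max_iter=500).fit(x, y),
}
parallel_save_models(models, workers=3)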

10. Multithreaded Data Augmentation

import concurrent.futures
import numpy as np
from albumentations import Compose, HorizontalFlip, Rotate, RandomBrightnessContrast

def augment_image(image, augmentations):
    return augmentations(image=image)['image']

def parallel_data_augmentation(images, labels, augmentations=None, multiplier=4, workers=4):
    augmented_images = []
    augmented_labels = []

    # Default augmentation pipeline if the caller does not supply one
    if augmentations is None:
        augmentations = Compose([
            HorizontalFlip(p=0.5),
            Rotate(limit=30, p=0.5),
            RandomBrightnessContrast(p=0.2),
        ])

    # Prepare task arguments
    tasks = []
    for _ in range(multiplier):
        for img, lbl in zip(images, labels):
            tasks.append((img, lbl, augmentations))

    # Run the augmentations in parallel (augment_image takes image and pipeline only)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(augment_image, task[0], task[2]) for task in tasks]
        for future, task in zip(futures, tasks):
            augmented_images.append(future.result())
            augmented_labels.append(task[1])

    # Append the originals to the augmented data
    augmented_images = np.concatenate([images, np.array(augmented_images)])
    augmented_labels = np.concatenate([labels, np.array(augmented_labels)])

    return augmented_images, augmented_labels

# Usage example
# x_train_aug, y_train_aug = parallel_data_augmentation(x_train, y_train, multiplier=3, workers=4)

Use case: parallel image augmentation, especially useful for small datasets in deep learning.
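A self-contained sketch with random uint8 "images" (albumentations expects height-width-channel arrays; with multiplier=3 the output holds the 16 originals plus 48 augmented copies):

import numpy as np

rng = np.random.default_rng(5)
x_demo = rng.integers(0, 256, size=(16, 64, 64, 3), dtype=np.uint8)
y_demo = rng.integers(0, 2, size=16)

x_aug, y_aug = parallel_data_augmentation(x_demo, y_demo, multiplier=3, workers=4)
print(x_aug.shape, y_aug.shape)  # expected: (64, 64, 64, 3) (64,)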

Summary

This article presented 10 advanced applications of Python multithreading in machine learning, covering the full workflow from data preprocessing through model training, evaluation, and deployment. Used judiciously, multithreading can noticeably improve the efficiency of a machine learning pipeline, especially when handling large datasets or compute-intensive tasks.
