引言
在机器学习领域,随着数据集规模的不断扩大和模型复杂度的增加,训练时间变得越来越长。python的多线程技术为我们提供了一种有效利用现代多核cpu资源的方法,可以显著加速数据预处理、特征工程和模型训练过程。本文将介绍10个高级python多线程脚本,帮助你在机器学习项目中实现性能飞跃。
1. 多线程数据预处理流水线
import concurrent.futures import pandas as pd from sklearn.preprocessing import standardscaler def preprocess_chunk(data_chunk): # 数据清洗 data_chunk = data_chunk.dropna() # 特征缩放 scaler = standardscaler() scaled_features = scaler.fit_transform(data_chunk.select_dtypes(include=['float64'])) data_chunk[data_chunk.select_dtypes(include=['float64']).columns] = scaled_features return data_chunk def parallel_preprocessing(data, chunk_size=10000, workers=4): chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] with concurrent.futures.threadpoolexecutor(max_workers=workers) as executor: processed_chunks = list(executor.map(preprocess_chunk, chunks)) return pd.concat(processed_chunks) # 使用示例 # data = pd.read_csv('large_dataset.csv') # processed_data = parallel_preprocessing(data)
应用场景:大规模数据集的特征缩放、缺失值处理等预处理操作。
2. 并行特征工程生成
from concurrent.futures import threadpoolexecutor import numpy as np import pandas as pd def generate_feature(args): col1, col2, operation = args if operation == 'add': return col1 + col2 elif operation == 'mul': return col1 * col2 elif operation == 'sub': return col1 - col2 elif operation == 'div': return np.where(col2 != 0, col1 / col2, 0) def parallel_feature_engineering(data, feature_configs, workers=4): features = pd.dataframe() with threadpoolexecutor(max_workers=workers) as executor: results = executor.map(generate_feature, [(data[config['col1']], data[config['col2']], config['op']) for config in feature_configs]) for config, result in zip(feature_configs, results): features[config['name']] = result return pd.concat([data, features], axis=1) # 使用示例 # configs = [ # {'name': 'feat1', 'col1': 'age', 'col2': 'income', 'op': 'mul'}, # {'name': 'feat2', 'col1': 'height', 'col2': 'weight', 'op': 'div'} # ] # enhanced_data = parallel_feature_engineering(data, configs)
应用场景:需要生成大量交互特征或派生特征时。
3. 多线程超参数搜索
from sklearn.model_selection import parametergrid from sklearn.ensemble import randomforestclassifier from concurrent.futures import threadpoolexecutor from sklearn.metrics import accuracy_score def train_model(params, x_train, y_train, x_val, y_val): model = randomforestclassifier(**params) model.fit(x_train, y_train) preds = model.predict(x_val) return accuracy_score(y_val, preds), params def parallel_param_search(x_train, y_train, x_val, y_val, param_grid, workers=4): grid = parametergrid(param_grid) best_score = -1 best_params = none with threadpoolexecutor(max_workers=workers) as executor: futures = [] for params in grid: futures.append(executor.submit( train_model, params, x_train, y_train, x_val, y_val)) for future in concurrent.futures.as_completed(futures): score, params = future.result() if score > best_score: best_score = score best_params = params return best_params, best_score # 使用示例 # param_grid = { # 'n_estimators': [50, 100, 200], # 'max_depth': [none, 10, 20], # 'min_samples_split': [2, 5, 10] # } # best_params, best_score = parallel_param_search(x_train, y_train, x_val, y_val, param_grid)
应用场景:加速随机森林、梯度提升树等模型的超参数调优过程。
4. 并行模型集成
from sklearn.base import clone from concurrent.futures import threadpoolexecutor import numpy as np class parallelensemble: def __init__(self, base_estimator, n_estimators=10, workers=4): self.base_estimator = base_estimator self.n_estimators = n_estimators self.workers = workers self.estimators_ = [] def fit(self, x, y): self.estimators_ = [] with threadpoolexecutor(max_workers=self.workers) as executor: futures = [] for _ in range(self.n_estimators): estimator = clone(self.base_estimator) futures.append(executor.submit(estimator.fit, x, y)) for future in concurrent.futures.as_completed(futures): self.estimators_.append(future.result()) return self def predict_proba(self, x): probas = [] with threadpoolexecutor(max_workers=self.workers) as executor: futures = [executor.submit(estimator.predict_proba, x) for estimator in self.estimators_] for future in concurrent.futures.as_completed(futures): probas.append(future.result()) return np.mean(probas, axis=0) def predict(self, x): proba = self.predict_proba(x) return np.argmax(proba, axis=1) # 使用示例 # from sklearn.linear_model import logisticregression # ensemble = parallelensemble(logisticregression(), n_estimators=10, workers=4) # ensemble.fit(x_train, y_train) # predictions = ensembl
应用场景:创建并行化的bagging集成模型,适用于任何基础估计器。
5. 多线程交叉验证评估
from sklearn.model_selection import kfold from concurrent.futures import threadpoolexecutor import numpy as np from sklearn.metrics import get_scorer def cross_val_score_parallel(estimator, x, y, cv=5, scoring='accuracy', workers=4): kf = kfold(n_splits=cv) scorer = get_scorer(scoring) scores = [] def train_eval(train_idx, test_idx): x_train, x_test = x[train_idx], x[test_idx] y_train, y_test = y[train_idx], y[test_idx] estimator.fit(x_train, y_train) return scorer(estimator, x_test, y_test) with threadpoolexecutor(max_workers=workers) as executor: futures = [] for train_idx, test_idx in kf.split(x): futures.append(executor.submit(train_eval, train_idx, test_idx)) for future in concurrent.futures.as_completed(futures): scores.append(future.result()) return np.array(scores) # 使用示例 # from sklearn.ensemble import gradientboostingclassifier # model = gradientboostingclassifier() # scores = cross_val_score_parallel(model, x, y, cv=5, workers=4) # print(f"平均准确率: {scores.mean():.4f}")
应用场景:加速模型的交叉验证过程,特别适用于计算密集型模型。
6. 并行时间序列特征提取
import numpy as np import pandas as pd from concurrent.futures import threadpoolexecutor from tsfresh import extract_features def parallel_ts_feature_extraction(ts_data, column_id='id', column_sort='time', workers=4): ids = ts_data[column_id].unique() chunk_size = len(ids) // workers id_chunks = [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)] def process_chunk(chunk_ids): chunk_data = ts_data[ts_data[column_id].isin(chunk_ids)] return extract_features(chunk_data, column_id=column_id, column_sort=column_sort) features = [] with threadpoolexecutor(max_workers=workers) as executor: futures = [executor.submit(process_chunk, chunk) for chunk in id_chunks] for future in concurrent.futures.as_completed(futures): features.append(future.result()) return pd.concat(features) # 使用示例 # features = parallel_ts_feature_extraction(time_series_data, workers=4)
应用场景:处理大规模时间序列数据集的特征提取。
7. 多线程模型预测服务
from concurrent.futures import threadpoolexecutor import numpy as np from queue import queue from threading import thread class predictionserver: def __init__(self, model, max_workers=4, batch_size=32): self.model = model self.max_workers = max_workers self.batch_size = batch_size self.input_queue = queue() self.output_queue = queue() self.workers = [] def _worker(self): while true: batch = self.input_queue.get() if batch is none: break ids, data = batch preds = self.model.predict(data) self.output_queue.put((ids, preds)) self.input_queue.task_done() def start(self): self.workers = [] for _ in range(self.max_workers): t = thread(target=self._worker) t.start() self.workers.append(t) def stop(self): for _ in range(self.max_workers): self.input_queue.put(none) for worker in self.workers: worker.join() def predict(self, x): self.start() num_samples = len(x) predictions = [none] * num_samples # 分批提交预测任务 for i in range(0, num_samples, self.batch_size): batch = (list(range(i, min(i+self.batch_size, num_samples))), x[i:i+self.batch_size]) self.input_queue.put(batch) # 收集结果 results_received = 0 while results_received < num_samples: ids, preds = self.output_queue.get() for id_, pred in zip(ids, preds): predictions[id_] = pred results_received += len(ids) self.output_queue.task_done() self.stop() return np.array(predictions) # 使用示例 # server = predictionserver(trained_model, max_workers=4) # predictions = server.predict(x_test)
应用场景:构建高性能的模型预测服务,适用于在线或批量预测场景。
8. 并行特征选择
from sklearn.feature_selection import selectkbest, mutual_info_classif from concurrent.futures import threadpoolexecutor import numpy as np def parallel_feature_selection(x, y, k_features=10, workers=4): n_features = x.shape[1] features_per_worker = n_features // workers selected_features = [] def select_features(feature_indices): selector = selectkbest(mutual_info_classif, k=min(k_features, len(feature_indices))) x_subset = x[:, feature_indices] selector.fit(x_subset, y) return [feature_indices[i] for i in selector.get_support(indices=true)] with threadpoolexecutor(max_workers=workers) as executor: futures = [] for i in range(workers): start = i * features_per_worker end = (i+1)*features_per_worker if i != workers-1 else n_features feature_indices = list(range(start, end)) futures.append(executor.submit(select_features, feature_indices)) for future in concurrent.futures.as_completed(futures): selected_features.extend(future.result()) # 二次筛选 if len(selected_features) > k_features: selector = selectkbest(mutual_info_classif, k=k_features) selector.fit(x[:, selected_features], y) selected_features = [selected_features[i] for i in selector.get_support(indices=true)] return selected_features # 使用示例 # selected = parallel_feature_selection(x_train, y_train, k_features=20, workers=4) # x_train_selected = x_train[:, selected] # x_test_selected = x_test[:, selected]
应用场景:高维数据集的并行特征选择。
9. 多线程模型持久化
import concurrent.futures import pickle import gzip from pathlib import path def save_model(model, filepath, compress=true): if compress: with gzip.open(filepath, 'wb') as f: pickle.dump(model, f) else: with open(filepath, 'wb') as f: pickle.dump(model, f) return filepath def parallel_save_models(models_info, workers=4): path("saved_models").mkdir(exist_ok=true) with concurrent.futures.threadpoolexecutor(max_workers=workers) as executor: futures = [] for model_name, model in models_info.items(): filepath = f"saved_models/{model_name}.pkl.gz" futures.append(executor.submit(save_model, model, filepath)) for future in concurrent.futures.as_completed(futures): print(f"模型已保存到: {future.result()}") # 使用示例 # models = { # 'random_forest': rf_model, # 'gradient_boosting': gb_model, # 'svm': svm_model # } # parallel_save_models(models, workers=4)
应用场景:同时保存多个训练好的模型,节省i/o时间。
10. 多线程数据增强
import concurrent.futures import numpy as np from albumentations import compose, horizontalflip, rotate, randombrightnesscontrast def augment_image(image, augmentations): return augmentations(image=image)['image'] def parallel_data_augmentation(images, labels, augmentations, multiplier=4, workers=4): augmented_images = [] augmented_labels = [] # 创建增强管道 aug_pipeline = compose([ horizontalflip(p=0.5), rotate(limit=30, p=0.5), randombrightnesscontrast(p=0.2), ]) # 准备任务参数 tasks = [] for _ in range(multiplier): for img, lbl in zip(images, labels): tasks.append((img, lbl, aug_pipeline)) # 并行执行增强 with concurrent.futures.threadpoolexecutor(max_workers=workers) as executor: futures = [executor.submit(augment_image, *task[:2], task[2]) for task in tasks] for future, task in zip(futures, tasks): augmented_images.append(future.result()) augmented_labels.append(task[1]) # 合并原始数据 augmented_images = np.concatenate([images, augmented_images]) augmented_labels = np.concatenate([labels, augmented_labels]) return augmented_images, augmented_labels # 使用示例 # x_train_aug, y_train_aug = parallel_data_augmentation(x_train, y_train, multiplier=3, workers=4)
应用场景:图像数据的并行增强,特别适用于深度学习中的小数据集。
总结
本文介绍了10个python多线程在机器学习中的高级应用脚本,涵盖了从数据预处理到模型训练、评估和部署的全流程。通过合理利用多线程技术,可以显著提升机器学习工作流的效率,特别是在处理大规模数据或计算密集型任务时。
到此这篇关于10个提升python模型训练效率的高级脚本的文章就介绍到这了,更多相关python模型训练内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论