简介
在现代计算中,并行处理是提高程序性能的重要手段。python提供了多种并行处理的方式,其中concurrent.futures
模块的processpoolexecutor
是一个非常强大且易于使用的工具。本文将通过一个实际示例,展示如何使用processpoolexecutor
进行并行处理,并详细解释代码的工作原理。
完整代码示例
import time import multiprocessing from concurrent.futures import processpoolexecutor, as_completed from typing import list def process_numbers(chunk: list[int], factor: int) -> str: """ 处理数字的函数,通过将它们乘以因子来模拟处理。 这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和, 并返回结果字符串。 """ result = sum(x * factor for x in chunk) time.sleep(0.1) # 使用睡眠模拟工作 return f"处理的块和: {result}" def main(numbers: list[int] = none, num_chunks: int = 10, factor: int = 2): """ 演示并行处理的主函数。 这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、 将数字分成块,并使用processpoolexecutor进行并行处理。 """ import logging logging.basicconfig(level=logging.info) _log = logging.getlogger(__name__) # 如果没有提供数字,则生成示例列表 if numbers is none: numbers = list(range(1, 101)) # 生成1到100的数字 total_numbers = len(numbers) _log.info(f"开始并行处理 {total_numbers} 个数字") cpu_count = multiprocessing.cpu_count() _log.info(f"检测到 {cpu_count} 个cpu核心") # 确定最佳工作进程数量 optimal_workers = min(cpu_count, num_chunks) _log.info(f"使用 {optimal_workers} 个工作进程") # 计算块大小 chunk_size = max(1, total_numbers // optimal_workers) _log.info(f"每个块包含 {chunk_size} 个数字") # 将数字分成块 chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)] _log.info(f"总共生成了 {len(chunks)} 个块") start_time = time.time() processed_count = 0 # 使用processpoolexecutor进行并行处理 with processpoolexecutor(max_workers=optimal_workers) as executor: _log.info("启动processpoolexecutor") # 提交所有任务 futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks] _log.info(f"提交了 {len(futures)} 个任务") # 等待完成并收集结果 for future in as_completed(futures): try: result = future.result() processed_count += 1 _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}") except exception as e: _log.error(f"处理块时出错: {str(e)}") raise elapsed_time = time.time() - start_time _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。") if __name__ == "__main__": # 使用数字列表的示例 main()
代码解释
1. 导入必要的模块
import time import multiprocessing from concurrent.futures import processpoolexecutor, as_completed from typing import list
这些模块提供了我们需要的并行处理功能和类型提示。
2. 定义处理函数
def process_numbers(chunk: list[int], factor: int) -> str: """ 处理数字的函数,通过将它们乘以因子来模拟处理。 这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和, 并返回结果字符串。 """ result = sum(x * factor for x in chunk) time.sleep(0.1) # 使用睡眠模拟工作 return f"处理的块和: {result}"
这个函数模拟了对数字列表的处理,通过将每个数字乘以一个因子并求和。time.sleep(0.1)
用于模拟实际工作。
3. 主函数
def main(numbers: list[int] = none, num_chunks: int = 10, factor: int = 2): """ 演示并行处理的主函数。 这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、 将数字分成块,并使用processpoolexecutor进行并行处理。 """ import logging logging.basicconfig(level=logging.info) _log = logging.getlogger(__name__)
主函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用processpoolexecutor
进行并行处理。
4. 生成数字列表
# 如果没有提供数字,则生成示例列表 if numbers is none: numbers = list(range(1, 101)) # 生成1到100的数字
如果没有提供数字列表,则生成1到100的数字列表。
5. 确定最佳工作进程数量
cpu_count = multiprocessing.cpu_count() _log.info(f"检测到 {cpu_count} 个cpu核心") # 确定最佳工作进程数量 optimal_workers = min(cpu_count, num_chunks) _log.info(f"使用 {optimal_workers} 个工作进程")
根据cpu核心数和用户指定的块数,确定最佳工作进程数量。
6. 将数字分成块
# 计算块大小 chunk_size = max(1, total_numbers // optimal_workers) _log.info(f"每个块包含 {chunk_size} 个数字") # 将数字分成块 chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)] _log.info(f"总共生成了 {len(chunks)} 个块")
将数字列表分成多个块,每个块的大小根据总数和工作进程数量计算。
7. 并行处理
start_time = time.time() processed_count = 0 # 使用processpoolexecutor进行并行处理 with processpoolexecutor(max_workers=optimal_workers) as executor: _log.info("启动processpoolexecutor") # 提交所有任务 futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks] _log.info(f"提交了 {len(futures)} 个任务") # 等待完成并收集结果 for future in as_completed(futures): try: result = future.result() processed_count += 1 _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}") except exception as e: _log.error(f"处理块时出错: {str(e)}") raise
使用processpoolexecutor
进行并行处理,提交所有任务并等待完成。
8. 计算耗时
elapsed_time = time.time() - start_time _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")
计算并行处理的总耗时并输出。
并行处理的基本概念和优势
并行处理是指同时执行多个任务,以提高程序的执行效率。python的concurrent.futures
模块提供了一个高级接口,用于并行执行任务。processpoolexecutor
是其中一个重要的类,它使用多进程来并行执行任务。
并行处理的优势包括:
- 提高程序的执行效率
- 充分利用多核cpu的计算能力
- 简化多线程或多进程编程的复杂性
如何运行和测试这个示例
- 将上述代码保存为
parallel_processing_example.py
文件。 - 确保你的python环境中安装了必要的模块(本示例不需要额外安装模块)。
- 在终端或命令行中运行以下命令:
python parallel_processing_example.py
你将看到程序的执行过程和并行处理的结果。
总结
通过这个示例,我们展示了如何使用python的processpoolexecutor
进行并行处理。并行处理是提高程序性能的重要手段,特别是在处理大量数据或计算密集型任务时。希望这个示例能帮助你更好地理解并行处理的概念和实现。
到此这篇关于python并行处理实战之如何使用processpoolexecutor加速计算的文章就介绍到这了,更多相关python processpoolexecutor加速计算内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论