引言
python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 python 的全局解释器锁(gil),在多核 cpu 上实现真正的并行,特别适合 cpu 密集型任务(如数值计算、图像处理)。相比线程(threading
模块),multiprocessing
更适合需要高性能计算的场景。本文将详细介绍 multiprocessing
模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项。
1. multiprocessing 模块的定义和原理
1.1 定义
multiprocessing
是一个跨平台的模块,提供创建和管理进程的 api,支持进程间通信(ipc)、同步机制和共享资源管理。它模仿了 threading
模块的接口,方便开发者从线程迁移到进程。
核心功能:
- 进程创建:创建独立进程,运行指定函数或任务。
- 进程池:管理一组工作进程,分配任务。
- 进程通信:支持管道(
pipe
)、队列(queue
)等 ipc 机制。 - 同步原语:提供锁(
lock
)、信号量(semaphore
)、事件(event
)等。 - 共享内存:支持共享基本数据类型(
value
)和数组(array
)。 - 跨平台:在 windows、linux、macos 上运行一致。
依赖:标准库,无需额外安装。
1.2 原理
- 进程 vs 线程:
- 进程:独立的内存空间,拥有自己的 python 解释器和 gil,适合 cpu 密集型任务。
- 线程:共享内存空间,受 gil 限制,适合 i/o 密集型任务。
- gil 绕过:每个进程有独立的 gil,允许多核并行。
- 进程创建:
- linux/macos:使用
fork
(复制父进程),或spawn
(新进程)。 - windows:始终使用
spawn
,启动新解释器。
- linux/macos:使用
- 通信开销:进程间通信(如
queue
)比线程慢,需优化设计。
1.3 导入
import multiprocessing
2. multiprocessing 的核心组件和功能
2.1 进程创建(process)
通过 multiprocessing.process
创建进程,运行指定函数。
构造函数:
process(target=none, args=(), kwargs={}, name=none, daemon=none)
target
:目标函数。args
/kwargs
:函数参数。name
:进程名称。daemon
:是否为守护进程(随主进程退出)。
主要方法:
start()
:启动进程。join()
:等待进程结束。terminate()
:强制终止进程。is_alive()
:检查进程是否存活。
示例:
import multiprocessing def worker(num): print(f"worker {num} running in process {multiprocessing.current_process().name}") if __name__ == "__main__": processes = [multiprocessing.process(target=worker, args=(i,)) for i in range(3)] for p in processes: p.start() for p in processes: p.join()
输出(顺序可能不同):
worker 0 running in process process-1 worker 1 running in process process-2 worker 2 running in process process-3
- 说明:创建 3 个进程,每个运行
worker
函数。
2.2 进程池(pool)
pool
用于管理固定数量的进程,适合并行处理大量任务。
构造函数:
pool(processes=none, initializer=none, initargs=())
processes
:进程数(默认 cpu 核心数)。initializer
:每个进程的初始化函数。initargs
:初始化函数参数。
主要方法:
map(func, iterable)
:并行执行func
应用于iterable
,返回结果列表。imap(func, iterable)
:惰性版本,返回迭代器。apply(func, args=(), kwds={})
:同步执行单任务。apply_async(func, args=(), kwds={})
:异步执行单任务。close()
:关闭池,禁止新任务。join()
:等待池内进程完成。
示例:
from multiprocessing import pool def square(n): return n * n if __name__ == "__main__": with pool(processes=4) as pool: results = pool.map(square, range(10)) print(results) # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
2.3 进程通信
支持 pipe
和 queue
实现进程间数据交换。
pipe
- 双向或单向管道,适合两个进程通信。
构造函数:
pipe(duplex=true)
- 返回
(conn1, conn2)
,两个连接对象。 duplex=true
:双向;false
:单向。
示例:
from multiprocessing import process, pipe def sender(conn): conn.send("hello from sender") conn.close() def receiver(conn): print(conn.recv()) conn.close() if __name__ == "__main__": parent_conn, child_conn = pipe() p1 = process(target=sender, args=(child_conn,)) p2 = process(target=receiver, args=(parent_conn,)) p1.start() p2.start() p1.join() p2.join()
输出:
hello from sender
queue
- 线程和进程安全的队列,适合多生产者/消费者场景。
构造函数:
queue(maxsize=0)
maxsize
:最大容量(0 表示无限制)。
示例:
from multiprocessing import process, queue def producer(queue): queue.put("data from producer") def consumer(queue): print(queue.get()) if __name__ == "__main__": queue = queue() p1 = process(target=producer, args=(queue,)) p2 = process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join()
2.4 同步机制
提供锁、信号量等原语,确保进程安全访问共享资源。
lock
- 互斥锁,防止多个进程同时访问资源。
- 示例:
from multiprocessing import process, lock def printer(lock, msg): with lock: print(msg) if __name__ == "__main__": lock = lock() processes = [process(target=printer, args=(lock, f"message {i}")) for i in range(3)] for p in processes: p.start() for p in processes: p.join()
semaphore
- 控制有限资源的并发访问。
- 示例:
from multiprocessing import process, semaphore def worker(sem, name): with sem: print(f"{name} acquired resource") # 模拟工作 if __name__ == "__main__": sem = semaphore(2) # 允许 2 个进程同时访问 processes = [process(target=worker, args=(sem, f"worker {i}")) for i in range(5)] for p in processes: p.start() for p in processes: p.join()
event
- 进程间信号通知。
- 示例:
from multiprocessing import process, event import time def wait_for_event(event): event.wait() print("event triggered") if __name__ == "__main__": event = event() p = process(target=wait_for_event, args=(event,)) p.start() time.sleep(1) event.set() # 触发事件 p.join()
2.5 共享内存
通过 value
和 array
共享基本数据类型。
- value:单个共享值。
- array:共享数组。
示例:
from multiprocessing import process, value, array def modify(shared_num, shared_arr): shared_num.value += 1 for i in range(len(shared_arr)): shared_arr[i] += 1 if __name__ == "__main__": num = value("i", 0) # 共享整数 arr = array("i", [1, 2, 3]) # 共享数组 p = process(target=modify, args=(num, arr)) p.start() p.join() print(num.value) # 输出: 1 print(list(arr)) # 输出: [2, 3, 4]
3. 应用场景
数值计算:
- 并行处理矩阵运算、蒙特卡洛模拟。
- 示例:计算大数组的平方。
图像处理:
- 并行处理图像滤波、特征提取。
- 示例:批量应用卷积滤波。
机器学习:
- 并行训练模型或处理数据预处理。
- 示例:并行特征提取。
数据处理:
- 并行处理 csv 文件、数据库查询。
- 示例:多进程解析日志文件。
爬虫:
- 并行抓取网页(注意网络限制)。
- 示例:结合
urllib
并发下载。
4. 示例:多进程爬虫
结合 urllib
和 queue
实现并行网页抓取。
示例:
import urllib.request from multiprocessing import process, queue from urllib.error import urlerror def fetch_url(queue, url): try: with urllib.request.urlopen(url) as response: content = response.read().decode("utf-8") queue.put((url, len(content))) except urlerror as e: queue.put((url, str(e))) def main(): urls = ["https://example.com", "https://python.org", "https://invalid-url"] queue = queue() processes = [process(target=fetch_url, args=(queue, url)) for url in urls] for p in processes: p.start() for p in processes: p.join() while not queue.empty(): url, result = queue.get() print(f"{url}: {result}") if __name__ == "__main__": main()
输出(示例):
https://example.com: 1256 https://python.org: 50000 https://invalid-url: [errno 11001] getaddrinfo failed
5. 最佳实践
使用 if __name__ == "__main__":
:
- 防止 windows 和某些 unix 系统重复导入模块。
示例:
if __name__ == "__main__": p = process(target=worker) p.start()
选择进程池:
- 对于批量任务,使用
pool
简化管理。
示例:
with pool(4) as pool: results = pool.map(func, data)
优化通信:
- 尽量减少进程间通信,使用共享内存或批量传递数据。
示例:
arr = array("i", [0] * size)
异常处理:
- 在子进程中捕获异常,通过
queue
或日志返回。
示例:
def worker(queue): try: # 工作代码 except exception as e: queue.put(str(e))
测试代码:
- 使用
pytest
测试多进程行为。
示例:
import pytest from multiprocessing import process def test_process(): def worker(): print("test") p = process(target=worker) p.start() p.join() assert p.exitcode == 0
进程数选择:
- 默认使用 cpu 核心数(
multiprocessing.cpu_count()
)。
示例:
processes = min(len(tasks), multiprocessing.cpu_count())
6. 注意事项
gil 限制:
multiprocessing
绕过 gil,适合 cpu 密集型任务;i/o 密集型任务考虑threading
或asyncio
。
示例:
# i/o 密集型:使用 asyncio import asyncio async def fetch(): pass
windows 兼容性:
- windows 使用
spawn
,需确保代码在if __name__ == "__main__":
中。
示例:
if __name__ == "__main__": main()
资源管理:
- 及时关闭进程和池,释放资源。
示例:
with pool() as pool: pool.map(func, data)
序列化开销:
- 传递大数据到子进程(如通过
queue
)可能慢,使用共享内存。
示例:
shared_data = value("d", 0.0)
调试难度:
- 子进程错误可能不易捕获,使用日志或
queue
返回错误。
示例:
import logging logging.basicconfig(level=logging.info)
7. 总结
python 的 multiprocessing
模块是实现多进程并行的强大工具,绕过 gil,适合 cpu 密集型任务。其核心特点包括:
- 定义:提供进程创建、通信、同步和共享内存的 api。
- 功能:支持
process
、pool
、queue
、pipe
、lock
等。 - 应用:数值计算、图像处理、机器学习、数据处理、爬虫。
- 最佳实践:使用
if __name__ == "__main__":
、优化通信、测试代码。
以上就是python使用multiprocessing模块实现多进程并行计算的详细内容,更多关于python multiprocessing多进程并行计算的资料请关注代码网其它相关文章!
发表评论