引言
python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 python 的全局解释器锁(gil),在多核 cpu 上实现真正的并行,特别适合 cpu 密集型任务(如数值计算、图像处理)。相比线程(threading 模块),multiprocessing 更适合需要高性能计算的场景。本文将详细介绍 multiprocessing 模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项。
1. multiprocessing 模块的定义和原理
1.1 定义
multiprocessing 是一个跨平台的模块,提供创建和管理进程的 api,支持进程间通信(ipc)、同步机制和共享资源管理。它模仿了 threading 模块的接口,方便开发者从线程迁移到进程。
核心功能:
- 进程创建:创建独立进程,运行指定函数或任务。
- 进程池:管理一组工作进程,分配任务。
- 进程通信:支持管道(
pipe)、队列(queue)等 ipc 机制。 - 同步原语:提供锁(
lock)、信号量(semaphore)、事件(event)等。 - 共享内存:支持共享基本数据类型(
value)和数组(array)。 - 跨平台:在 windows、linux、macos 上运行一致。
依赖:标准库,无需额外安装。
1.2 原理
- 进程 vs 线程:
- 进程:独立的内存空间,拥有自己的 python 解释器和 gil,适合 cpu 密集型任务。
- 线程:共享内存空间,受 gil 限制,适合 i/o 密集型任务。
- gil 绕过:每个进程有独立的 gil,允许多核并行。
- 进程创建:
- linux/macos:使用
fork(复制父进程),或spawn(新进程)。 - windows:始终使用
spawn,启动新解释器。
- linux/macos:使用
- 通信开销:进程间通信(如
queue)比线程慢,需优化设计。
1.3 导入
import multiprocessing
2. multiprocessing 的核心组件和功能
2.1 进程创建(process)
通过 multiprocessing.process 创建进程,运行指定函数。
构造函数:
process(target=none, args=(), kwargs={}, name=none, daemon=none)
target:目标函数。args/kwargs:函数参数。name:进程名称。daemon:是否为守护进程(随主进程退出)。
主要方法:
start():启动进程。join():等待进程结束。terminate():强制终止进程。is_alive():检查进程是否存活。
示例:
import multiprocessing
def worker(num):
print(f"worker {num} running in process {multiprocessing.current_process().name}")
if __name__ == "__main__":
processes = [multiprocessing.process(target=worker, args=(i,)) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
输出(顺序可能不同):
worker 0 running in process process-1 worker 1 running in process process-2 worker 2 running in process process-3
- 说明:创建 3 个进程,每个运行
worker函数。
2.2 进程池(pool)
pool 用于管理固定数量的进程,适合并行处理大量任务。
构造函数:
pool(processes=none, initializer=none, initargs=())
processes:进程数(默认 cpu 核心数)。initializer:每个进程的初始化函数。initargs:初始化函数参数。
主要方法:
map(func, iterable):并行执行func应用于iterable,返回结果列表。imap(func, iterable):惰性版本,返回迭代器。apply(func, args=(), kwds={}):同步执行单任务。apply_async(func, args=(), kwds={}):异步执行单任务。close():关闭池,禁止新任务。join():等待池内进程完成。
示例:
from multiprocessing import pool
def square(n):
return n * n
if __name__ == "__main__":
with pool(processes=4) as pool:
results = pool.map(square, range(10))
print(results) # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
2.3 进程通信
支持 pipe 和 queue 实现进程间数据交换。
pipe
- 双向或单向管道,适合两个进程通信。
构造函数:
pipe(duplex=true)
- 返回
(conn1, conn2),两个连接对象。 duplex=true:双向;false:单向。
示例:
from multiprocessing import process, pipe
def sender(conn):
conn.send("hello from sender")
conn.close()
def receiver(conn):
print(conn.recv())
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = pipe()
p1 = process(target=sender, args=(child_conn,))
p2 = process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
输出:
hello from sender
queue
- 线程和进程安全的队列,适合多生产者/消费者场景。
构造函数:
queue(maxsize=0)
maxsize:最大容量(0 表示无限制)。
示例:
from multiprocessing import process, queue
def producer(queue):
queue.put("data from producer")
def consumer(queue):
print(queue.get())
if __name__ == "__main__":
queue = queue()
p1 = process(target=producer, args=(queue,))
p2 = process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
2.4 同步机制
提供锁、信号量等原语,确保进程安全访问共享资源。
lock
- 互斥锁,防止多个进程同时访问资源。
- 示例:
from multiprocessing import process, lock
def printer(lock, msg):
with lock:
print(msg)
if __name__ == "__main__":
lock = lock()
processes = [process(target=printer, args=(lock, f"message {i}")) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
semaphore
- 控制有限资源的并发访问。
- 示例:
from multiprocessing import process, semaphore
def worker(sem, name):
with sem:
print(f"{name} acquired resource")
# 模拟工作
if __name__ == "__main__":
sem = semaphore(2) # 允许 2 个进程同时访问
processes = [process(target=worker, args=(sem, f"worker {i}")) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
event
- 进程间信号通知。
- 示例:
from multiprocessing import process, event
import time
def wait_for_event(event):
event.wait()
print("event triggered")
if __name__ == "__main__":
event = event()
p = process(target=wait_for_event, args=(event,))
p.start()
time.sleep(1)
event.set() # 触发事件
p.join()
2.5 共享内存
通过 value 和 array 共享基本数据类型。
- value:单个共享值。
- array:共享数组。
示例:
from multiprocessing import process, value, array
def modify(shared_num, shared_arr):
shared_num.value += 1
for i in range(len(shared_arr)):
shared_arr[i] += 1
if __name__ == "__main__":
num = value("i", 0) # 共享整数
arr = array("i", [1, 2, 3]) # 共享数组
p = process(target=modify, args=(num, arr))
p.start()
p.join()
print(num.value) # 输出: 1
print(list(arr)) # 输出: [2, 3, 4]
3. 应用场景
数值计算:
- 并行处理矩阵运算、蒙特卡洛模拟。
- 示例:计算大数组的平方。
图像处理:
- 并行处理图像滤波、特征提取。
- 示例:批量应用卷积滤波。
机器学习:
- 并行训练模型或处理数据预处理。
- 示例:并行特征提取。
数据处理:
- 并行处理 csv 文件、数据库查询。
- 示例:多进程解析日志文件。
爬虫:
- 并行抓取网页(注意网络限制)。
- 示例:结合
urllib并发下载。
4. 示例:多进程爬虫
结合 urllib 和 queue 实现并行网页抓取。
示例:
import urllib.request
from multiprocessing import process, queue
from urllib.error import urlerror
def fetch_url(queue, url):
try:
with urllib.request.urlopen(url) as response:
content = response.read().decode("utf-8")
queue.put((url, len(content)))
except urlerror as e:
queue.put((url, str(e)))
def main():
urls = ["https://example.com", "https://python.org", "https://invalid-url"]
queue = queue()
processes = [process(target=fetch_url, args=(queue, url)) for url in urls]
for p in processes:
p.start()
for p in processes:
p.join()
while not queue.empty():
url, result = queue.get()
print(f"{url}: {result}")
if __name__ == "__main__":
main()
输出(示例):
https://example.com: 1256 https://python.org: 50000 https://invalid-url: [errno 11001] getaddrinfo failed
5. 最佳实践
使用 if __name__ == "__main__"::
- 防止 windows 和某些 unix 系统重复导入模块。
示例:
if __name__ == "__main__":
p = process(target=worker)
p.start()
选择进程池:
- 对于批量任务,使用
pool简化管理。
示例:
with pool(4) as pool:
results = pool.map(func, data)
优化通信:
- 尽量减少进程间通信,使用共享内存或批量传递数据。
示例:
arr = array("i", [0] * size)
异常处理:
- 在子进程中捕获异常,通过
queue或日志返回。
示例:
def worker(queue):
try:
# 工作代码
except exception as e:
queue.put(str(e))
测试代码:
- 使用
pytest测试多进程行为。
示例:
import pytest
from multiprocessing import process
def test_process():
def worker():
print("test")
p = process(target=worker)
p.start()
p.join()
assert p.exitcode == 0
进程数选择:
- 默认使用 cpu 核心数(
multiprocessing.cpu_count())。
示例:
processes = min(len(tasks), multiprocessing.cpu_count())
6. 注意事项
gil 限制:
multiprocessing绕过 gil,适合 cpu 密集型任务;i/o 密集型任务考虑threading或asyncio。
示例:
# i/o 密集型:使用 asyncio
import asyncio
async def fetch():
pass
windows 兼容性:
- windows 使用
spawn,需确保代码在if __name__ == "__main__":中。
示例:
if __name__ == "__main__":
main()
资源管理:
- 及时关闭进程和池,释放资源。
示例:
with pool() as pool:
pool.map(func, data)
序列化开销:
- 传递大数据到子进程(如通过
queue)可能慢,使用共享内存。
示例:
shared_data = value("d", 0.0)
调试难度:
- 子进程错误可能不易捕获,使用日志或
queue返回错误。
示例:
import logging logging.basicconfig(level=logging.info)
7. 总结
python 的 multiprocessing 模块是实现多进程并行的强大工具,绕过 gil,适合 cpu 密集型任务。其核心特点包括:
- 定义:提供进程创建、通信、同步和共享内存的 api。
- 功能:支持
process、pool、queue、pipe、lock等。 - 应用:数值计算、图像处理、机器学习、数据处理、爬虫。
- 最佳实践:使用
if __name__ == "__main__":、优化通信、测试代码。
以上就是python使用multiprocessing模块实现多进程并行计算的详细内容,更多关于python multiprocessing多进程并行计算的资料请关注代码网其它相关文章!
发表评论