当前位置: 代码网 > it编程>前端脚本>Python > Python使用multiprocessing模块实现多进程并行计算

Python使用multiprocessing模块实现多进程并行计算

2025年07月21日 Python 我要评论
引言python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 python 的全局解释器锁(gil),在多核 cpu 上实现真正的并行

引言

python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 python 的全局解释器锁(gil),在多核 cpu 上实现真正的并行,特别适合 cpu 密集型任务(如数值计算、图像处理)。相比线程(threading 模块),multiprocessing 更适合需要高性能计算的场景。本文将详细介绍 multiprocessing 模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项。

1. multiprocessing 模块的定义和原理

1.1 定义

multiprocessing 是一个跨平台的模块,提供创建和管理进程的 api,支持进程间通信(ipc)、同步机制和共享资源管理。它模仿了 threading 模块的接口,方便开发者从线程迁移到进程。

核心功能

  • 进程创建:创建独立进程,运行指定函数或任务。
  • 进程池:管理一组工作进程,分配任务。
  • 进程通信:支持管道(pipe)、队列(queue)等 ipc 机制。
  • 同步原语:提供锁(lock)、信号量(semaphore)、事件(event)等。
  • 共享内存:支持共享基本数据类型(value)和数组(array)。
  • 跨平台:在 windows、linux、macos 上运行一致。

依赖:标准库,无需额外安装。

1.2 原理

  • 进程 vs 线程
    • 进程:独立的内存空间,拥有自己的 python 解释器和 gil,适合 cpu 密集型任务。
    • 线程:共享内存空间,受 gil 限制,适合 i/o 密集型任务。
  • gil 绕过:每个进程有独立的 gil,允许多核并行。
  • 进程创建
    • linux/macos:使用 fork(复制父进程),或 spawn(新进程)。
    • windows:始终使用 spawn,启动新解释器。
  • 通信开销:进程间通信(如 queue)比线程慢,需优化设计。

1.3 导入

import multiprocessing

2. multiprocessing 的核心组件和功能

2.1 进程创建(process)

通过 multiprocessing.process 创建进程,运行指定函数。

构造函数

process(target=none, args=(), kwargs={}, name=none, daemon=none)
  • target:目标函数。
  • args/kwargs:函数参数。
  • name:进程名称。
  • daemon:是否为守护进程(随主进程退出)。

主要方法

  • start():启动进程。
  • join():等待进程结束。
  • terminate():强制终止进程。
  • is_alive():检查进程是否存活。

示例

import multiprocessing

def worker(num):
    print(f"worker {num} running in process {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = [multiprocessing.process(target=worker, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

输出(顺序可能不同):

worker 0 running in process process-1
worker 1 running in process process-2
worker 2 running in process process-3
  • 说明:创建 3 个进程,每个运行 worker 函数。

2.2 进程池(pool)

pool 用于管理固定数量的进程,适合并行处理大量任务。

构造函数

pool(processes=none, initializer=none, initargs=())
  • processes:进程数(默认 cpu 核心数)。
  • initializer:每个进程的初始化函数。
  • initargs:初始化函数参数。

主要方法

  • map(func, iterable):并行执行 func 应用于 iterable,返回结果列表。
  • imap(func, iterable):惰性版本,返回迭代器。
  • apply(func, args=(), kwds={}):同步执行单任务。
  • apply_async(func, args=(), kwds={}):异步执行单任务。
  • close():关闭池,禁止新任务。
  • join():等待池内进程完成。

示例

from multiprocessing import pool

def square(n):
    return n * n

if __name__ == "__main__":
    with pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)  # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.3 进程通信

支持 pipequeue 实现进程间数据交换。

pipe

  • 双向或单向管道,适合两个进程通信。

构造函数

pipe(duplex=true)
  • 返回 (conn1, conn2),两个连接对象。
  • duplex=true:双向;false:单向。

示例

from multiprocessing import process, pipe

def sender(conn):
    conn.send("hello from sender")
    conn.close()

def receiver(conn):
    print(conn.recv())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = pipe()
    p1 = process(target=sender, args=(child_conn,))
    p2 = process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

输出

hello from sender

queue

  • 线程和进程安全的队列,适合多生产者/消费者场景。

构造函数

queue(maxsize=0)
  • maxsize:最大容量(0 表示无限制)。

示例

from multiprocessing import process, queue

def producer(queue):
    queue.put("data from producer")

def consumer(queue):
    print(queue.get())

if __name__ == "__main__":
    queue = queue()
    p1 = process(target=producer, args=(queue,))
    p2 = process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2.4 同步机制

提供锁、信号量等原语,确保进程安全访问共享资源。

lock

  • 互斥锁,防止多个进程同时访问资源。
  • 示例
from multiprocessing import process, lock

def printer(lock, msg):
    with lock:
        print(msg)

if __name__ == "__main__":
    lock = lock()
    processes = [process(target=printer, args=(lock, f"message {i}")) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

semaphore

  • 控制有限资源的并发访问。
  • 示例
from multiprocessing import process, semaphore

def worker(sem, name):
    with sem:
        print(f"{name} acquired resource")
        # 模拟工作

if __name__ == "__main__":
    sem = semaphore(2)  # 允许 2 个进程同时访问
    processes = [process(target=worker, args=(sem, f"worker {i}")) for i in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

event

  • 进程间信号通知。
  • 示例
from multiprocessing import process, event
import time

def wait_for_event(event):
    event.wait()
    print("event triggered")

if __name__ == "__main__":
    event = event()
    p = process(target=wait_for_event, args=(event,))
    p.start()
    time.sleep(1)
    event.set()  # 触发事件
    p.join()

2.5 共享内存

通过 valuearray 共享基本数据类型。

  • value:单个共享值。
  • array:共享数组。

示例

from multiprocessing import process, value, array

def modify(shared_num, shared_arr):
    shared_num.value += 1
    for i in range(len(shared_arr)):
        shared_arr[i] += 1

if __name__ == "__main__":
    num = value("i", 0)  # 共享整数
    arr = array("i", [1, 2, 3])  # 共享数组
    p = process(target=modify, args=(num, arr))
    p.start()
    p.join()
    print(num.value)  # 输出: 1
    print(list(arr))  # 输出: [2, 3, 4]

3. 应用场景

数值计算

  • 并行处理矩阵运算、蒙特卡洛模拟。
  • 示例:计算大数组的平方。

图像处理

  • 并行处理图像滤波、特征提取。
  • 示例:批量应用卷积滤波。

机器学习

  • 并行训练模型或处理数据预处理。
  • 示例:并行特征提取。

数据处理

  • 并行处理 csv 文件、数据库查询。
  • 示例:多进程解析日志文件。

爬虫

  • 并行抓取网页(注意网络限制)。
  • 示例:结合 urllib 并发下载。

4. 示例:多进程爬虫

结合 urllibqueue 实现并行网页抓取。

示例

import urllib.request
from multiprocessing import process, queue
from urllib.error import urlerror

def fetch_url(queue, url):
    try:
        with urllib.request.urlopen(url) as response:
            content = response.read().decode("utf-8")
            queue.put((url, len(content)))
    except urlerror as e:
        queue.put((url, str(e)))

def main():
    urls = ["https://example.com", "https://python.org", "https://invalid-url"]
    queue = queue()
    processes = [process(target=fetch_url, args=(queue, url)) for url in urls]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    while not queue.empty():
        url, result = queue.get()
        print(f"{url}: {result}")

if __name__ == "__main__":
    main()

输出(示例):

https://example.com: 1256
https://python.org: 50000
https://invalid-url: [errno 11001] getaddrinfo failed

5. 最佳实践

使用 if __name__ == "__main__":

  • 防止 windows 和某些 unix 系统重复导入模块。

示例

if __name__ == "__main__":
    p = process(target=worker)
    p.start()

选择进程池

  • 对于批量任务,使用 pool 简化管理。

示例

with pool(4) as pool:
    results = pool.map(func, data)

优化通信

  • 尽量减少进程间通信,使用共享内存或批量传递数据。

示例

arr = array("i", [0] * size)

异常处理

  • 在子进程中捕获异常,通过 queue 或日志返回。

示例

def worker(queue):
    try:
        # 工作代码
    except exception as e:
        queue.put(str(e))

测试代码

  • 使用 pytest 测试多进程行为。

示例

import pytest
from multiprocessing import process

def test_process():
    def worker():
        print("test")
    p = process(target=worker)
    p.start()
    p.join()
    assert p.exitcode == 0

进程数选择

  • 默认使用 cpu 核心数(multiprocessing.cpu_count())。

示例

processes = min(len(tasks), multiprocessing.cpu_count())

6. 注意事项

gil 限制

  • multiprocessing 绕过 gil,适合 cpu 密集型任务;i/o 密集型任务考虑 threadingasyncio

示例

# i/o 密集型:使用 asyncio
import asyncio
async def fetch():
    pass

windows 兼容性

  • windows 使用 spawn,需确保代码在 if __name__ == "__main__": 中。

示例

if __name__ == "__main__":
    main()

资源管理

  • 及时关闭进程和池,释放资源。

示例

with pool() as pool:
    pool.map(func, data)

序列化开销

  • 传递大数据到子进程(如通过 queue)可能慢,使用共享内存。

示例

shared_data = value("d", 0.0)

调试难度

  • 子进程错误可能不易捕获,使用日志或 queue 返回错误。

示例

import logging
logging.basicconfig(level=logging.info)

7. 总结

python 的 multiprocessing 模块是实现多进程并行的强大工具,绕过 gil,适合 cpu 密集型任务。其核心特点包括:

  • 定义:提供进程创建、通信、同步和共享内存的 api。
  • 功能:支持 processpoolqueuepipelock 等。
  • 应用:数值计算、图像处理、机器学习、数据处理、爬虫。
  • 最佳实践:使用 if __name__ == "__main__":、优化通信、测试代码。

以上就是python使用multiprocessing模块实现多进程并行计算的详细内容,更多关于python multiprocessing多进程并行计算的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com