当前位置: 代码网 > it编程>前端脚本>Python > Python模块multiprocessing & 实现多进程并发方式

Python模块multiprocessing & 实现多进程并发方式

2026年05月07日 Python 我要评论
简介multiprocessing模块是python标准库中提供的一个用于实现多进程编程的模块。它基于进程而不是线程,可以利用多核cpu的优势,提高程序的执行效率,同时也可以实现进程间通信和数据共享。

简介

multiprocessing模块是python标准库中提供的一个用于实现多进程编程的模块。它基于进程而不是线程,可以利用多核cpu的优势,提高程序的执行效率,同时也可以实现进程间通信和数据共享。

1. 参数说明

1.1.process(控制进程)

用于创建子进程对象,必须指定要执行的目标函数。process(target = [函数名])

语法

multiprocessing.process(
    target   #必选参数,表示进程所执行的目标函数。
    group    #预留参数,不需要使用。
    name     #指定子进程的名称,通过 [子进程].name 获取
    args     #指定子进程(函数)需要传递的参数,以元组方式传递,例如:args=('a', 'b', 'c')
    kwargs   #指定子进程(函数)需要传递的参数,以字典方式传递,例如:args={'name':'xiaowang', 'age':18}
    daemon   #将子进程设置为守护进程(true|false)。
)
  • 守护进程:主进程退出后,不论该子进程是否结束,强制退出。
  • 非守护进程:主进程退出后,子进程若未结束,会继续执行。

所有方法如下

p = multiprocessing.process(target = xxx)

    p.run       #表示进程在启动后要执行的方法,需要应用程序开发人员重写。
    p.start     #启动进程。
    p.join      #等待进程终止。
    p.is_alive  #判断进程是否存活。
    p.ident     #获取子进程pid
    p.name      #获取子进程名称
    p.kill      #强制结束子进程。
    p.terminate #强制结束子进程。
    p.close     #关闭与进程关联的所有管道和文件。
    authkey     #返回用于身份验证的进程通信的密钥。

1.2.queue(进程通信)

用于实现进程间通信的队列,支持多个进程向同一个队列中读写数据。

参数选项

multiprocessing.queue([maxsize])
  • maxsize(可选)参数是一个int型整数,用于指定队列中最多可以存放的实例数,超过此值会保持阻塞,直到队列中有位置出现。
  • 如果maxsize是负数,则表示队列的长度为无限。如果maxsize是零,则表示队列的长度为无限。
  • 但由于诸如pickle等协议的缘故,正在发送或接收的大型对象可能会导致程序死锁,因此不建议这样使用。

所有方法如下

'''向消息队列发送信息'''
queue.put(item[, block[, timeout]])
    #如果block参数是true,而且队列已满,那么程序就会在队列有空间之前停滞等待。
    #如果block参数是false,并且队列已满,那么就会引发full异常。
    #如果给出了可选参数timeout,它会阻塞timeout秒。
'''向消息队列发送信息,如果队列已满,会引发full异常,'''
queue.put_nowait(item)
    #等同于queue.put(item, false)。

'''获取队列中元素并删除'''
queue.get([block[, timeout]])
    #如果队列为空,且block参数为true,那么程序会一直停滞等待,或者阻塞timeout秒。
    #如果block参数是false,而且队列为空,那么就会引发empty异常。
'''获取队列中元素并删除,如果队列为空则引发empty异。'''
queue.get_nowait()

'''关闭进程间通信通道,以防止有些进程被阻塞,从而导致程序死锁或者内存泄露'''
queue.close()

'''判断队列是否为空'''
queue.empty()    #为空返回true,不为空返回false。
'''判断队列是否已满'''
queue.full()     #队列已满返回true,未满返回false。

'''获取队列中的元素个数'''
queue.qsize()    #注意:此方法不可靠,因为在 queue.put() 和 queue.get() 方法的过程中仍然可能发生改变。

1.3.pipe(管道通信)

创建进程间通信管道,支持两个进程之间的通信。

语法

multiprocessing.pipe([duplex])
  • duplex:是否创建一个双向通信的管道(默认true),代表创建一个双向管道。如果设置为false,则创建一个单向管道(只能从某一端写入数据,从另一端读出数据)。

所有方法如下

'''向管道中写入消息'''
pipe.send('消息')    #如果发送失败会抛出 brokenpipeerror 异常。
'''从管道中读取数据,会阻塞直到有数据可读'''
pipe.recv():         #如果管道已关闭,则会返回一个 eoferror 异常。

'''关闭管道'''
pipe.close()

'''返回管道的文件描述符'''
pipe.fileno()

'''判断读取管道是否阻塞(如果管道可读或者关闭,返回true)'''
pipe.poll([timeout]) #如果设置了 timeout,则会在指定时间内返回。

'''二进制数据-向管道中写入消息'''
pipe.send_bytes(buf[, offset[, size]]) #与send一样功能,但是接受的参数为二进制字符串。
'''二进制数据-从管道中读取数据'''
pipe.recv_bytes([maxlength])           #与recv类似,但是返回的是二进制字符串。

1.4.pool(进程池)

语法

multiprocessing.pool(
    processes         #可选参数,指定进程池中的线程数(默认cpu最大数)
    initializer       #可选参数,指定每个工作进程启动时要调用的函数。默认值为 none。
    initargs          #可选参数,指定传递给初始化器函数的参数元组。默认值为 ()。
    maxtasksperchild  #可选参数,限制每个工作进程可以执行的任务数量,然后被终止,以避免内存泄漏问题。默认值为 none,表示进程将一直存在。
)

所有方法如下

pool.[方法]

'''进程池中同步执行函数, 类似于 func(*args, **kwds) 表达式'''
pool.apply(func, args=(), kwds={})    #如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''在进程池中异步执行函数'''
apply_async(func, args=(), callback=none)  #支持回调函数。

'''进程池中同步执行函数,将执行结果存储于列表中返回, 类似于 map(func, iterable) 表达式'''
pool.map(func, iterable, chunksize=none)  #返回一个包含结果的列表。如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''进程池中异步执行函数,并返回生成器对象, 可以逐个获取函数执行结果'''
pool.imap(func, iterable, chunksize=1)  #这可以在内存有限的情况下处理大型数据集。如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''类似于 imap() 函数,但是结果不保证按照迭代器的顺序产生'''
pool.imap_unordered(func, iterable, chunksize=1)  #如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。

'''关闭进程池'''
pool.close()

'''等待所有进程完成'''
pool.join()

'''强制关闭所有进程'''
pool.terminate()

1.5.lock(进程锁)

语法

multiprocessing.lock(locking = true)
    locking=true:使用底层操作系统提供的锁机制,例如 posix 信号量或 windows 临界区。这是默认值。
    locking=false:会更快地获取和释放锁,但是在一些平台上可能会出现未定义的行为。

所有方法如下

'''获取锁'''
lock.acquire(block=true, timeout=none)  #如果 block=true,则会阻塞直到获取到锁,否则立即返回。如果 timeout 不是 none,则在超时之前没有获取到锁就会抛出 timeouterror 异常。如果锁已经被当前进程持有,则会抛出 assertionerror 异常。
'''释放锁'''
lock.release()  #如果锁没有被当前进程持有,则会抛出 assertionerror 异常。

lock.locked()            #返回当前锁是否被持有的布尔值。
lock.notify(n=1)         #唤醒 wait() 方法中等待锁的 n 个线程。
lock.notify_all()        #唤醒 wait() 方法中等待锁的所有线程。
lock.wait(timeout=none)  #等待锁。如果锁已经被释放,则会立即返回。如果锁仍然被持有,则会释放锁,并阻塞直到其他线程唤醒该线程或者超时,则会抛出 timeouterror 异常。

2. 进程管理

2.1. 判断子进程状态

通过is_alive 判断某个子进程是否存活

import multiprocessing
from time import sleep

# 定义一个函数(作为子进程)
def proc():
    print('=========== 子进程开始运行 ===========')
    sleep(3)
    print('=========== 子进程运行结束===========')

if __name__ == '__main__':
    # 将函数proc定义为子进程
    p = multiprocessing.process(target=proc)

    # 启动子进程
    p.start()

    # 判断子进程是否结束
    while p.is_alive():
        print('[check] 子进程运行中')
        sleep(1)
    else:
        print('子进程已死亡!')

结果

[check] 子进程运行中
=========== 子进程开始运行 ===========
[check] 子进程运行中
[check] 子进程运行中
[check] 子进程运行中
=========== 子进程运行结束===========
子进程已死亡!

2.2. 获取子进程信息

获取子进程名称和pid

import multiprocessing

def proc1():
    '''模仿一个子进程'''
    pass

def proc2():
    '''模仿一个子进程'''
    pass

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.process(target=proc1, name='【自定义的p1】')
    p2 = multiprocessing.process(target=proc2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 获取子进程的名称和pid
    print(f'进程1的名称为:{p1.name}, pid为:{p1.ident}')
    print(f'进程2的名称为:{p2.name}, pid为:{p2.ident}')
  • 可以通过参数 name 设置子进程名称,未设置默认process-[n]

结果

进程1的名称为:【自定义的p1】, pid为:20136
进程2的名称为:process-2, pid为:15816

2.3. 子进程执行多任务

实现简单的多任务执行

import multiprocessing
import time

def progress1():
    '''模仿一个子进程'''
    for i in range(1,6):
        print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
        time.sleep(1)

def progress2():
    '''模仿一个子进程'''
    for i in range(1,5):
        print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
        time.sleep(2)

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.process(target=progress1)
    p2 = multiprocessing.process(target=progress2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 程序执行其他事情
    time.sleep(1)
    print('=============== 结束 =============== ')

输出结果(主进程并没有去等待子进程结束,直接做其他事)

[1/5] 这是进程1,每隔1s输出一次...
[1/4] 这是进程2,每隔2s输出一次...
=============== 结束 =============== 
[2/5] 这是进程1,每隔1s输出一次...
[3/5] 这是进程1,每隔1s输出一次...
[2/4] 这是进程2,每隔2s输出一次...
[4/5] 这是进程1,每隔1s输出一次...
[3/4] 这是进程2,每隔2s输出一次...
[5/5] 这是进程1,每隔1s输出一次...
[4/4] 这是进程2,每隔2s输出一次...

使用 join 等待某个子进程结束再运行主进程

import multiprocessing
import time

def progress1():
    '''模仿一个子进程'''
    for i in range(1,6):
        print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
        time.sleep(1)

def progress2():
    '''模仿一个子进程'''
    for i in range(1,5):
        print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
        time.sleep(2)

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.process(target=progress1)
    p2 = multiprocessing.process(target=progress2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    # 程序执行其他事情
    print('=============== 结束 =============== ')

结果

[1/5] 这是进程1,每隔1s输出一次...
[1/4] 这是进程2,每隔2s输出一次...
[2/5] 这是进程1,每隔1s输出一次...
[3/5] 这是进程1,每隔1s输出一次...
[2/4] 这是进程2,每隔2s输出一次...
[4/5] 这是进程1,每隔1s输出一次...
[5/5] 这是进程1,每隔1s输出一次...
[3/4] 这是进程2,每隔2s输出一次...
[4/4] 这是进程2,每隔2s输出一次...
=============== 结束 =============== 

将 进程1(p1) 设置为守护进程(随主进程退出而退出)

import multiprocessing
import time

def progress1():
    '''模仿一个子进程'''
    for i in range(1,6):
        print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
        time.sleep(1)

def progress2():
    '''模仿一个子进程'''
    for i in range(1,5):
        print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
        time.sleep(2)

if __name__ == '__main__':
    # 创建2个子进程
    p1 = multiprocessing.process(target=progress1, name='【自定义的p1】', daemon=true)
    p2 = multiprocessing.process(target=progress2)

    # 启动进程1
    p1.start()
    # 启动进程2
    p2.start()

    # 程序执行其他事情
    time.sleep(2)
    print('====================== 结束 ======================')
    print(f'进程1的名称为:{p1.name}, pid为:{p1.ident}')
    print(f'进程2的名称为:{p2.name}, pid为:{p2.ident}')
    print('====================== 结束 ======================')

结果

[1/5] 这是进程1,每隔1s输出一次...
[1/4] 这是进程2,每隔2s输出一次...
[2/5] 这是进程1,每隔1s输出一次...
====================== 结束 ======================
进程1的名称为:【自定义的p1】, pid为:21252
进程2的名称为:process-2, pid为:21253
====================== 结束 ======================
[2/4] 这是进程2,每隔2s输出一次...
[3/4] 这是进程2,每隔2s输出一次...
[4/4] 这是进程2,每隔2s输出一次...

2.4. 运行多个并发

通过循环的方式去构造多个并发

import multiprocessing
from time import sleep

class myclass(object):
    def __init__(self, thread_num=1, sleep_proc=none):
        # thread_num表示进程数,sleep_proc表示是否等待退出
        self.thread_num = thread_num
        self.sleep_proc = sleep_proc

    def proc(self):
        # 定义一个并发的子进程
        for i in range(1,4):
            print(f'[{i}/3] 我是一个子进程')
            sleep(1)

    def call_proc(self):
        # 调用子进程
        processes = []
        # 循环调用多个子进程
        for num in range(self.thread_num):
            # 定义子进程属性
            p = multiprocessing.process(target=myclass().proc)
            # 启动子进程
            p.start()
            # 将循环的子进程放入列表,用于后面的等待退出
            processes.append(p)

        # 判断是否等待子进程结束
        if self.sleep_proc is true:
            for p in processes:
                p.join()

if __name__ == '__main__':
    # 指定2个并发,并等待子进程结束
    mc = myclass(2,true)
    mc.call_proc()

    print('=============== 结束 ===============')

结果

[1/3] 我是一个子进程
[1/3] 我是一个子进程
[2/3] 我是一个子进程
[2/3] 我是一个子进程
[3/3] 我是一个子进程
[3/3] 我是一个子进程
=============== 结束 ===============

2.5. 强制杀死子进程

通过 terminate 或者 kill 强制杀死子进程

import multiprocessing
from time import sleep

# 定义2个函数(作为子进程)
def proc1():
    print('=========== 子进程1开始运行 ===========')
    sleep(10)
    print('=========== 子进程1运行结束===========')

def proc2():
    print('=========== 子进程2开始运行 ===========')
    sleep(10)
    print('=========== 子进程2运行结束===========')

if __name__ == '__main__':
    # 将函数定义为子进程
    p1 = multiprocessing.process(target=proc1)
    p2 = multiprocessing.process(target=proc2)

    # 启动子进程
    p1.start()
    p2.start()

    # 3秒后手动杀死子进程
    sleep(3)
    p1.kill()       #使用kill杀死子进程
    p2.terminate()  #使用terminate杀死子进程
    print('子进程p1、p2已被强制杀死!')

3. 进程间的通信

进程间通信(ipc,interprocess communication)是一组编程接口,让程序员能够协调不同的进程,使之能在一个操作系统里同时运行,并相互传递、交换信息。这使得一个程序能够在同一时间里处理许多用户的要求。因为即使只有一个用户发出要求,也可能导致一个操作系统中多个进程的运行,进程之间必须互相通话。ipc接口就提供了这种可能性。

python 的 multiprocessing 模块中,可以使用 queue、pipe、manager 等数据结构实现进程间通信(ipc),也就是进程之间交换数据。进程间通信是多进程编程中非常重要和常见的一部分,通常用于在多个进程之间共享并传递信息、数据或任务结果。

3.1. queue 进程通信

from multiprocessing import process, queue
from time import sleep


def proc1(q):
    '''这是一个发送消息的函数'''
    msgs = ["香蕉", "苹果", "水蜜桃"]
    for msg in msgs:
        # 将迭代对象放入通信队列
        q.put(msg)
        # 打印当前迭代的内容
        print(f"[进程1] 发送信息({msg})")
        sleep(1)
    # 迭代完成后关闭通信
    q.close()


def proc2(q):
    '''这是一个接收消息的函数'''
    while true:
        try:
            # 删除通信队列中的一个元素
            msg = q.get(block=false)
            print(f"[进程2] 收到信息({msg}), 并删除该信息")
        except:
            q.close()   # 通信完成后关闭
            break
        sleep(1)


if __name__ == '__main__':
    # 使用queue方法通信
    q = queue()
    # 定义子进程属性,将 queue 方法传入子进程
    p1 = process(target=proc1, args=(q,))
    p2 = process(target=proc2, args=(q,))

    # 启动2个子进程
    p1.start()
    p2.start()

    # 等待2个子进程结束
    p1.join()
    p2.join()

    print("=============== 结束 ===============")
  • 进程1发送信息,进程2读取信息并删除队列(生产者-消费者)

逻辑视图

3.2.queue控制多个子进程

  • 设置多个子进程,一个用于控制,其他执行对应程序
  • 控制的方法:运行子进程通过消息队列判断是否继续运行。

若消息队列为空,继续运行子进程;若消息队列不为空,停止子进程。由控制子进程决定

from multiprocessing import process,queue
from time import sleep


class myclass(object):
    def __init__(self, time):
        self.time = time    #指定n秒后退出子进程
        self.q = queue()    #将queue赋值给公共方法

    def sed_msgs(self):
        '''该方法决定子进程是否退出'''
        sleep(self.time)    #按指定时间休眠
        self.q.put('over')  #休眠后向消息队列发送一条消息
        self.q.close()      #关闭通信

    def proc1(self):
        '''这是一个子进程,当消息队列不为空则停止运行'''
        while self.q.empty():
            print('[进程1] 执行当前任务...')
            sleep(1)
        self.q.close()

    def proc2(self):
        '''这是一个子进程,当消息队列不为空则停止运行'''
        while self.q.empty():
            print('[进程2] 执行当前任务...')
            sleep(1)
        self.q.close()

if __name__ == '__main__':
    mc = myclass(3)     #给定休眠时间3s

    # 定义子进程
    s1 = process(target=mc.sed_msgs)
    p1 = process(target=mc.proc1)
    p2 = process(target=mc.proc2)
    communication = [s1, p1, p2]

    # 启动所有子进程
    for s in communication:
        s.start()

    # 等待所有子进程结束
    for s in communication:
        s.join()

    print('====================== 结束 ======================')

逻辑视图

3.3. pipe 管道通信

pipe()支持多个进程间的管道通信。管道可以被多个进程访问,但是一次只能有一个进程对管道进行操作。

pipe()赋值给两个对象:p1,p2 =pipe(),p1是发送消息的方法,p2是接收消息的方法。

定义2个进程,分别向管道中发送消息和接收消息

from multiprocessing import pipe, process

class myclass(object):
    '''定义2个管道和2个进程,相互发送和接收消息'''
    def __init__(self):
        # parent_conn表示发送消息,child_conn表示接收消息
        self.parent_conn1, self.child_conn1 = pipe()
        self.parent_conn2, self.child_conn2 = pipe()

    def proc1(self):
        # 向管道1中发送消息
        self.parent_conn1.send('苹果')
        # 接收管道2中的消息
        received_message = self.child_conn2.recv()
        print(f'[进程1] 收到的消息是:{received_message}')
        # 关闭管道1
        self.parent_conn1.close()

    def proc2(self):
        # 接收管道1中的消息
        received_message = self.child_conn1.recv()
        print(f'[进程2] 收到的消息是:{received_message}')
        # 向管道2中发送消息
        self.parent_conn2.send('香蕉')
        # 关闭管道2
        self.parent_conn2.close()

if __name__ == '__main__':
    mc = myclass()

    # 创建子进程
    p1 = process(target=mc.proc1)
    p2 = process(target=mc.proc2)

    # 启动子进程
    p1.start()
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    print('=================== 结束 ===================')

说明:

进程1向管道1中发送一条消息(苹果),发送完成之后接收管道2消息,如果没有则等待。

进程2接收管道1的消息并输出,再向管道2中发送一条消息(香蕉)。

4. 进程池

4.1. 同步与异步并发区别

  • 同步(阻塞):同步并发类似于串行。例如a、b两个进程,必须a执行完成后才能执行b;若a出现意外(一直阻塞),那么b将无法执行。
  • 异步(非阻塞):异步并发在执行多任务时不会出现相互阻塞的情况(进程池限制除外)。如有a、b两个进程,他们俩可以同时进行,不会出现相互等待的情况。

同步的简单代码如下

from multiprocessing import pool
from time import sleep

def proc1():
    '''子进程1'''
    for i in range(2):
        print(f'[进程1] 执行{i}...')
        sleep(1)

def proc2():
    '''子进程2'''
    for i in range(2):
        print(f'[进程2] 执行{i}...')
        sleep(1)

if __name__ == '__main__':
    # 定义进程池
    pool = pool()
    # 同步执行2个子进程
    pool.apply(proc1)
    pool.apply(proc2)
    # 关闭和等待子进程结束
    pool.close()
    pool.join()

使用pool.apply ([函数名]) 定义同步执行:先执行 proc1,再执行 proc2

使用异步 pool.apply_async ([函数名])(将上述代码 apply 修改为 apply_async 即可)

    # 异步执行2个子进程
    pool.apply_async (proc1)
    pool.apply_async (proc2)

其结果为:proc1 和 proc2 同时执行

4.2. 异步调用多个子进程

进程池异步并发的基本使用方法

'''定义子进程'''
def start_func():
    pass
def proc1():
    pass
def proc2():
    pass

if __name__ == '__main__':
    # 定义进程池,设置属性
    pool = pool(processes=1, maxtasksperchild=2, initializer=start_func)
    # 异步启动子进程(子进程为指定的某个函数)
    pool.apply_async(proc1)
    pool.apply_async(proc2)
    # 关闭和等待子进程结束
    pool.close()
    pool.join()
  • processes:并发数(默认为系统最大cpu数)。如果将并发数设置为1,即使进程池中有2个进程,也会按同步的方式执行。一般设置不超过cpu数,最好等于子进程数。
  • maxtasksperchild:可以限制在处理完一定数量的任务后重建进程池中的工作进程,以避免内存泄漏和资源泄漏等问题。使用maxtasksperchild参数时需要权衡性能和资源利用率之间的平衡。如果你将maxtasksperchild设置得太小,进程重建的成本可能会显著影响性能。如果你将maxtasksperchild设置得太大,那么可能会导致内存泄漏或资源泄漏。需要根据具体情况进行调整。
  • initializer:指定每个工作进程启动时要调用的函数(func)。如果子进程只有2个,而processes 设置为3,那么将会调用3次 func。

进程池中有4个子进程,仅使用2个并发(a、b、c、d,先执行ab,再执行cd)

from multiprocessing import pool
from time import sleep

def start_func():
    print('启动前此函数')

def proc1():
    '''子进程'''
    for i in range(2):
        print(f'[进程1] 执行{i}...')
        sleep(1)

def proc2():
    '''子进程'''
    for i in range(2):
        print(f'[进程2] 执行{i}...')
        sleep(1)

def proc3():
    '''子进程'''
    for i in range(2):
        print(f'[进程3] 执行{i}...')
        sleep(1)

def proc4():
    '''子进程'''
    for i in range(2):
        print(f'[进程4] 执行{i}...')
        sleep(1)

if __name__ == '__main__':
    # 定义进程池,设置属性
    pool = pool(processes=2, initializer=start_func)
    func_list = [proc1, proc2, proc3, proc4]
    for i in func_list:
        pool.apply_async(i)
    # 关闭和等待子进程结束
    pool.close()
    pool.join()

结果如下:

如果将processes 设置为 4,那么4个进程将会同时进行

4.3. 进程池的高并发

  • 使用 map 或 imap 迭代来高效完成工作

使用 map(同步) 执行高并发。会自动等待子进程完成后,才会进行执行主进程。

from multiprocessing import pool

def proc(num):
    print(f'我是子进程')

if __name__ == '__main__':
    # 定义线程池,设置并发数为5
    pool = pool(5)
    # 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
    pool.map(proc, range(5))
    # 关闭进程池
    pool.close()
    print('========== 结束 ==========')

使用 imap(异步) 执行高并发。调度子进程运行后不会等待,继续执行主进程任务。若主进程运行结束,则子进程不论是否运行完成都将强制结束。

from multiprocessing import pool

def proc(num):
    print(f'我是子进程')

if __name__ == '__main__':
    # 定义线程池,设置并发数为5
    pool = pool(5)
    # 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
    pool.imap(proc, range(5))
    # 关闭进程池
    pool.close()
    print('========== 结束 ==========')

这样的好处是不会影响主进程其他任务的执行,如果需要等待则使用 join

from multiprocessing import pool

def proc(num):
    print(f'我是子进程')

if __name__ == '__main__':
    # 定义线程池,设置并发数为5
    pool = pool(5)
    # 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
    pool.imap(proc, range(5))
    # 关闭进程池
    pool.close()
    # 等待子进程结束
    pool.join()
    print('========== 结束 ==========')

使用高并发需要注意以下几点:

  • 注意控制进程池数量,避免过多的进程导致性能下降。可以通过调用multiprocessing.cpu_count()获取当前系统的 cpu 数量,并使用该值来指定进程池数量。
  • 使用maxtasksperchild参数来限制每个工作进程可以处理的任务数量。这有助于避免长时间运行的进程引起的资源泄漏或内存泄漏。
  • 在提交任务之前,尽可能地减小要处理数据的大小。例如,可以使用迭代器来代替列表,或者使用生成器来延迟计算。
  • 对于长时间运行的任务,可以使用消息队列来缓解性能问题。例如,可以使用 rabbitmq 或 zeromq 等消息队列来异步处理任务。

5. 进程同步

进程同步是指多个进程在共享资源时的协调与同步。单个进程运行时 ,程序的执行是顺序的;多个进程并发执行时可能会造成资源竞争、死锁等问题。因此,进程同步是保证每个进程在使用共享资源时的顺序和正确性。

5.1. 进程加锁的方式

两个进程间实现同步有2种方式,1、手动加锁(释放锁);2、with自动加锁(释放锁)

手动加锁方式

from multiprocessing import lock

def func():
    # 手动加锁
    lock().acquire()

    '''执行程序'''
    pass

    # 手动释放锁
    lock().release()

with 自动加锁(释放锁)

from multiprocessing import lock

def func():
    # with自动加锁(释放锁)
    with lock():
        '''执行程序'''
        pass

5.2. 实现进程同步

  • 多进程之间同步全局变量的修改,则需要使用共享内存。需要用到 multiprocessing 模块中的 value 共享变量类型来实现。

实现逻辑

代码如下

from multiprocessing import process, lock, value
from time import sleep

def func1(lock, shared_var):
    for i in range(3):
        with lock:
            shared_var.value -= 1
            print(f'[func1] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

def func2(lock, shared_var):
    for i in range(3):
        with lock:
            shared_var.value += 2
            print(f'[func2] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

if __name__ == '__main__':
    lock = lock()
    shared_var = value('i', 10)

    # 定义子进程
    p1 = process(target=func1, args=(lock, shared_var))
    p2 = process(target=func2, args=(lock, shared_var))

    # 启动子进程
    p1.start()
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    print(f'最终共享变量var为:{shared_var.value}')

错误示例(使用了多个 lock() )

from multiprocessing import process, lock, value
from time import sleep

def func1(shared_var):
    '''定义子进程,让共享变量-1,执行3次'''
    for i in range(3):
        with lock():    #错误语句
            shared_var.value -= 1
            print(f'[func1] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

def func2(shared_var):
    '''定义子进程,让共享变量+2,执行3次'''
    for i in range(3):
        with lock():    #错误语句
            shared_var.value += 2
            print(f'[func2] 共享变量var当前值为:{shared_var.value}')
            sleep(1)

if __name__ == '__main__':
    shared_var = value('i', 10)

    # 定义子进程
    p1 = process(target=func1, args=(shared_var,))
    p2 = process(target=func2, args=(shared_var,))

    # 启动子进程
    p1.start()
    p2.start()

    # 等待子进程结束
    p1.join()
    p2.join()

    print(f'最终共享变量var为:{shared_var.value}')

按同步逻辑执行的过程应该是:共享变量 = 10(初始) - 1 + 2 - 1 + 2 - 1 + 2 = 13,但实际的值却是11。下图所示,其中一个步骤出错,并没有加锁。

仔细翻看代码,发现在每个函数中都使用了 lock() 方法,也就是说2个函数使用的是2个锁,自然不会加锁,出现了脏读现象。

  • lock():用于实现在进程间同步访问共享资源,保证同一时间只有一个进程能够访问。
  • rlock():与lock类似,但支持递归锁,同一个进程中多次获取仍然有效。
  • semaphore():用于控制进程间的并发数量。
  • event():用于实现进程间的事件通知。
  • condition():用于实现复杂的进程间同步,比如等待某个条件变为真时再继续执行。
  • barrier():用于实现多个进程间的协调操作,比如等待所有进程都到达某个状态再继续执行。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com