简介
multiprocessing模块是python标准库中提供的一个用于实现多进程编程的模块。它基于进程而不是线程,可以利用多核cpu的优势,提高程序的执行效率,同时也可以实现进程间通信和数据共享。
1. 参数说明
1.1.process(控制进程)
用于创建子进程对象,必须指定要执行的目标函数。process(target = [函数名])
语法
multiprocessing.process(
target #必选参数,表示进程所执行的目标函数。
group #预留参数,不需要使用。
name #指定子进程的名称,通过 [子进程].name 获取
args #指定子进程(函数)需要传递的参数,以元组方式传递,例如:args=('a', 'b', 'c')
kwargs #指定子进程(函数)需要传递的参数,以字典方式传递,例如:args={'name':'xiaowang', 'age':18}
daemon #将子进程设置为守护进程(true|false)。
)- 守护进程:主进程退出后,不论该子进程是否结束,强制退出。
- 非守护进程:主进程退出后,子进程若未结束,会继续执行。
所有方法如下
p = multiprocessing.process(target = xxx)
p.run #表示进程在启动后要执行的方法,需要应用程序开发人员重写。
p.start #启动进程。
p.join #等待进程终止。
p.is_alive #判断进程是否存活。
p.ident #获取子进程pid
p.name #获取子进程名称
p.kill #强制结束子进程。
p.terminate #强制结束子进程。
p.close #关闭与进程关联的所有管道和文件。
authkey #返回用于身份验证的进程通信的密钥。1.2.queue(进程通信)
用于实现进程间通信的队列,支持多个进程向同一个队列中读写数据。
参数选项
multiprocessing.queue([maxsize])
- maxsize(可选)参数是一个int型整数,用于指定队列中最多可以存放的实例数,超过此值会保持阻塞,直到队列中有位置出现。
- 如果maxsize是负数,则表示队列的长度为无限。如果maxsize是零,则表示队列的长度为无限。
- 但由于诸如pickle等协议的缘故,正在发送或接收的大型对象可能会导致程序死锁,因此不建议这样使用。
所有方法如下
'''向消息队列发送信息'''
queue.put(item[, block[, timeout]])
#如果block参数是true,而且队列已满,那么程序就会在队列有空间之前停滞等待。
#如果block参数是false,并且队列已满,那么就会引发full异常。
#如果给出了可选参数timeout,它会阻塞timeout秒。
'''向消息队列发送信息,如果队列已满,会引发full异常,'''
queue.put_nowait(item)
#等同于queue.put(item, false)。
'''获取队列中元素并删除'''
queue.get([block[, timeout]])
#如果队列为空,且block参数为true,那么程序会一直停滞等待,或者阻塞timeout秒。
#如果block参数是false,而且队列为空,那么就会引发empty异常。
'''获取队列中元素并删除,如果队列为空则引发empty异。'''
queue.get_nowait()
'''关闭进程间通信通道,以防止有些进程被阻塞,从而导致程序死锁或者内存泄露'''
queue.close()
'''判断队列是否为空'''
queue.empty() #为空返回true,不为空返回false。
'''判断队列是否已满'''
queue.full() #队列已满返回true,未满返回false。
'''获取队列中的元素个数'''
queue.qsize() #注意:此方法不可靠,因为在 queue.put() 和 queue.get() 方法的过程中仍然可能发生改变。1.3.pipe(管道通信)
创建进程间通信管道,支持两个进程之间的通信。
语法
multiprocessing.pipe([duplex])
duplex:是否创建一个双向通信的管道(默认true),代表创建一个双向管道。如果设置为false,则创建一个单向管道(只能从某一端写入数据,从另一端读出数据)。
所有方法如下
'''向管道中写入消息'''
pipe.send('消息') #如果发送失败会抛出 brokenpipeerror 异常。
'''从管道中读取数据,会阻塞直到有数据可读'''
pipe.recv(): #如果管道已关闭,则会返回一个 eoferror 异常。
'''关闭管道'''
pipe.close()
'''返回管道的文件描述符'''
pipe.fileno()
'''判断读取管道是否阻塞(如果管道可读或者关闭,返回true)'''
pipe.poll([timeout]) #如果设置了 timeout,则会在指定时间内返回。
'''二进制数据-向管道中写入消息'''
pipe.send_bytes(buf[, offset[, size]]) #与send一样功能,但是接受的参数为二进制字符串。
'''二进制数据-从管道中读取数据'''
pipe.recv_bytes([maxlength]) #与recv类似,但是返回的是二进制字符串。1.4.pool(进程池)
语法
multiprocessing.pool(
processes #可选参数,指定进程池中的线程数(默认cpu最大数)
initializer #可选参数,指定每个工作进程启动时要调用的函数。默认值为 none。
initargs #可选参数,指定传递给初始化器函数的参数元组。默认值为 ()。
maxtasksperchild #可选参数,限制每个工作进程可以执行的任务数量,然后被终止,以避免内存泄漏问题。默认值为 none,表示进程将一直存在。
)所有方法如下
pool.[方法]
'''进程池中同步执行函数, 类似于 func(*args, **kwds) 表达式'''
pool.apply(func, args=(), kwds={}) #如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。
'''在进程池中异步执行函数'''
apply_async(func, args=(), callback=none) #支持回调函数。
'''进程池中同步执行函数,将执行结果存储于列表中返回, 类似于 map(func, iterable) 表达式'''
pool.map(func, iterable, chunksize=none) #返回一个包含结果的列表。如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。
'''进程池中异步执行函数,并返回生成器对象, 可以逐个获取函数执行结果'''
pool.imap(func, iterable, chunksize=1) #这可以在内存有限的情况下处理大型数据集。如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。
'''类似于 imap() 函数,但是结果不保证按照迭代器的顺序产生'''
pool.imap_unordered(func, iterable, chunksize=1) #如果进程池中的一个进程崩溃,那么就将函数重新提交到池中。
'''关闭进程池'''
pool.close()
'''等待所有进程完成'''
pool.join()
'''强制关闭所有进程'''
pool.terminate()
1.5.lock(进程锁)
语法
multiprocessing.lock(locking = true)
locking=true:使用底层操作系统提供的锁机制,例如 posix 信号量或 windows 临界区。这是默认值。
locking=false:会更快地获取和释放锁,但是在一些平台上可能会出现未定义的行为。所有方法如下
'''获取锁''' lock.acquire(block=true, timeout=none) #如果 block=true,则会阻塞直到获取到锁,否则立即返回。如果 timeout 不是 none,则在超时之前没有获取到锁就会抛出 timeouterror 异常。如果锁已经被当前进程持有,则会抛出 assertionerror 异常。 '''释放锁''' lock.release() #如果锁没有被当前进程持有,则会抛出 assertionerror 异常。 lock.locked() #返回当前锁是否被持有的布尔值。 lock.notify(n=1) #唤醒 wait() 方法中等待锁的 n 个线程。 lock.notify_all() #唤醒 wait() 方法中等待锁的所有线程。 lock.wait(timeout=none) #等待锁。如果锁已经被释放,则会立即返回。如果锁仍然被持有,则会释放锁,并阻塞直到其他线程唤醒该线程或者超时,则会抛出 timeouterror 异常。
2. 进程管理
2.1. 判断子进程状态
通过is_alive 判断某个子进程是否存活
import multiprocessing
from time import sleep
# 定义一个函数(作为子进程)
def proc():
print('=========== 子进程开始运行 ===========')
sleep(3)
print('=========== 子进程运行结束===========')
if __name__ == '__main__':
# 将函数proc定义为子进程
p = multiprocessing.process(target=proc)
# 启动子进程
p.start()
# 判断子进程是否结束
while p.is_alive():
print('[check] 子进程运行中')
sleep(1)
else:
print('子进程已死亡!')结果
[check] 子进程运行中 =========== 子进程开始运行 =========== [check] 子进程运行中 [check] 子进程运行中 [check] 子进程运行中 =========== 子进程运行结束=========== 子进程已死亡!
2.2. 获取子进程信息
获取子进程名称和pid
import multiprocessing
def proc1():
'''模仿一个子进程'''
pass
def proc2():
'''模仿一个子进程'''
pass
if __name__ == '__main__':
# 创建2个子进程
p1 = multiprocessing.process(target=proc1, name='【自定义的p1】')
p2 = multiprocessing.process(target=proc2)
# 启动进程1
p1.start()
# 启动进程2
p2.start()
# 获取子进程的名称和pid
print(f'进程1的名称为:{p1.name}, pid为:{p1.ident}')
print(f'进程2的名称为:{p2.name}, pid为:{p2.ident}')- 可以通过参数 name 设置子进程名称,未设置默认process-[n]
结果
进程1的名称为:【自定义的p1】, pid为:20136 进程2的名称为:process-2, pid为:15816
2.3. 子进程执行多任务
实现简单的多任务执行
import multiprocessing
import time
def progress1():
'''模仿一个子进程'''
for i in range(1,6):
print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
time.sleep(1)
def progress2():
'''模仿一个子进程'''
for i in range(1,5):
print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
time.sleep(2)
if __name__ == '__main__':
# 创建2个子进程
p1 = multiprocessing.process(target=progress1)
p2 = multiprocessing.process(target=progress2)
# 启动进程1
p1.start()
# 启动进程2
p2.start()
# 程序执行其他事情
time.sleep(1)
print('=============== 结束 =============== ')输出结果(主进程并没有去等待子进程结束,直接做其他事)
[1/5] 这是进程1,每隔1s输出一次... [1/4] 这是进程2,每隔2s输出一次... =============== 结束 =============== [2/5] 这是进程1,每隔1s输出一次... [3/5] 这是进程1,每隔1s输出一次... [2/4] 这是进程2,每隔2s输出一次... [4/5] 这是进程1,每隔1s输出一次... [3/4] 这是进程2,每隔2s输出一次... [5/5] 这是进程1,每隔1s输出一次... [4/4] 这是进程2,每隔2s输出一次...
使用 join 等待某个子进程结束再运行主进程
import multiprocessing
import time
def progress1():
'''模仿一个子进程'''
for i in range(1,6):
print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
time.sleep(1)
def progress2():
'''模仿一个子进程'''
for i in range(1,5):
print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
time.sleep(2)
if __name__ == '__main__':
# 创建2个子进程
p1 = multiprocessing.process(target=progress1)
p2 = multiprocessing.process(target=progress2)
# 启动进程1
p1.start()
# 启动进程2
p2.start()
# 等待子进程结束
p1.join()
p2.join()
# 程序执行其他事情
print('=============== 结束 =============== ')结果
[1/5] 这是进程1,每隔1s输出一次... [1/4] 这是进程2,每隔2s输出一次... [2/5] 这是进程1,每隔1s输出一次... [3/5] 这是进程1,每隔1s输出一次... [2/4] 这是进程2,每隔2s输出一次... [4/5] 这是进程1,每隔1s输出一次... [5/5] 这是进程1,每隔1s输出一次... [3/4] 这是进程2,每隔2s输出一次... [4/4] 这是进程2,每隔2s输出一次... =============== 结束 ===============
将 进程1(p1) 设置为守护进程(随主进程退出而退出)
import multiprocessing
import time
def progress1():
'''模仿一个子进程'''
for i in range(1,6):
print(f'[{i}/5] 这是进程1,每隔1s输出一次...')
time.sleep(1)
def progress2():
'''模仿一个子进程'''
for i in range(1,5):
print(f'[{i}/4] 这是进程2,每隔2s输出一次...')
time.sleep(2)
if __name__ == '__main__':
# 创建2个子进程
p1 = multiprocessing.process(target=progress1, name='【自定义的p1】', daemon=true)
p2 = multiprocessing.process(target=progress2)
# 启动进程1
p1.start()
# 启动进程2
p2.start()
# 程序执行其他事情
time.sleep(2)
print('====================== 结束 ======================')
print(f'进程1的名称为:{p1.name}, pid为:{p1.ident}')
print(f'进程2的名称为:{p2.name}, pid为:{p2.ident}')
print('====================== 结束 ======================')结果
[1/5] 这是进程1,每隔1s输出一次... [1/4] 这是进程2,每隔2s输出一次... [2/5] 这是进程1,每隔1s输出一次... ====================== 结束 ====================== 进程1的名称为:【自定义的p1】, pid为:21252 进程2的名称为:process-2, pid为:21253 ====================== 结束 ====================== [2/4] 这是进程2,每隔2s输出一次... [3/4] 这是进程2,每隔2s输出一次... [4/4] 这是进程2,每隔2s输出一次...
2.4. 运行多个并发
通过循环的方式去构造多个并发
import multiprocessing
from time import sleep
class myclass(object):
def __init__(self, thread_num=1, sleep_proc=none):
# thread_num表示进程数,sleep_proc表示是否等待退出
self.thread_num = thread_num
self.sleep_proc = sleep_proc
def proc(self):
# 定义一个并发的子进程
for i in range(1,4):
print(f'[{i}/3] 我是一个子进程')
sleep(1)
def call_proc(self):
# 调用子进程
processes = []
# 循环调用多个子进程
for num in range(self.thread_num):
# 定义子进程属性
p = multiprocessing.process(target=myclass().proc)
# 启动子进程
p.start()
# 将循环的子进程放入列表,用于后面的等待退出
processes.append(p)
# 判断是否等待子进程结束
if self.sleep_proc is true:
for p in processes:
p.join()
if __name__ == '__main__':
# 指定2个并发,并等待子进程结束
mc = myclass(2,true)
mc.call_proc()
print('=============== 结束 ===============')结果
[1/3] 我是一个子进程 [1/3] 我是一个子进程 [2/3] 我是一个子进程 [2/3] 我是一个子进程 [3/3] 我是一个子进程 [3/3] 我是一个子进程 =============== 结束 ===============
2.5. 强制杀死子进程
通过 terminate 或者 kill 强制杀死子进程
import multiprocessing
from time import sleep
# 定义2个函数(作为子进程)
def proc1():
print('=========== 子进程1开始运行 ===========')
sleep(10)
print('=========== 子进程1运行结束===========')
def proc2():
print('=========== 子进程2开始运行 ===========')
sleep(10)
print('=========== 子进程2运行结束===========')
if __name__ == '__main__':
# 将函数定义为子进程
p1 = multiprocessing.process(target=proc1)
p2 = multiprocessing.process(target=proc2)
# 启动子进程
p1.start()
p2.start()
# 3秒后手动杀死子进程
sleep(3)
p1.kill() #使用kill杀死子进程
p2.terminate() #使用terminate杀死子进程
print('子进程p1、p2已被强制杀死!')
3. 进程间的通信
进程间通信(ipc,interprocess communication)是一组编程接口,让程序员能够协调不同的进程,使之能在一个操作系统里同时运行,并相互传递、交换信息。这使得一个程序能够在同一时间里处理许多用户的要求。因为即使只有一个用户发出要求,也可能导致一个操作系统中多个进程的运行,进程之间必须互相通话。ipc接口就提供了这种可能性。
python 的 multiprocessing 模块中,可以使用 queue、pipe、manager 等数据结构实现进程间通信(ipc),也就是进程之间交换数据。进程间通信是多进程编程中非常重要和常见的一部分,通常用于在多个进程之间共享并传递信息、数据或任务结果。
3.1. queue 进程通信
from multiprocessing import process, queue
from time import sleep
def proc1(q):
'''这是一个发送消息的函数'''
msgs = ["香蕉", "苹果", "水蜜桃"]
for msg in msgs:
# 将迭代对象放入通信队列
q.put(msg)
# 打印当前迭代的内容
print(f"[进程1] 发送信息({msg})")
sleep(1)
# 迭代完成后关闭通信
q.close()
def proc2(q):
'''这是一个接收消息的函数'''
while true:
try:
# 删除通信队列中的一个元素
msg = q.get(block=false)
print(f"[进程2] 收到信息({msg}), 并删除该信息")
except:
q.close() # 通信完成后关闭
break
sleep(1)
if __name__ == '__main__':
# 使用queue方法通信
q = queue()
# 定义子进程属性,将 queue 方法传入子进程
p1 = process(target=proc1, args=(q,))
p2 = process(target=proc2, args=(q,))
# 启动2个子进程
p1.start()
p2.start()
# 等待2个子进程结束
p1.join()
p2.join()
print("=============== 结束 ===============")- 进程1发送信息,进程2读取信息并删除队列(生产者-消费者)

逻辑视图

3.2.queue控制多个子进程
- 设置多个子进程,一个用于控制,其他执行对应程序
- 控制的方法:运行子进程通过消息队列判断是否继续运行。
若消息队列为空,继续运行子进程;若消息队列不为空,停止子进程。由控制子进程决定
from multiprocessing import process,queue
from time import sleep
class myclass(object):
def __init__(self, time):
self.time = time #指定n秒后退出子进程
self.q = queue() #将queue赋值给公共方法
def sed_msgs(self):
'''该方法决定子进程是否退出'''
sleep(self.time) #按指定时间休眠
self.q.put('over') #休眠后向消息队列发送一条消息
self.q.close() #关闭通信
def proc1(self):
'''这是一个子进程,当消息队列不为空则停止运行'''
while self.q.empty():
print('[进程1] 执行当前任务...')
sleep(1)
self.q.close()
def proc2(self):
'''这是一个子进程,当消息队列不为空则停止运行'''
while self.q.empty():
print('[进程2] 执行当前任务...')
sleep(1)
self.q.close()
if __name__ == '__main__':
mc = myclass(3) #给定休眠时间3s
# 定义子进程
s1 = process(target=mc.sed_msgs)
p1 = process(target=mc.proc1)
p2 = process(target=mc.proc2)
communication = [s1, p1, p2]
# 启动所有子进程
for s in communication:
s.start()
# 等待所有子进程结束
for s in communication:
s.join()
print('====================== 结束 ======================')
逻辑视图

3.3. pipe 管道通信
pipe()支持多个进程间的管道通信。管道可以被多个进程访问,但是一次只能有一个进程对管道进行操作。
pipe()赋值给两个对象:p1,p2 =pipe(),p1是发送消息的方法,p2是接收消息的方法。
定义2个进程,分别向管道中发送消息和接收消息
from multiprocessing import pipe, process
class myclass(object):
'''定义2个管道和2个进程,相互发送和接收消息'''
def __init__(self):
# parent_conn表示发送消息,child_conn表示接收消息
self.parent_conn1, self.child_conn1 = pipe()
self.parent_conn2, self.child_conn2 = pipe()
def proc1(self):
# 向管道1中发送消息
self.parent_conn1.send('苹果')
# 接收管道2中的消息
received_message = self.child_conn2.recv()
print(f'[进程1] 收到的消息是:{received_message}')
# 关闭管道1
self.parent_conn1.close()
def proc2(self):
# 接收管道1中的消息
received_message = self.child_conn1.recv()
print(f'[进程2] 收到的消息是:{received_message}')
# 向管道2中发送消息
self.parent_conn2.send('香蕉')
# 关闭管道2
self.parent_conn2.close()
if __name__ == '__main__':
mc = myclass()
# 创建子进程
p1 = process(target=mc.proc1)
p2 = process(target=mc.proc2)
# 启动子进程
p1.start()
p2.start()
# 等待子进程结束
p1.join()
p2.join()
print('=================== 结束 ===================')
说明:
进程1向管道1中发送一条消息(苹果),发送完成之后接收管道2消息,如果没有则等待。
进程2接收管道1的消息并输出,再向管道2中发送一条消息(香蕉)。

4. 进程池
4.1. 同步与异步并发区别
- 同步(阻塞):同步并发类似于串行。例如a、b两个进程,必须a执行完成后才能执行b;若a出现意外(一直阻塞),那么b将无法执行。
- 异步(非阻塞):异步并发在执行多任务时不会出现相互阻塞的情况(进程池限制除外)。如有a、b两个进程,他们俩可以同时进行,不会出现相互等待的情况。
同步的简单代码如下
from multiprocessing import pool
from time import sleep
def proc1():
'''子进程1'''
for i in range(2):
print(f'[进程1] 执行{i}...')
sleep(1)
def proc2():
'''子进程2'''
for i in range(2):
print(f'[进程2] 执行{i}...')
sleep(1)
if __name__ == '__main__':
# 定义进程池
pool = pool()
# 同步执行2个子进程
pool.apply(proc1)
pool.apply(proc2)
# 关闭和等待子进程结束
pool.close()
pool.join()使用pool.apply ([函数名]) 定义同步执行:先执行 proc1,再执行 proc2

使用异步 pool.apply_async ([函数名])(将上述代码 apply 修改为 apply_async 即可)
# 异步执行2个子进程
pool.apply_async (proc1)
pool.apply_async (proc2)其结果为:proc1 和 proc2 同时执行

4.2. 异步调用多个子进程
进程池异步并发的基本使用方法
'''定义子进程'''
def start_func():
pass
def proc1():
pass
def proc2():
pass
if __name__ == '__main__':
# 定义进程池,设置属性
pool = pool(processes=1, maxtasksperchild=2, initializer=start_func)
# 异步启动子进程(子进程为指定的某个函数)
pool.apply_async(proc1)
pool.apply_async(proc2)
# 关闭和等待子进程结束
pool.close()
pool.join()- processes:并发数(默认为系统最大cpu数)。如果将并发数设置为1,即使进程池中有2个进程,也会按同步的方式执行。一般设置不超过cpu数,最好等于子进程数。
- maxtasksperchild:可以限制在处理完一定数量的任务后重建进程池中的工作进程,以避免内存泄漏和资源泄漏等问题。使用
maxtasksperchild参数时需要权衡性能和资源利用率之间的平衡。如果你将maxtasksperchild设置得太小,进程重建的成本可能会显著影响性能。如果你将maxtasksperchild设置得太大,那么可能会导致内存泄漏或资源泄漏。需要根据具体情况进行调整。 - initializer:指定每个工作进程启动时要调用的函数(func)。如果子进程只有2个,而processes 设置为3,那么将会调用3次 func。
进程池中有4个子进程,仅使用2个并发(a、b、c、d,先执行ab,再执行cd)
from multiprocessing import pool
from time import sleep
def start_func():
print('启动前此函数')
def proc1():
'''子进程'''
for i in range(2):
print(f'[进程1] 执行{i}...')
sleep(1)
def proc2():
'''子进程'''
for i in range(2):
print(f'[进程2] 执行{i}...')
sleep(1)
def proc3():
'''子进程'''
for i in range(2):
print(f'[进程3] 执行{i}...')
sleep(1)
def proc4():
'''子进程'''
for i in range(2):
print(f'[进程4] 执行{i}...')
sleep(1)
if __name__ == '__main__':
# 定义进程池,设置属性
pool = pool(processes=2, initializer=start_func)
func_list = [proc1, proc2, proc3, proc4]
for i in func_list:
pool.apply_async(i)
# 关闭和等待子进程结束
pool.close()
pool.join()结果如下:

如果将processes 设置为 4,那么4个进程将会同时进行

4.3. 进程池的高并发
- 使用 map 或 imap 迭代来高效完成工作
使用 map(同步) 执行高并发。会自动等待子进程完成后,才会进行执行主进程。
from multiprocessing import pool
def proc(num):
print(f'我是子进程')
if __name__ == '__main__':
# 定义线程池,设置并发数为5
pool = pool(5)
# 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
pool.map(proc, range(5))
# 关闭进程池
pool.close()
print('========== 结束 ==========')
使用 imap(异步) 执行高并发。调度子进程运行后不会等待,继续执行主进程任务。若主进程运行结束,则子进程不论是否运行完成都将强制结束。
from multiprocessing import pool
def proc(num):
print(f'我是子进程')
if __name__ == '__main__':
# 定义线程池,设置并发数为5
pool = pool(5)
# 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
pool.imap(proc, range(5))
# 关闭进程池
pool.close()
print('========== 结束 ==========')
这样的好处是不会影响主进程其他任务的执行,如果需要等待则使用 join
from multiprocessing import pool
def proc(num):
print(f'我是子进程')
if __name__ == '__main__':
# 定义线程池,设置并发数为5
pool = pool(5)
# 使用map向函数传入多个参数(每传入一个参数则会调用一次函数,同时调用的数量由进程数决定)
pool.imap(proc, range(5))
# 关闭进程池
pool.close()
# 等待子进程结束
pool.join()
print('========== 结束 ==========')
使用高并发需要注意以下几点:
- 注意控制进程池数量,避免过多的进程导致性能下降。可以通过调用
multiprocessing.cpu_count()获取当前系统的 cpu 数量,并使用该值来指定进程池数量。 - 使用
maxtasksperchild参数来限制每个工作进程可以处理的任务数量。这有助于避免长时间运行的进程引起的资源泄漏或内存泄漏。 - 在提交任务之前,尽可能地减小要处理数据的大小。例如,可以使用迭代器来代替列表,或者使用生成器来延迟计算。
- 对于长时间运行的任务,可以使用消息队列来缓解性能问题。例如,可以使用 rabbitmq 或 zeromq 等消息队列来异步处理任务。
5. 进程同步
进程同步是指多个进程在共享资源时的协调与同步。单个进程运行时 ,程序的执行是顺序的;多个进程并发执行时可能会造成资源竞争、死锁等问题。因此,进程同步是保证每个进程在使用共享资源时的顺序和正确性。
5.1. 进程加锁的方式
两个进程间实现同步有2种方式,1、手动加锁(释放锁);2、with自动加锁(释放锁)
手动加锁方式
from multiprocessing import lock
def func():
# 手动加锁
lock().acquire()
'''执行程序'''
pass
# 手动释放锁
lock().release()with 自动加锁(释放锁)
from multiprocessing import lock
def func():
# with自动加锁(释放锁)
with lock():
'''执行程序'''
pass5.2. 实现进程同步
- 多进程之间同步全局变量的修改,则需要使用共享内存。需要用到 multiprocessing 模块中的 value 共享变量类型来实现。
实现逻辑

代码如下
from multiprocessing import process, lock, value
from time import sleep
def func1(lock, shared_var):
for i in range(3):
with lock:
shared_var.value -= 1
print(f'[func1] 共享变量var当前值为:{shared_var.value}')
sleep(1)
def func2(lock, shared_var):
for i in range(3):
with lock:
shared_var.value += 2
print(f'[func2] 共享变量var当前值为:{shared_var.value}')
sleep(1)
if __name__ == '__main__':
lock = lock()
shared_var = value('i', 10)
# 定义子进程
p1 = process(target=func1, args=(lock, shared_var))
p2 = process(target=func2, args=(lock, shared_var))
# 启动子进程
p1.start()
p2.start()
# 等待子进程结束
p1.join()
p2.join()
print(f'最终共享变量var为:{shared_var.value}')
错误示例(使用了多个 lock() )
from multiprocessing import process, lock, value
from time import sleep
def func1(shared_var):
'''定义子进程,让共享变量-1,执行3次'''
for i in range(3):
with lock(): #错误语句
shared_var.value -= 1
print(f'[func1] 共享变量var当前值为:{shared_var.value}')
sleep(1)
def func2(shared_var):
'''定义子进程,让共享变量+2,执行3次'''
for i in range(3):
with lock(): #错误语句
shared_var.value += 2
print(f'[func2] 共享变量var当前值为:{shared_var.value}')
sleep(1)
if __name__ == '__main__':
shared_var = value('i', 10)
# 定义子进程
p1 = process(target=func1, args=(shared_var,))
p2 = process(target=func2, args=(shared_var,))
# 启动子进程
p1.start()
p2.start()
# 等待子进程结束
p1.join()
p2.join()
print(f'最终共享变量var为:{shared_var.value}')按同步逻辑执行的过程应该是:共享变量 = 10(初始) - 1 + 2 - 1 + 2 - 1 + 2 = 13,但实际的值却是11。下图所示,其中一个步骤出错,并没有加锁。

仔细翻看代码,发现在每个函数中都使用了 lock() 方法,也就是说2个函数使用的是2个锁,自然不会加锁,出现了脏读现象。

- lock():用于实现在进程间同步访问共享资源,保证同一时间只有一个进程能够访问。
- rlock():与lock类似,但支持递归锁,同一个进程中多次获取仍然有效。
- semaphore():用于控制进程间的并发数量。
- event():用于实现进程间的事件通知。
- condition():用于实现复杂的进程间同步,比如等待某个条件变为真时再继续执行。
- barrier():用于实现多个进程间的协调操作,比如等待所有进程都到达某个状态再继续执行。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论