当前位置: 代码网 > it编程>前端脚本>Python > Python 多线程通信的常用方法汇总

Python 多线程通信的常用方法汇总

2026年01月14日 Python 我要评论
一般来说,大部分遇到的多线程,只要能各自完成好各自的任务即可。少数情况下,不同线程可能需要在线程安全的情况下,进行通信和数据交换。python 中常用的线程通信有以下方法。共享变量共享变量是最简单的线

一般来说,大部分遇到的多线程,只要能各自完成好各自的任务即可。少数情况下,不同线程可能需要在线程安全的情况下,进行通信和数据交换。python 中常用的线程通信有以下方法。

共享变量

共享变量是最简单的线程通信方式,比如进行计数更新等操作,但需要配合锁来保证线程安全。

import threading
# 共享变量
shared_data = 0
lock = threading.lock()
def worker():
    global shared_data
    with lock:  # 使用锁保证线程安全
        shared_data += 1
threads = []
for i in range(5):
    t = threading.thread(target=worker)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终结果: {shared_data}")  # 应该是5

queue队列

最常用的线程安全通信方式,是生产者-消费者模式的理想选择。比如使用优先级队列优先消费高优先级的数据(序号越低,优先级越高,越优先消费)。

from time import sleep
from random import random, randint
from threading import thread
from queue import priorityqueue
queue = priorityqueue()
def producer(queue):
    print('producer: running')
    for i in range(5):
        # create item with priority
        value = random()
        priority = randint(0, 5)
        item = (priority, value)
        queue.put(item)
    # wait for all items to be processed
    queue.join()
    queue.put(none)
    print('producer: done')
def consumer(queue):
    print('consumer: running')
    while true:
        # get a unit of work
        item = queue.get()
        if item is none:
            break
        sleep(item[1])
        print(item)
        queue.task_done()
    print('consumer: done')
producer = thread(target=producer, args=(queue,))
producer.start()
consumer = thread(target=consumer, args=(queue,))
consumer.start()
producer.join()
consumer.join()
producer: running
consumer: running
(0, 0.9945246262101098)
(2, 0.35853829355476663)
(2, 0.4794139132317813)
(3, 0.8460111545035349)
(5, 0.6047655828611674)
producer: done
consumer: done

event事件

线程模提供了 event 用于线程间的简单信号传递。event 对象管理内部标志的状态。标志初始为false,并通过 set() 方法变为 true,通过 clear() 方法重新设置为 false。wait() 方法会阻塞,直到标志变为 true。

比如下面使用 event 通知,模拟交通信号灯周期性变化及车辆通行之间的协同运行。车辆根据信号灯的状态判断是否通行还是等待;车辆通行完毕以后,只剩下信号灯周期性变化。

from threading import *
import time
def signal_state():
    while true:
        time.sleep(5)
        print("traffic police giving green signal")
        event.set()
        time.sleep(10)
        print("traffic police giving red signal")
        event.clear()
def traffic_flow():
    num = 0
    while num < 10:
        print("waiting for green signal")
        event.wait()
        print("green signal ... traffic can move")
        while event.is_set():
            num = num + 1
            print("vehicle no:", num, " crossing the signal")
            time.sleep(2)
        print("red signal ... traffic has to wait")
event = event()
t1 = thread(target=signal_state)
t2 = thread(target=traffic_flow)
t1.start()
t2.start()
waiting for green signal
traffic police giving green signal
green signal ... traffic can move
vehicle no: 1  crossing the signal
vehicle no: 2  crossing the signal
vehicle no: 3  crossing the signal
vehicle no: 4  crossing the signal
vehicle no: 5  crossing the signal
traffic police giving red signal
red signal ... traffic has to wait
waiting for green signal
traffic police giving green signal
green signal ... traffic can move
vehicle no: 6  crossing the signal
vehicle no: 7  crossing the signal
vehicle no: 8  crossing the signal
vehicle no: 9  crossing the signal
vehicle no: 10  crossing the signal
traffic police giving red signal
red signal ... traffic has to wait
traffic police giving green signal
traffic police giving red signal
traffic police giving green signal
traffic police giving red signal
...

condition条件对象

线程模块中的 condition 类实现了条件变量对象。条件对象会强制一个或多个线程等待,直到被另一个线程通知。condition 用于更复杂的线程同步,允许线程等待特定条件。比如上面的 event 的实现,其内部也是在使用 condition。

import threading
import time
# 共享资源
buffer = []
max_items = 5
condition = threading.condition()
def producer():
    """生产者"""
    for i in range(10):
        time.sleep(0.2)
        with condition:
            while len(buffer) >= max_items:
                print("buffer full,wait...")
                condition.wait()  # 等待缓冲区有空位
            item = f"item-{i}"
            buffer.append(item)
            print(f"producer: {item}, buffer: {len(buffer)}")
            condition.notify_all()  # 通知消费者
def consumer():
    """消费者"""
    for i in range(10):
        time.sleep(0.8)
        with condition:
            while len(buffer) == 0:
                print("buffer empty,wait...")
                condition.wait()  # 等待缓冲区有数据
            item = buffer.pop(0)
            print(f"consumer: {item}, buffer: {len(buffer)}")
            condition.notify_all()  # 通知生产者
# 创建线程
prod = threading.thread(target=producer)
cons = threading.thread(target=consumer)
prod.start()
cons.start()
prod.join()
cons.join()
producer: item-0, buffer: 1
producer: item-1, buffer: 2
producer: item-2, buffer: 3
consumer: item-0, buffer: 2
producer: item-3, buffer: 3
producer: item-4, buffer: 4
producer: item-5, buffer: 5
buffer full,wait...
consumer: item-1, buffer: 4
producer: item-6, buffer: 5
buffer full,wait...
consumer: item-2, buffer: 4
producer: item-7, buffer: 5
buffer full,wait...
consumer: item-3, buffer: 4
producer: item-8, buffer: 5
buffer full,wait...
consumer: item-4, buffer: 4
producer: item-9, buffer: 5
consumer: item-5, buffer: 4
consumer: item-6, buffer: 3
consumer: item-7, buffer: 2
consumer: item-8, buffer: 1
consumer: item-9, buffer: 0

semaphore信号量

semaphore 信号量控制对共享资源的访问数量。信号量的基本概念是使用一个内部计数器,每个 acquire() 调用将其递减,每个 release() 调用将其递增。计数器永远不能低于零;当 acquire() 发现计数器为零时,它会阻塞,直到某个其他线程调用 release()。当然,从源码看,信号量也是通过 condition 条件对象来进行实现的。

import threading
import time
# 信号量,限制最多3个线程同时访问
semaphore = threading.semaphore(3)
def access_resource(thread_id):
    """访问共享资源"""
    with semaphore:
        print(f"thread {thread_id} acquire\n", end="")
        time.sleep(2)  # 模拟资源访问
        print(f"thread {thread_id} release\n", end="")
# 创建10个线程
threads = []
for i in range(10):
    t = threading.thread(target=access_resource, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
thread 0 acquire
thread 1 acquire
thread 2 acquire
thread 0 release
thread 3 acquire
thread 1 release
thread 2 release
thread 4 acquire
thread 5 acquire
thread 3 release
thread 6 acquire
thread 4 release
thread 7 acquire
thread 5 release
thread 8 acquire
thread 6 release
thread 9 acquire
thread 7 release
thread 8 release
thread 9 release

barrier屏障

barrier 使多个线程等待,直到指定数目的线程都到达某个点,这些线程才会被同时唤醒,然后继续往后执行(需要注意的是:如果没有设置 timeout,且总的线程数无法整除给定的线程数 parties 时,会导致线程阻塞,形成死锁)。

import threading
import time
# 创建屏障,等待3个线程(注意:如果总的线程数无法整除3,则会导致线程阻塞)
barrier = threading.barrier(3)
def worker(worker_id):
    """工作线程"""
    print(f"worker {worker_id} start")
    time.sleep(worker_id)  # 模拟不同工作速度
    print(f"worker {worker_id} arrive")
    barrier.wait()  # 等待所有线程到达
    print(f"worker {worker_id} continue")
# 创建3个线程
threads = []
for i in range(6):
    t = threading.thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
worker 0 start
worker 0 arrive
worker 1 start
worker 2 start
worker 3 start
worker 4 start
worker 5 start
worker 1 arrive
worker 2 arrive
worker 2 continue
worker 0 continue
worker 1 continue
worker 3 arrive
worker 4 arrive
worker 5 arrive
worker 5 continue
worker 3 continue
worker 4 continue

不管以什么样的方式进行线程通信,最重要的当属线程安全,线程通信的各种设计,也是建立在通过锁的机制保证线程安全的情况下来实现各种功能的。

到此这篇关于python 之多线程通信的几种常用方法的文章就介绍到这了,更多相关python多线程通信内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com