一、什么是生产消费者模型
生产消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题,生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接交给阻塞队列,消费者不找生产者索要数据,而是直接从阻塞队列中去取,这样一来,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
对于生产消费者模型,我们有一个321规则,分别是3种关系,2种角色,1个交易场所
- 三种关系:生产者和生产者的互斥竞争关系,消费者和消费者的互斥竞争关系,生产者和消费者的互斥、同步关系
- 两种角色:生产者和消费者
- 一个交易场所:特定结构的内存空间(如阻塞队列)
二、基于阻塞队列的生产消费者模型
1、理论研究
在多线程编程中,阻塞队列是一种常用于实现生产者和消费者模型的数据结构,其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中再次被放入元素,当队列满时,往队列中存放元素的操作也会被阻塞,直到有元素从队列中被获取
生产消费者模型最大的好处是,也是生产消费者模型效率高的原因是:在消费者获取数据(一般是网络数据)的时候,生产者可以生产数据,生产者放入数据的时候,消费者可以处理数据,虽然特定内存结构,也就是临界资源区是有锁的,只能由单线程通过,只要将时间合理化,我们就可以实现生产者和消费者的高效率工作,并且将发送数据的线程和处理数据的线程解耦合
2、多生产多消费模型
(一)blockqueue.hpp
#pragma once #include <iostream> #include <queue> #include <pthread.h> //定义一个模版类,方便我们使用任何类型进行生产消费 template <class t> //定义一个阻塞队列 class blockqueue { //队列默认最大容量 static const int defalutnum = 20; public: blockqueue(int maxcap = defalutnum) : maxcap_(maxcap) { //初始化互斥锁和生产者和消费者的条件变量 pthread_mutex_init(&mutex_, nullptr); pthread_cond_init(&c_cond_, nullptr); pthread_cond_init(&p_cond_, nullptr); //下面注释掉的是设置水位线,设置最低最高水位线 //在阻塞队列中的数据,在低于最低水位线时是不可被获取的,只能写入 //在高于最高水位线时是不可被写入的,只能获取 // low_water_ = maxcap_/3; // high_water_ = (maxcap_*2)/3; } //从队列头取出元素返回 t pop() { pthread_mutex_lock(&mutex_);//加锁 //只能用while不能用if,原因是会出现误唤醒问题,下面说 while (q_.size() == 0) { pthread_cond_wait(&c_cond_, &mutex_); } t out = q_.front(); q_.pop(); //这里是加了水位线的版本,在低于水位线的时候要唤醒生产者 // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_); pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_);//解锁 return out; } void push(const t &in) { pthread_mutex_lock(&mutex_);//加锁 //同pop函数 while (q_.size() == maxcap_) { pthread_cond_wait(&p_cond_, &mutex_); } q_.push(in); //这里是加了水位线的版本,在高于水位线的时候要唤醒消费者 // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_); pthread_cond_signal(&c_cond_); pthread_mutex_unlock(&mutex_);//解锁 } //析构函数 ~blockqueue() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&c_cond_); pthread_cond_destroy(&p_cond_); } private: std::queue<t> q_; int maxcap_; // 极大值 pthread_mutex_t mutex_; pthread_cond_t c_cond_; pthread_cond_t p_cond_; //最低最高水位线 // int low_water_; // int high_water_; };
(二)task.hpp
#pragma once #include <iostream> #include <string> //定义运算方法 std::string opers = "+-*/%"; //枚举错误 enum { divzero = 1, modzero, unknown }; class task { public: task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0) {} void run() { switch (oper_) { case '+': result_ = data1_ + data2_; break; case '-': result_ = data1_ - data2_; break; case '*': result_ = data1_ * data2_; break; case '/': { if (data2_ == 0) exitcode_ = divzero; else result_ = data1_ / data2_; } break; case '%': { if (data2_ == 0) exitcode_ = modzero; else result_ = data1_ % data2_; } break; default: exitcode_ = unknown; break; } } //伪函数,通过重载()使run可以像函数一样调用 void operator()() { run(); } //返回的运算结果以及错误代码 std::string getresult() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "="; r += std::to_string(result_); r += "[code: "; r += std::to_string(exitcode_); r += "]"; return r; } //返回运算表达式 std::string gettask() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "=?"; return r; } ~task() {} private: int data1_; int data2_; char oper_; int result_; int exitcode_; };
(三)main.cpp
#include "blockqueue.hpp" #include "task.hpp" #include <unistd.h> #include <ctime> void *consumer(void *args) { blockqueue<task> *bq = static_cast<blockqueue<task> *>(args); while (true) { // 消费 task t = bq->pop(); // 计算 t(); //模拟消费者处理任务 std::cout << "处理任务: " << t.gettask() << " 运算结果是: " << t.getresult() << " thread id: " << pthread_self() << std::endl; } } void *productor(void *args) { int len = opers.size(); blockqueue<task> *bq = static_cast<blockqueue<task> *>(args); int x = 10; int y = 20; while (true) { // 用随机数运算模拟生产者生产数据 int data1 = rand() % 10 + 1; // [1,10] usleep(10); int data2 = rand() % 10; char op = opers[rand() % len]; task t(data1, data2, op); // 生产 bq->push(t); std::cout << "生产了一个任务: " << t.gettask() << " thread id: " << pthread_self() << std::endl; sleep(1); } } int main() { //随机数种子 srand(time(nullptr)); //给阻塞队列传一个任务 blockqueue<task> *bq = new blockqueue<task>(); //多生产者多消费者 pthread_t c[3], p[5]; for (int i = 0; i < 3; i++) { pthread_create(c + i, nullptr, consumer, bq); } for (int i = 0; i < 5; i++) { pthread_create(p + i, nullptr, productor, bq); } for (int i = 0; i < 3; i++) { pthread_join(c[i], nullptr); } for (int i = 0; i < 5; i++) { pthread_join(p[i], nullptr); } delete bq; return 0; }
3、误唤醒问题
误唤醒问题就是在调用pop
函数或者push
函数的时候可能会引起的,下面我们再把代码贴出来,然后把上面有过的注释去掉
//... t pop() { pthread_mutex_lock(&mutex_); while (q_.size() == 0) //不能调用if而要用while { pthread_cond_wait(&c_cond_, &mutex_); } t out = q_.front(); q_.pop(); pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_); return out; } void push(const t &in) { pthread_mutex_lock(&mutex_); while (q_.size() == maxcap_) { pthread_cond_wait(&p_cond_, &mutex_); } q_.push(in); pthread_cond_signal(&c_cond_); pthread_mutex_unlock(&mutex_); //...
在多生产者 - 多消费者并发编程场景中,误唤醒现象较为常见,假定队列当前处于满状态,当一个消费者线程成功消费一个数据后,队列中会空出一个位置,随后,线程可能多次调用pthread_cond_signal(&p_cond_)
函数,唤醒了一批正在 p_cond_
条件变量下等待的生产者线程,由于被唤醒的生产者线程需要重新竞争互斥锁,这些线程之间呈现出互斥关系,在先前执行消费操作的线程释放锁之后,仅有一个生产者线程能够成功获取锁,其余虽被唤醒但未能抢到锁的生产者线程只能在锁处等待
当成功获取锁的生产者线程完成数据生产操作后,队列可能再次达到满状态,此时,该线程会调用 pthread_cond_signal(&c_cond_)
函数唤醒一个消费者线程,随后释放锁,在此情形下,被唤醒的线程不仅包括刚刚被唤醒的消费者线程,还涵盖之前被唤醒却未抢到锁的生产者线程,它们会同时参与锁的竞争,若使用 if
语句来判断队列是否已满,当某个生产者线程抢到锁后,可能不会再次对队列状态进行检查,直接尝试向已满的队列中添加数据,从而引发错误
因此,为确保线程安全,应使用 while
循环来包裹 pthread_cond_wait
函数,当一个线程被唤醒并成功获取锁后,不应直接执行队列操作(无论是生产数据还是消费数据),而应再次检查资源是否满足操作条件,若资源就绪,则可继续执行队列操作;若资源未就绪,则应再次调用 pthread_cond_wait
函数,使线程进入休眠状态,等待后续唤醒
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论