一、posix信号量
1、概述
在我们进行环形队列的生产消费者模型的学习之前,我们要对前置条件posix信号量进行学习,这里的posix的信号量与systemv的信号量是几乎一致的,都是用于同步操作,达到无冲突的访问共享资源的目的,只是posix信号量的使用要更简单一些,可以用于线程间同步
信号量的本质就是一个计数器,它的本质就是用来描述资源数目的,把资源是否就绪放到了临界区之外,在申请信号量的时候其实已经就是间接在做判断了
2、调用接口
(一)初始化信号量
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
- 返回值:成功返回0,失败返回-1
sem:指向要初始化的信号量对象的指针pshared:指定信号量的共享属性,如果pshared为 0,表示信号量是进程内共享的,只能在创建它的进程内的多个线程之间使用,如果pshared非 0,表示信号量可以在多个进程之间共享value:指定信号量的初始值,表示可以同时访问共享资源的线程或进程的数量
(二)销毁信号量
#include <semaphore.h> int sem_destroy(sem_t *sem);
- 返回值:成功返回0,失败返回-1
sem:指向要销毁的信号量对象的指针
(三)等待信号量
#include <semaphore.h> int sem_wait(sem_t *sem);
- 返回值:成功返回0,失败返回-1
sem:指向要操作的信号量对象的指针,这个指针一定要是被初始化过的
sem_wait 函数执行的是信号量的 p 操作
- 如果信号量
sem的值大于 0,sem_wait会将信号量的值减 1,然后立即返回,调用线程或进程可以继续执行后续代码,意味着该线程或进程成功获取了对共享资源的访问权 - 如果信号量
sem的值等于 0,sem_wait会使调用线程或进程进入阻塞状态,直到信号量的值大于 0 为止。一旦信号量的值变为大于 0,sem_wait会将信号量的值减 1 并返回,线程或进程继续执行
(四)发布信号量
#include <semaphore.h> int sem_post(sem_t *sem);
- 返回值:成功返回0,失败返回-1
sem:指向要操作的信号量对象的指针,这个指针一定要是被初始化过的
sem_post 函数执行的是信号量的 v 操作,会将信号量 sem 的值加 1
- 如果在调用
sem_post之前,有其他线程或进程因为调用sem_wait而阻塞在该信号量上(即信号量的值为 0),那么在信号量的值加 1 之后,系统会唤醒其中一个阻塞的线程或进程,被唤醒的线程或进程会将信号量的值再减 1 并继续执行后续代码
3、在环形队列中的作用
我们在之前应该都接触过环形队列,在环形队列中,一般我们是需要一个计数器的,或者在环形队列中留出最后一个位置,因为如果没有这些措施,我们就不知道双指针谁在前谁在后了,我们这里使用信号量替代了这个计数器
二、基于环形队列的生产消费者模型
1、理论探究

我们通过数组以及模运算的方式来模拟环状模型,前面的基于阻塞队列的生产消费者模型底层来说是基于容器queue的,其空间可以动态分配,现在是基于固定大小的,基于容器vector
其中生产者关注的是环形队列的空间资源,消费者关心的是环形队列的数据资源,而环形队列中的空间资源+数据资源=全部资源,只要有空间生产者就可以生产数据然后放入,只要有数据消费者就可以取出数据然后加工
2、代码实现
(一)ringqueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
//环形队列默认容量
const static int defaultcap = 8;
//环形队列核心接口:pv操作以及加锁解锁
template<class t>
class ringqueue{
private:
void p(sem_t &sem)
{
sem_wait(&sem);
}
void v(sem_t &sem)
{
sem_post(&sem);
}
void lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
//初始化
ringqueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
//生产者消费者的锁
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void push(const t &in) // 生产活动
{
//调用p函数检查队列中是否有可用空间,没有可用空间线程会阻塞
p(pspace_sem_);
//这里为什么要先p后加锁,下面详谈
lock(p_mutex_);
ringqueue_[p_step_] = in;
// 位置后移,维持环形特性
p_step_++;
p_step_ %= cap_;
unlock(p_mutex_);
v(cdata_sem_);
}
void pop(t *out) // 消费活动
{
p(cdata_sem_);
lock(c_mutex_);
*out = ringqueue_[c_step_];
// 位置后移,维持环形特性
c_step_++;
c_step_ %= cap_;
unlock(c_mutex_);
v(pspace_sem_);
}
//析构销毁
~ringqueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<t> ringqueue_;// 环形队列的底层实现
int cap_; // 队列容量
int c_step_; // 消费者下标
int p_step_; // 生产者下标
sem_t cdata_sem_; // 队中可用数据资源
sem_t pspace_sem_; // 队中可用空间资源
pthread_mutex_t c_mutex_; // 消费者锁
pthread_mutex_t p_mutex_; // 生产者锁
};(二)task.hpp
任务函数还是上一次的任务
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
divzero=1,
modzero,
unknown
};
class task
{
public:
task()
{}
task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = divzero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = modzero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = unknown;
break;
}
}
void operator ()()
{
run();
}
std::string getresult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string gettask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};(三)main.cpp
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "ringqueue.hpp"
#include "task.hpp"
using namespace std;
//这个结构体是方便我们打印的时候查看方便的
struct threaddata
{
ringqueue<task> *rq; //环形队列
std::string threadname;//线程名字
};
void *productor(void *args)
{
threaddata *td = static_cast<threaddata*>(args);
ringqueue<task> *rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while (true)
{
// 模拟获取数据
int data1 = rand() % 10 + 1;
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
task t(data1, data2, op);
// 生产数据
rq->push(t);
cout << "productor task done, task is : " << t.gettask() << " who: " << name << endl;
sleep(1);
}
return nullptr;
}
void *consumer(void *args)
{
threaddata *td = static_cast<threaddata*>(args);
ringqueue<task> *rq = td->rq;
std::string name = td->threadname;
while (true)
{
// 消费数据
task t;
rq->pop(&t);
// 处理数据
t();
cout << "consumer get task, task is : " << t.gettask() << " who: " << name << " result: " << t.getresult() << endl;
}
return nullptr;
}
int main()
{
srand(time(nullptr));
ringqueue<task> *rq = new ringqueue<task>(10);
pthread_t c[5], p[3];
//这里我们为了方便查看,统一用单生产单消费
for (int i = 0; i < 1; i++)
{
threaddata *td = new threaddata();
td->rq = rq;
td->threadname = "productor-" + std::to_string(i);
pthread_create(p + i, nullptr, productor, td);
}
for (int i = 0; i < 1; i++)
{
threaddata *td = new threaddata();
td->rq = rq;
td->threadname = "consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, consumer, td);
}
for (int i = 0; i < 1; i++)
{
pthread_join(p[i], nullptr);
}
for (int i = 0; i < 1; i++)
{
pthread_join(c[i], nullptr);
}
return 0;
}
3、pv操作包裹住加解锁操作的原因
在 pop和push 函数中,以push 函数为例,p(pspace_sem_) 和 v(cdata_sem_) 包裹着 lock(p_mutex_) 和 unlock(p_mutex_) 这种设计是为了实现更细粒度的同步控制,尽可能减少锁的竞争,以确保线程安全和高效性,下面详细解释其原因:
p(pspace_sem_) 在 lock(p_mutex_) 之前:
- 信号量的作用:
pspace_sem_信号量用于表示环形队列中可用的空间资源,p(pspace_sem_)操作会检查信号量的值,如果值大于 0,则将其减 1 并继续执行,如果值为 0,则线程会阻塞,直到有可用空间(即其他线程调用v(pspace_sem_)释放空间) - 避免不必要的加锁:在尝试获取互斥锁之前先检查信号量,可以避免在没有可用空间时加锁,因为如果没有可用空间,即使加了锁也无法进行生产操作,还会导致其他线程无法释放空间,造成资源浪费和性能下降,通过先检查信号量,只有在有可用空间时才去获取互斥锁,减少了锁的竞争,提高了程序的效率
v(cdata_sem_) 在 unlock(p_mutex_) 之后:
- 信号量的通知机制:
cdata_sem_信号量用于表示环形队列中可用的数据资源,v(cdata_sem_)操作会将信号量的值加 1,如果有消费者线程因为等待数据而阻塞,会唤醒其中一个线程 - 避免死锁和数据不一致:在释放互斥锁之后再增加
cdata_sem_信号量的值,可以确保在通知消费者有新数据可用之前,生产者已经完成了对共享资源的修改,并且释放了锁,如果在加锁状态下就增加信号量,可能会导致消费者线程被唤醒后尝试获取锁,但由于生产者还持有锁而无法进入临界区,从而造成死锁或数据不一致的问题
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论