简介
消息队列是一种应用间的通讯方式,消息发送后可以立即放回,由消息系统来确保消息的可靠传递。消息发布者只需要将消息发布到消息队列中,而不需要管谁来取。消息使用者只管从消息队列中取消息而不管谁发布的。这样发布者和使用者都不同知道对方的存在。
消息队列普遍使用在生产者和消费者模型中。
- 优点
- 应用解耦: 应用之间不用那么多的同步调用,发消息到消息队列就行,消费者可以自己消费,消费生产者不用管了,降低应用之间的耦合。
- 降低延时: 应用之间用同步调用,需要等待对方响应,等待时间比较长,用消息之后,发送消息到消息队列就行,应用就可以返回了,对客户来讲降低了应用延时。
- 削峰填谷:请求比较多的时候,应用处理不过来,会丢弃请求;请求比较少时,应用不饱和。
请求比较多时,把请求放到消息队列,消费者按特定处理速度来处理,请求少时,也让应用有事情可以做;能做到忙时不丢请求,闲时不闲置应用资源。
主流的消息队列:kafka、activemq、rabbitmq、rocketmq
下面使用c++实现一个简单的消息队列。
具体实现
- 消息队列类
里面主要包含一个数组和一个队列,都保存消息。
应用从数组中拿消息处理,当数组满的时候,消息保存到队列中,当数组中消息处理完,从队列中取消息,再处理。
数组实现的是一个环形数组,记录下消息写游标和读游标,超过数组大小,对数组大小取余。
//消息长度和消息
typedef std::pair<size_t, char*> msgpair;
#define msgqueuesize 102400
class zmsgqueue
{
//消息对,first表示是否存放消息
typedef std::pair<bool, msgpair> msgqueue;
public:
zmsgqueue();
~zmsgqueue();
void* msgmalloc(const size_t len);
void msgfree(void* p);
//获得一个消息
msgpair* get();
//放入一个消息
bool put(const void* msg, size_t msglen);
//将队列中的消息放到消息数组中
bool putmsgqueue2arr();
//删除一个消息
void erase();
bool empty();
bool msgarrempty();
private:
void clear();
// 保存正在处理的消息
msgqueue msgarr_[msgqueuesize];
// 保存等待处理的消息
std::queue<msgpair> msgqueue_;
//消息写游标
size_t queuewrite_;
//消息读游标
size_t queueread_;
};
实现:
zmsgqueue::zmsgqueue()
{
bzero(msgarr_, sizeof(msgarr_));
queuewrite_ = 0;
queueread_ = 0;
}
zmsgqueue::~zmsgqueue()
{
clear();
}
void* zmsgqueue::msgmalloc(const size_t len)
{
char* p = (char*)malloc(len + 1);
return (void*)(p + 1);
}
void zmsgqueue::msgfree(void* p)
{
free((char*)p - 1);
}
//获得一个消息
msgpair* zmsgqueue::get()
{
if(queueread_ >= msgqueuesize)
return null;
if(msgarrempty())
putmsgqueue2arr();
msgpair* ret = null;
if(msgarr_[queueread_].first)
ret = &msgarr_[queueread_].second;
return ret;
}
//放入一个消息
bool zmsgqueue::put(const void* msg, size_t msglen)
{
char* buf = (char*)msgmalloc(msglen);
if(buf)
{
bcopy(msg, buf, msglen);
//先将队列中的消息放到数组中
//数组中还有位置直接放到数组中
//没有位置放到队列中
if(!putmsgqueue2arr() && !msgarr_[queuewrite_].first)
{
msgarr_[queuewrite_].first = true;
msgarr_[queuewrite_].second.first = msglen;
msgarr_[queuewrite_].second.second = buf;
queuewrite_++;
queuewrite_ %= msgqueuesize;
}
else
{
msgqueue_.push(std::make_pair(msglen, buf));
}
return true;
}
return false;
}
//将队列中的消息放到消息数组中
bool zmsgqueue::putmsgqueue2arr()
{
bool isleft = false;
while(!msgqueue_.empty())
{
if(!msgarr_[queuewrite_].first)
{
msgarr_[queuewrite_].first = true;
msgarr_[queuewrite_].second = msgqueue_.front();
queuewrite_++;
queuewrite_ %= msgqueuesize;
msgqueue_.pop();
}
else
{
isleft = true;
break;
}
}
return isleft;
}
//删除一个消息
void zmsgqueue::erase()
{
if(!msgarr_[queueread_].first)
return;
msgfree(msgarr_[queueread_].second.second);
msgarr_[queueread_].second.second = null;
msgarr_[queueread_].second.first = 0;
msgarr_[queueread_].first = false;
queueread_++;
queueread_ %= msgqueuesize;
}
void zmsgqueue::clear()
{
//队列中还有消息
while(putmsgqueue2arr())
{
//数组中还有消息
while(get())
{
erase();
}
}
//数组中还有消息
while(get())
{
erase();
}
}
bool zmsgqueue::empty()
{
if(putmsgqueue2arr()) return false;
return msgarrempty();
}
bool zmsgqueue::msgarrempty()
{
if(queueread_ == queuewrite_ && !msgarr_[queueread_].first)
{
return true;
}
return false;
}
- 消息队列的封装
对消息队列的封装主要是为了对消息进行解析和处理。
消息解析和处理函数定义成了虚函数,当需要使用消息队列并处理消息时,只需要继承消息队列,然后重写虚函数,进行对应处理即可。
类中还使用到了读写锁,当多线程的情况下,消息队列是一个临界资源,线程共享,需要进行上锁。单线程的情况下不需要加锁。
//t表示使用的消息队列
//msgt表示消息的类型,有的需要消息头,消息正文等,需要解析,这里是直接使用
template<class t=zmsgqueue, class msgt=char>
class messagequeue : public rwlocker
{
public:
messagequeue()
{}
~messagequeue()
{}
bool putmsg(const msgt* msg, const size_t msglen)
{
rwlocker::wlock();
msgqueue_.put(msg, msglen);
rwlocker::unlock();
return true;
}
//解析消息,处理消息
virtual bool msgparse(const msgt* msg, const size_t msglen) = 0;
//获取消息,解析消息,处理消息
bool docmd()
{
rwlocker::wlock();
msgpair* msg = msgqueue_.get();
while(msg)
{
msgparse(msg->second, msg->first);
msgqueue_.erase();
msg = msgqueue_.get();
}
rwlocker::unlock();
return true;
}
bool empty()
{
return msgqueue_.empty();
}
private:
t msgqueue_;
};
- 读写锁的封装
读写锁:可以多个线程进行读,只能一个线程进行写。写时独享资源,读时共享资源。写锁的优先级高。 - 为什么读写锁需要读锁?
为了防止其他线程请求写锁。一个线程请求了读锁,其他线程在请求写锁会阻塞,但是请求读锁不会阻塞。一个线程请求了写锁,其他线程请求读锁和写锁都会阻塞。
#include <pthread.h>
class rwlock
{
public:
rwlock()
{
pthread_rwlock_init(&rwlc_, null);
}
~rwlock()
{
pthread_rwlock_destroy(&rwlc_);
}
void rlock()
{
pthread_rwlock_rdlock(&rwlc_);
}
void wlock()
{
pthread_rwlock_wrlock(&rwlc_);
}
void unlock()
{
pthread_rwlock_unlock(&rwlc_);
}
private:
pthread_rwlock_t rwlc_;
};
class rwlocker
{
public:
void rlock()
{
rwlc_.rlock();
}
void wlock()
{
rwlc_.wlock();
}
void unlock()
{
rwlc_.unlock();
}
private:
rwlock rwlc_;
};
- makefile:
# ini1=main.cpp # in2=messagequeue.cpp out=main cc=g++ std=-std=c++11 -lpthread #$(out):$(in1) $(in2) $(out): main.cpp messagequeue.cpp rwlock.h $(cc) $^ -o $@ $(std) .phony:clean clean: rm -rf $(out)
- 代码测试
实现一个类继承消息队列,重写消息处理函数。
定义对象,调用docmd函数即可。
#include "messagequeue.h"
class test : public messagequeue<>
{
bool msgparse(const char* msg, const size_t msglen)
{
std::cout << msglen << ":" << msg << std::endl;
return true;
}
};
int main()
{
//模拟客户端发送消息
char buf[256] = "hello world!";
test t;
//消息队列放消息
t.putmsg(buf, strlen(buf));
//处理消息
t.docmd();
return 0;
}

到此这篇关于c++简单实现消息队列的示例代码的文章就介绍到这了,更多相关c++ 消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论