1.介绍
rabbitmq:消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)
- 核心概念:交换机、队列、绑定、消息
- 交换机类型:
- 广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中
- 直接交换:根据消息中的
bkey与绑定的rkey对比,一致则放入队列 - 主题交换:使用
bkey与绑定的rkey进行规则匹配,成功则放入队列
2.安装
1.rabbitmq
- 安装:
sudo apt install rabbitmq-server - 简单使用:
# 安装完成的时候默认有个用户guest,但是权限不够,要创建一个administrator用户,才可以做为远程登录和发表订阅消息 #添加用户 sudo rabbitmqctl add_user root <password> #设置用户tag sudo rabbitmqctl set_user_tags root administrator #设置用户权限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # rabbitmq自带了web管理界面, 执行下面命令开启, 默认端口15672 sudo rabbitmq-plugins enable rabbitmq_management
2.客户端库
sudo apt install libev-dev #libev 网络库组件 git clone https://github.com/copernicamarketingsoftware/amqp-cpp.git cd amqp-cpp/ make make install
如果安装时出现以下报错,则表示ssl版本出现问题
/usr/include/openssl/macros.h:147:4: error: #error
"openssl_api_compat expresses an impossible api compatibility
level"
147 | # error "openssl_api_compat expresses an impossible api
compatibility level"
| ^~~~~
in file included from /usr/include/openssl/ssl.h:18,
from linux_tcp/openssl.h:20,
from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor,
destructor, or type conversion before ‘deprecatedin_1_1_0'
687 | deprecatedin_1_1_0(int bio_get_port(const char *str,
unsigned short *port_ptr))解决方案:卸载当前的ssl库,重新进行修复安装
dpkg -l | grep ssl sudo dpkg -p --force-all libevent-openssl-2.1-7 sudo dpkg -p --force-all openssl sudo dpkg -p --force-all libssl-dev sudo apt --fix-broken install
3.amqp-cpp
简单使用
1.介绍
amqp-cpp是用于与rabbitmq消息中间件通信的c++库- 它能解析从
rabbitmq服务发送来的数据,也可以生成发向rabbitmq的数据包 amqp-cpp库不会向rabbitmq建立网络连接,所有的网络io由用户完成
- 它能解析从
amqp-cpp提供了可选的网络层接口,它预定义了tcp模块,用户就不用自己实现网络io,- 也可以选择
libevent、libev、libuv、asio等异步通信组件, 需要手动安装对应的组件
- 也可以选择
amqp-cpp完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中- 注意:它需要c++17的支持
2.使用
amqp-cpp的使用有两种模式:- 使用默认的
tcp模块进行网络通信 - 使用扩展的
libevent、libev、libuv、asio异步通信组件进行通信
- 使用默认的
- 此处以
libev为例,不需要自己实现monitor函数,可以直接使用amqp::libevhandler
4.类与接口
1.channel
channel是一个虚拟连接,一个连接上可以建立多个通道- 并且所有的
rabbitmq指令都是通过channel传输
- 并且所有的
- 所以连接建立后的第一步,就是建立
channel- 因为所有操作是异步的,所以在
channel上执行指令的返回值并不能作为操作执行结果
- 因为所有操作是异步的,所以在
- 实际上它返回的是
deferred类,可以使用它安装处理函数
namespace amqp
{
/**
* generic callbacks that are used by many deferred objects
*/
using successcallback = std::function<void()>;
using errorcallback = std::function<void(const char *message)>;
using finalizecallback = std::function<void()>;
/**
* declaring and deleting a queue
*/
using queuecallback = std::function<void(const std::string &name,
uint32_t messagecount,
uint32_t consumercount)>;
using deletecallback = std::function<void(uint32_t deletedmessages)>;
using messagecallback = std::function<void(const message &message,
uint64_t deliverytag,
bool redelivered)>;
// 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用ackcallback
using ackcallback = std::function<void(uint64_t deliverytag, bool multiple)>;
// 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调
using publishackcallback = std::function<void()>;
using publishnackcallback = std::function<void()>;
using publishlostcallback = std::function<void()>;
// 信道类
class channel
{
channel(connection *connection);
bool connected();
/**
*声明交换机
*如果提供了一个空名称,则服务器将分配一个名称。
*以下flags可用于交换机:
*
*-durable 持久化,重启后交换机依然有效
*-autodelete 删除所有连接的队列后,自动删除交换
*-passive 仅被动检查交换机是否存在
*-internal 创建内部交换
*
*@param name 交换机的名称
*@param-type 交换类型
enum exchangetype
{
fanout, 广播交换,绑定的队列都能拿到消息
direct, 直接交换,只将消息交给routingkey一致的队列
topic, 主题交换,将消息交给符合bindingkey规则的队列
headers,
consistent_hash,
message_deduplication
};
*@param flags 交换机标志
*@param arguments其他参数
*
*此函数返回一个延迟处理程序。可以安装回调
using onsuccess(), onerror() and onfinalize() methods.
*/
deferred &declareexchange(const std::string_view &name,
exchangetype type,
int flags,
const table &arguments);
/**
*声明队列
*如果不提供名称,服务器将分配一个名称。
*flags可以是以下值的组合:
*
*-durable 持久队列在代理重新启动后仍然有效
*-autodelete 当所有连接的使用者都离开时,自动删除队列
*-passive 仅被动检查队列是否存在
*-exclusive 队列仅存在于此连接,并且在连接断开时自动删除
*
*@param name 队列的名称
*@param flags 标志组合
*@param arguments 可选参数
*
*此函数返回一个延迟处理程序。可以安装回调
*使用onsuccess()、onerror()和onfinalize()方法。
*
deferred &onerror(const char *message)
*
*可以安装的onsuccess()回调应该具有以下签名:
void mycallback(const std::string &name,
uint32_t messagecount,
uint32_t consumercount);
例如:
channel.declarequeue("myqueue").onsuccess(
[](const std::string &name,
uint32_t messagecount,
uint32_t consumercount) {
std::cout << "queue '" << name << "' ";
std::cout << "has been declared with ";
std::cout << messagecount;
std::cout << " messages and ";
std::cout << consumercount;
std::cout << " consumers" << std::endl;
* });
*/
deferredqueue &declarequeue(const std::string_view &name,
int flags,
const table &arguments);
/**
*将队列绑定到交换机
*
*@param exchange 源交换机
*@param queue 目标队列
*@param routingkey 路由密钥
*@param arguments 其他绑定参数
*
*此函数返回一个延迟处理程序。可以安装回调
*使用onsuccess()、onerror()和onfinalize()方法。
*/
deferred &bindqueue(const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &routingkey,
const table &arguments);
/**
*将消息发布到exchange
*您必须提供交换机的名称和路由密钥。
然后,rabbitmq将尝试将消息发送到一个或多个队列。
使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。
默认情况下,不可更改的消息将被静默地丢弃。
*
*如果设置了'mandatory'或'immediate'标志,
则无法处理的消息将返回到应用程序。
在开始发布之前,请确保您已经调用了recall()-方法,
并设置了所有适当的处理程序来处理这些返回的消息。
*
*可以提供以下flags:
*
*-mandatory 如果设置,服务器将返回未发送到队列的消息
*-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
*@param exchange要发布到的交易所
*@param routingkey路由密钥
*@param envelope要发送的完整信封
*@param message要发送的消息
*@param size消息的大小
*@param flags可选标志
*/
bool publish(const std::string_view &exchange,
const std::string_view &routingkey,
const std::string &message,
int flags = 0);
/**
*告诉rabbitmq服务器已准备好使用消息-也就是 订阅队列消息
*
*调用此方法后,rabbitmq开始向客户端应用程序传递消息。
consumer tag是一个字符串标识符,
如果您以后想通过channel::cancel()调用停止它,
可以使用它来标识使用者。
*如果您没有指定使用者tag,服务器将为您分配一个。
*
*支持以下flags:
*
*-nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
*-noack 如果设置了,则不必对已消费的消息进行确认
*-exclusive 请求独占访问,只有此使用者可以访问队列
*
*@param queue 您要使用的队列
*@param tag 将与此消费操作关联的消费者标记
*@param flags 其他标记
*@param arguments其他参数
*
*此函数返回一个延迟处理程序。
可以使用onsuccess()、onerror()和onfinalize()方法安装回调
可以安装的onsuccess()回调应该具有以下格式:
void mycallback(const std::string_view&tag);
样例:
channel.consume("myqueue").onsuccess(
[](const std::string_view& tag) {
std::cout << "started consuming under tag ";
std::cout << tag << std::endl;
});
*/
deferredconsumer &consume(const std::string_view &queue,
const std::string_view &tag,
int flags,
const table &arguments);
/**
*确认接收到的消息
*
*消费者客户端对收到的消息进行确认应答
*
*当在deferredconsumer::onreceived()方法中接收到消息时,
必须确认该消息,
以便rabbitmq将其从队列中删除(除非使用noack选项消费)
*
*支持以下标志:
*
*-多条确认多条消息:之前传递的所有未确认消息也会得到确认
*
*@param deliverytag 消息的唯一delivery标签
*@param flags 可选标志
*@return bool
*/
bool ack(uint64_t deliverytag, int flags=0);
};
class deferredconsumer
{
/*
注册一个回调函数,该函数在消费者启动时被调用
void onsuccess(const std::string &consumertag)
*/
deferredconsumer &onsuccess(const consumecallback& callback);
/*
注册回调函数,用于接收到一个完整消息的时候被调用
void messagecallback(const amqp::message &message,
uint64_t deliverytag, bool redelivered)
*/
deferredconsumer &onreceived(const messagecallback& callback);
/* alias for onreceived() */
deferredconsumer &onmessage(const messagecallback& callback);
/*
注册要在服务器取消消费者时调用的函数
void cancelcallback(const std::string &tag)
*/
deferredconsumer &oncancelled(const cancelcallback& callback);
};
class message : public envelope
{
const std::string &exchange();
const std::string &routingkey();
};
class envelope : public metadata
{
const char *body(); // 获取消息正文
uint64_t bodysize(); // 获取消息正文大小
};
}2.ev
typedef struct ev_async
{
ev_watcher (ev_async);
ev_atomic_t sent; /* private */
}ev_async;
//break type
enum
{
evbreak_cancel = 0, /* undo unloop */
evbreak_one = 1, /* unloop once */
evbreak_all = 2 /* unloop all loops */
};
// 实例化并获取io事件监控接口句柄
struct ev_loop *ev_default_loop (unsigned int flags ev_cpp (= 0));
# define ev_default ev_default_loop (0)
// 开始运行io事件监控, 这是一个阻塞接口
int ev_run (struct ev_loop *loop);
/* break out of the loop */
// 结束io监控
// 如果在主线程进行ev_run(), 则可以直接调用,
// 如果在其他线程中进行ev_run(), 需要通过异步通知进行
void ev_break (struct ev_loop *loop, int32_t break_type) ;
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
// 初始化异步事件结构, 并设置回调函数
void ev_async_init(ev_async *w, callback cb);
// 启动事件监控循环中的异步任务处理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 发送当前异步事件到异步线程中执行
void ev_async_send(struct ev_loop *loop, ev_async *w);5.使用
1.publish.cc
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
// 1.实例化底层网络通信框架的io事件监控句柄
auto *loop = ev_default;
// 2.实例化libevhandler句柄 -> 将amqp框架与事件监控关联起来
amqp::libevhandler handler(loop);
// 3.实例化连接对象
amqp::address address("amqp://root:snowk8989@127.0.0.1:5672/");
amqp::tcpconnection connection(&handler, address);
// 4.实例化信道对象
amqp::tcpchannel channel(&connection);
// 5.声明交换机
channel.declareexchange("test-exchange", amqp::exchangetype::direct)
.onerror([](const char *message)
{ std::cout << "声明交换机失败: " << message << std::endl; })
.onsuccess([]()
{ std::cout << "test-exchange 交换机创建成功" << std::endl; });
// 6.声明队列
channel.declarequeue("test-queue")
.onerror([](const char *message)
{ std::cout << "声明队列失败: " << message << std::endl; })
.onsuccess([]()
{ std::cout << "test-queue 队列创建成功" << std::endl; });
// 7.针对交换机和队列进行绑定
channel.bindqueue("test-exchange", "test-queue", "test-queue-key")
.onerror([](const char *message)
{ std::cout << "test-exchange - test-queue 绑定失败: " \
<< message << std::endl; })
.onsuccess([]()
{ std::cout << "test-exchange - test-queue 绑定成功"
<< std::endl; });
// 8.向交换机发布消息
for (int i = 0; i < 5; ++i)
{
std::string msg = "hello snowk-" + std::to_string(i);
if(channel.publish("test-exchange", "test-queue-key", msg) == false)
{
std::cout << "publish 失败" << std::endl;
}
}
// 9.启动底层网络通信框架 -> 开启io
ev_run(loop, 0);
return 0;
}2.consume.cc
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
void messagecb(amqp::tcpchannel* channel, const amqp::message& message,
uint64_t deliverytag, bool redelivered)
{
std::string msg;
msg.assign(message.body(), message.bodysize());
// 不能这样使用, amqp::message后面没有存'\0'
// std::cout << message << std::endl
std::cout << msg << std::endl;
channel->ack(deliverytag);
}
int main()
{
// 1.实例化底层网络通信框架的io事件监控句柄
auto *loop = ev_default;
// 2.实例化libevhandler句柄 -> 将amqp框架与事件监控关联起来
amqp::libevhandler handler(loop);
// 3.实例化连接对象
amqp::address address("amqp://root:snowk8989@127.0.0.1:5672/");
amqp::tcpconnection connection(&handler, address);
// 4.实例化信道对象
amqp::tcpchannel channel(&connection);
// 5.声明交换机
channel.declareexchange("test-exchange", amqp::exchangetype::direct)
.onerror([](const char *message)
{ std::cout << "声明交换机失败: " << message << std::endl; })
.onsuccess([]()
{ std::cout << "test-exchange 交换机创建成功" << std::endl; });
// 6.声明队列
channel.declarequeue("test-queue")
.onerror([](const char *message)
{ std::cout << "声明队列失败: " << message << std::endl; })
.onsuccess([]()
{ std::cout << "test-queue 队列创建成功" << std::endl; });
// 7.针对交换机和队列进行绑定
channel.bindqueue("test-exchange", "test-queue", "test-queue-key")
.onerror([](const char *message)
{ std::cout << "test-exchange - test-queue 绑定失败: " \
<< message << std::endl; })
.onsuccess([]()
{ std::cout << "test-exchange - test-queue 绑定成功"; });
// 8.订阅消息对垒 -> 设置消息处理回调函数
auto callback = std::bind(messagecb, &channel, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
channel.consume("test-queue", "consume-tag")
.onreceived(callback)
.onerror([](const char *message)
{
std::cout << "订阅 test-queue 队列消息失败: " << message << std::endl;
exit(0);
});
// 9.启动底层网络通信框架 -> 开启io
ev_run(loop, 0);
return 0;
}3.makefile
all: publish consume publish: publish.cc g++ -o $@ $^ -lamqpcpp -lev -std=c++17 consume: consume.cc g++ -o $@ $^ -lamqpcpp -lev -std=c++17 .phony:clean clean: rm publish consume
到此这篇关于c++ 第三方库 rabbitmq详细讲解的文章就介绍到这了,更多相关c++ 第三方库 rabbitmq内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论