一、背景
在实际工程中,难免会遇到不通系统之间通信,如何进行系统之间通信呢?(作为一个“全栈工程师”,必须要解决它!)。
系统之间通信方式很多如:系统之间调用(http/rpc等),异步间接调用如发送消息、公共存储等。目前,本人从事的项目中遇到web业务工程(java)依赖与算法工程(c++) 处理的视频/图片分类与标记结果。两个系统之前数据通信采用了kafka消息方式。
算法工程为c/c++工程,本文将介绍如何在c/c++中如何发送与接收kakfa消息(包含:kafka的sasl认证方式),并提供了详细的源码和讲解。(至于java中如何发送与接收kakfa消息如有需要,可留言或私聊!)
二、环境依赖安装
# 下载librdkafka
git clone https://github.com/edenhill/librdkafka.git
# 编译
cd librdkafka
./configure --prefix=/usr/local
# 安装
sudo make install
# 验证:查看/usr/local/lib目录下是否有librdkafka文件
ls /usr/local/lib | grep kafka
三、编写kakfa生产者消费者
3.1 生产者
#include <rdkafka.h> // 包含c api头文件
#include <iostream>
#include <cstring>
#include <cerrno>
int main() {
const char *brokers = "xx.xx.xx.xx:7091"; // kafka broker地址
const char *topic_name = "kafka_msg_topic_test";
const char *payload = "hello, kafka from librdkafka!";
size_t len = strlen(payload);
// 创建配置对象
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!conf) {
std::cerr << "failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 设置broker地址
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, null, 0) != rd_kafka_conf_ok) {
std::cerr << "failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 创建生产者实例
rd_kafka_t *rk = rd_kafka_new(rd_kafka_producer, conf, null, 0);
if (!rk) {
std::cerr << "failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 创建topic句柄(可选,但推荐)
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, null);
if (!rkt) {
std::cerr << "failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_destroy(rk);
// rd_kafka_conf_destroy(conf);
return 1;
}
// 发送消息
int32_t partition = rd_kafka_partition_ua; // 自动选择分区
int err = rd_kafka_produce(rkt, partition, rd_kafka_msg_f_copy, const_cast<char *>(payload), len, null, 0, null);
if (err != rd_kafka_resp_err_no_error) {
std::cerr << "failed to produce to topic " << topic_name << ": " << err << std::endl;
} else {
std::cout << "produced " << len << " bytes to topic " << topic_name << std::endl;
}
// 等待所有消息发送完成(可选,但推荐)
// 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认
int msgs_sent = 0;
while (rd_kafka_outq_len(rk) > 0) {
rd_kafka_poll(rk, 100); // 轮询kafka队列,直到所有消息都发送出去
msgs_sent += rd_kafka_outq_len(rk);
}
// 销毁topic句柄
rd_kafka_topic_destroy(rkt);
// 销毁生产者实例
rd_kafka_destroy(rk);
// 销毁配置对象
// rd_kafka_conf_destroy(conf);
return 0;
}
3.2 消费者
#include <rdkafka.h>
#include <iostream>
#include <cerrno>
#include <cstring>
#include <cstdlib>
void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
// 错误处理回调
std::cerr << "kafka error: " << err << ": " << reason << std::endl;
}
int main() {
std::cerr << "start " << std::endl;
const char *brokers = "xx.xx.xx.xx:7091"; // kafka broker地址
const char *group_id = "kafka_msg_topic_test"; // 消费者组id
const char *topic_name = "kafka_msg_topic_test"; // kafka topic名称
// 创建配置对象
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!conf) {
std::cerr << "failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 设置broker地址
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, null, 0) != rd_kafka_conf_ok) {
std::cerr << "failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 设置消费者组id
if (rd_kafka_conf_set(conf, "group.id", group_id, null, 0) != rd_kafka_conf_ok) {
std::cerr << "failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 设置错误处理回调(可选)
rd_kafka_conf_set_error_cb(conf, error_cb);
// 创建消费者实例
rd_kafka_t *rk = rd_kafka_new(rd_kafka_consumer, conf, null, 0);
if (!rk) {
std::cerr << "failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 创建一个topic分区列表
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
if (!topics) {
std::cerr << "failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_destroy(rk);
return 1;
}
// 添加topic到分区列表
if (!rd_kafka_topic_partition_list_add(topics, topic_name, rd_kafka_partition_ua)) {
std::cerr << "failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(rk);
return 1;
}
// 订阅topic
rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics);
if (err != rd_kafka_resp_err_no_error) {
std::cerr << "failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl;
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(rk);
return 1;
}
// 销毁分区列表(订阅后不再需要)
rd_kafka_topic_partition_list_destroy(topics);
// 轮询消息
while (true) {
rd_kafka_message_t *rkmessage;
rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以获取消息
if (rkmessage == null) {
// 没有消息或者超时
continue;
}
if (rkmessage->err) {
// 处理错误
if (rkmessage->err == rd_kafka_resp_err__partition_eof) {
// 消息流的末尾
std::cout << "end of partition event" << std::endl;
} else {
// 打印错误并退出
std::cerr << "kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl;
break;
}
} else {
// 处理消息
std::cout << "received message at offset " << rkmessage->offset
<< " from partition " << rkmessage->partition
<< " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len
<< " value :" <<(char *)rkmessage->payload
<< std::endl;
// 如果需要,可以在这里处理消息内容
// 例如,使用rkmessage->payload()获取消息内容
// 释放消息
rd_kafka_message_destroy(rkmessage);
}
}
// 清理
rd_kafka_destroy(rk);
return 0;
}
3.3 编译运行
3.3.1 编译生产者消费者
g++ -o send_kafka sendkakfamessage.cpp -i/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka receivekafkamessage.cpp -i/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
3.3.2 运行验证
生产者:发送消息
消费者:接收消息
3.4 sasl认证kakfa
下面是,支持sasl认证的kakka生产者完整代码
#include <rdkafka.h>
#include <iostream>
#include <cstring>
#include <cerrno>
int main(int argc, char *argv[]) {
const char *brokers = "xx.xx.xx.xx:8092"; // kafka broker地址
const char *username = "xxx";
const char *password = "xxx";
const char *topic_name = "kafka_msg_test_sasl";
const char *payload = "hello, kafka from librdkafka! sasl";
size_t len = strlen(payload);
// 初始化配置
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!conf)
{
std::cerr << "failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
char errstr[512]; // 声明一个足够大的字符数组来存储错误信息
// 设置sasl相关的配置
if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != rd_kafka_conf_ok)
{
std::cerr << "failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "sasl.mechanisms", "plain", errstr, sizeof(errstr)) != rd_kafka_conf_ok)
{
std::cerr << "failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != rd_kafka_conf_ok)
{
std::cerr << "failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != rd_kafka_conf_ok)
{
std::cerr << "failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != rd_kafka_conf_ok)
{
std::cerr << "failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 检查配置是否设置成功
if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != rd_kafka_conf_ok) {
std::cerr << "failed to set configuration: " << errstr << std::endl;
return 1;
}
// 创建producer实例
rd_kafka_t *rk = rd_kafka_new(rd_kafka_producer, conf, errstr, sizeof(errstr));
if (!rk) {
std::cerr << "failed to create new producer: " << errstr << std::endl;
return 1;
}
// 创建topic句柄(可选,但推荐)
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, null);
if (!rkt)
{
std::cerr << "failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_destroy(rk);
// rd_kafka_conf_destroy(conf);
return 1;
}
// 发送消息
int32_t partition = rd_kafka_partition_ua; // 自动选择分区
int err = rd_kafka_produce(rkt, partition, rd_kafka_msg_f_copy, const_cast<char *>(payload), len, null, 0, null);
if (err != rd_kafka_resp_err_no_error)
{
std::cerr << "failed to produce to topic " << topic_name << ": " << err << std::endl;
}
else
{
std::cout << "produced " << len << " bytes to topic " << topic_name << std::endl;
}
// 等待所有消息发送完成(可选,但推荐)
// 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认
int msgs_sent = 0;
while (rd_kafka_outq_len(rk) > 0)
{
rd_kafka_poll(rk, 100); // 轮询kafka队列,直到所有消息都发送出去
msgs_sent += rd_kafka_outq_len(rk);
}
// 销毁topic句柄
rd_kafka_topic_destroy(rkt);
// 清理资源
rd_kafka_destroy(rk);
return 0;
}
在kafka map 管理界面中查看发送效果如下:
3.5 结束语
本文详细描述了kakfa依赖安装、消息生产者、消费者、kafka sasl认证等相关完整代码,完整工程见github地址。如果有疑问,可留言/私聊!
发表评论