参考文章
【docker安装部署kafka+zookeeper详细教程】_linux arm docker安装kafka-csdn博客
docker搭建kafka+zookeeper
打开我们的docker的镜像源配置
配置
下面的那个insecure是我自己虚拟机的,不用理会
拉取镜像
然后开始拉取我们的zookeeper镜像和我们的kafka镜像
这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本
docker pull zookeeper
kafka镜像
docker pull wurstmeister/kafka
因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络
让不同容器都加入这个网络
docker network create创建我们的网络
然后那个zookeeper_network是我们自定义的网络名称
docker network create --driver bridge zookeeper_network
kafka是依赖于zookeeper的所以我们要先安装zookeeper
我们先用run来创建一个zookeeper容器
docker run -d --name zookeeper1 --network zookeeper_network -p 2181:2181 zookeeper
查看我们创建的网络环境的地址
docker inspect zookeeper_network
那个ipv4就是我们的网络环境的地址,这是我的网络环境的地址
我的是12.21.0.2,这个ip地址是要记住方便后面使用的
创建一个kafka容器
这段代码有点长,根子自己改吧
# 启动kafka
docker run -d --name kafka1 --network zookeeper_network -p 9092:9092 -e kafka_broker_id=0 -e kafka_zookeeper_connect=<zookeeperip地址>:2181 -e kafka_advertised_listeners=plaintext://<宿主机ip地址>:9092 -e kafka_listeners=plaintext://0.0.0.0:9092 wurstmeister/kafka
解释
不知道本机地址可以输入 ip addr来查看本机地址
这样子就搭建完成了
springboot集成kafka
首先就是springboot和kafka的版本兼容了
然后我们引入两个kafka的依赖
自己对着自己的版本来
自己看b站视频,9分钟就搞定了
63-kafka-集成-java场景-springboot_哔哩哔哩_bilibili
然后开始写我们的application.yml配置文件
下面是配置文件的全部+解析
其实和普通mq差不多
也就是配置生产者和消费者和一些过期时间超时时间
重点在于那个missing-topics-fatal
主题不存在的话,我们是否还要成功启动
我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题
所以我启动的时候,报错然后失败了
基本案例
这是常量类
指定了一个topic和group
主题和分组id
groupid是消费者组的唯一标识
这个视频9分钟看懂kafka
小朋友也可以懂的kafka入门教程,还不快来学_哔哩哔哩_bilibili
生产者
我们这个autowired自动注入,会根据我们的配置文件的配置来自动注入
@autowired
private kafkatemplate<string, string> kafkatemplate;
produces里面指定我们前端传的是json格式
我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"
消费者
@kafkalistener(topics = springbootkafkaconfig.topic_test, groupid = springbootkafkaconfig.group_id)
public void topic_test(list<string> messages, @header(kafkaheaders.received_topic) string topic) {
for (string message : messages) {
//因为这个string是json,所以我们可以转回object对象,其实是转成jsonobject对象
final jsonobject entries = jsonutil.parseobj(message);
system.out.println(springbootkafkaconfig.group_id + " 消费了: topic:" + topic + ",message:" + entries.getstr("name"));
//ack.acknowledge();
}
}
我们用list<string>来接收,因为可能一个消费者接收多条消息
指定消费者监听的主题topic
以及指定消费者的唯一标识group_id
这些其实都是自己在常量类里面自己写好的
@header(kafkaheaders.received_topic) string topic
这个是得到我们的主题topic的名字
我用apifox调试之后,成功执行了
kafka的图形化工具
这里介绍一个免费的开源项目kafkaking
releases · bronya0/kafka-king (github.com)
里面还能指定中文
发表评论