apache kafka 是一个分布式流处理平台,广泛用于构建实时数据管道、日志聚合系统和事件溯源架构。spring boot 提供了对 kafka 的良好集成支持,使得开发者可以非常便捷地在项目中使用 kafka。
本文将手把手教你如何在 spring boot 项目中集成 kafka,包括生产者(producer)和消费者(consumer)的实现,并提供完整的代码示例。
开发环境准备
java 17+
maven 或 gradle
spring boot 3.x
apache kafka 3.0+(本地或远程)
ide(如 intellij idea、vs code)
创建 spring boot 项目
你可以通过 spring initializr 创建一个新的 spring boot 项目,选择以下依赖:
- spring web
- spring for apache kafka
或者手动添加 pom.xml 中的依赖:
<dependency> <groupid>org.springframework.kafka</groupid> <artifactid>spring-kafka</artifactid> </dependency>
spring boot 会自动管理版本兼容性,无需手动指定版本号。
配置 kafka 连接信息
在 application.yml 或 application.properties 文件中配置 kafka 相关参数:
application.yml 示例:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: key-serializer: org.apache.kafka.common.serialization.stringserializer value-serializer: org.apache.kafka.common.serialization.stringserializer
编写 kafka 生产者(producer)
创建一个服务类用于发送消息到 kafka 主题。
kafkaproducer.java
import org.springframework.beans.factory.annotation.autowired; import org.springframework.kafka.core.kafkatemplate; import org.springframework.stereotype.service; @service public class kafkaproducer { private final kafkatemplate<string, string> kafkatemplate; public kafkaproducer(kafkatemplate<string, string> kafkatemplate) { this.kafkatemplate = kafkatemplate; } public void sendmessage(string topic, string message) { kafkatemplate.send(topic, message); system.out.println("sent message: " + message); } }
编写 kafka 消费者(consumer)
使用 @kafkalistener 注解监听特定主题的消息。
kafkaconsumer.java
import org.apache.kafka.clients.consumer.consumerrecord; import org.springframework.kafka.annotation.kafkalistener; import org.springframework.stereotype.service; @service public class kafkaconsumer { @kafkalistener(topics = "test-topic", groupid = "my-group") public void listen(consumerrecord<string, string> record) { system.out.printf("received message: topic - %s, partition - %d, offset - %d, key - %s, value - %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } }
添加 rest 接口用于测试发送消息
为了方便测试,我们可以创建一个简单的 rest 控制器来触发消息发送。
kafkacontroller.java
import org.springframework.web.bind.annotation.*; import org.springframework.beans.factory.annotation.autowired; @restcontroller @requestmapping("/kafka") public class kafkacontroller { @autowired private kafkaproducer kafkaproducer; @postmapping("/send") public string sendmessage(@requestparam string msg) { kafkaproducer.sendmessage("test-topic", msg); return "message sent: " + msg; } }
启动 kafka 环境(可选)
如果你还没有运行 kafka,可以按照以下步骤快速启动:
启动 zookeeper(kafka 依赖)
bin/zookeeper-server-start.sh config/zookeeper.properties
启动 kafka 服务
bin/kafka-server-start.sh config/server.properties
创建测试 topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
测试接口
启动 spring boot 应用后,访问如下接口发送消息:
post http://localhost:8080/kafka/send?msg=hellokafka
观察控制台输出,确认是否收到类似以下内容:
received message: topic - test-topic, partition - 0, offset - 5, key - null, value - hellokafka
扩展功能建议
使用 json 格式传输对象(自定义序列化/反序列化)
多消费者组配置与负载均衡
异常处理与重试机制(@dlthandler, seektocurrenterrorhandler)
kafka streams 实现实时流处理逻辑
配置 ssl、sasl 安全认证
结合 spring cloud stream 构建云原生事件驱动架构
到此这篇关于spring boot集成apache kafka的实战指南的文章就介绍到这了,更多相关springboot集成apache kafka内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论