当前位置: 代码网 > it编程>编程语言>Java > Spring Boot集成Apache Kafka的实战指南

Spring Boot集成Apache Kafka的实战指南

2025年06月23日 Java 我要评论
apache kafka 是一个分布式流处理平台,广泛用于构建实时数据管道、日志聚合系统和事件溯源架构。spring boot 提供了对 kafka 的良好集成支持,使得开发者可以非常便捷地在项目中使

apache kafka 是一个分布式流处理平台,广泛用于构建实时数据管道、日志聚合系统和事件溯源架构。spring boot 提供了对 kafka 的良好集成支持,使得开发者可以非常便捷地在项目中使用 kafka。

本文将手把手教你如何在 spring boot 项目中集成 kafka,包括生产者(producer)和消费者(consumer)的实现,并提供完整的代码示例。

开发环境准备

java 17+

maven 或 gradle

spring boot 3.x

apache kafka 3.0+(本地或远程)

ide(如 intellij idea、vs code)

创建 spring boot 项目

你可以通过 spring initializr 创建一个新的 spring boot 项目,选择以下依赖:

  • spring web
  • spring for apache kafka

或者手动添加 pom.xml 中的依赖:

<dependency>
    <groupid>org.springframework.kafka</groupid>
    <artifactid>spring-kafka</artifactid>
</dependency>

spring boot 会自动管理版本兼容性,无需手动指定版本号。

配置 kafka 连接信息

在 application.yml 或 application.properties 文件中配置 kafka 相关参数:

application.yml 示例:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.stringserializer
      value-serializer: org.apache.kafka.common.serialization.stringserializer

编写 kafka 生产者(producer)

创建一个服务类用于发送消息到 kafka 主题。

kafkaproducer.java

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.service;

@service
public class kafkaproducer {

    private final kafkatemplate<string, string> kafkatemplate;

    public kafkaproducer(kafkatemplate<string, string> kafkatemplate) {
        this.kafkatemplate = kafkatemplate;
    }

    public void sendmessage(string topic, string message) {
        kafkatemplate.send(topic, message);
        system.out.println("sent message: " + message);
    }
}

编写 kafka 消费者(consumer)

使用 @kafkalistener 注解监听特定主题的消息。

kafkaconsumer.java

import org.apache.kafka.clients.consumer.consumerrecord;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.service;

@service
public class kafkaconsumer {

    @kafkalistener(topics = "test-topic", groupid = "my-group")
    public void listen(consumerrecord<string, string> record) {
        system.out.printf("received message: topic - %s, partition - %d, offset - %d, key - %s, value - %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

添加 rest 接口用于测试发送消息

为了方便测试,我们可以创建一个简单的 rest 控制器来触发消息发送。

kafkacontroller.java

import org.springframework.web.bind.annotation.*;
import org.springframework.beans.factory.annotation.autowired;

@restcontroller
@requestmapping("/kafka")
public class kafkacontroller {

    @autowired
    private kafkaproducer kafkaproducer;

    @postmapping("/send")
    public string sendmessage(@requestparam string msg) {
        kafkaproducer.sendmessage("test-topic", msg);
        return "message sent: " + msg;
    }
}

启动 kafka 环境(可选)

如果你还没有运行 kafka,可以按照以下步骤快速启动:

启动 zookeeper(kafka 依赖)

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 kafka 服务

bin/kafka-server-start.sh config/server.properties

创建测试 topic

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

测试接口

启动 spring boot 应用后,访问如下接口发送消息:

post http://localhost:8080/kafka/send?msg=hellokafka

观察控制台输出,确认是否收到类似以下内容:

received message: topic - test-topic, partition - 0, offset - 5, key - null, value - hellokafka

扩展功能建议

使用 json 格式传输对象(自定义序列化/反序列化)

多消费者组配置与负载均衡

异常处理与重试机制(@dlthandler, seektocurrenterrorhandler)

kafka streams 实现实时流处理逻辑

配置 ssl、sasl 安全认证

结合 spring cloud stream 构建云原生事件驱动架构

到此这篇关于spring boot集成apache kafka的实战指南的文章就介绍到这了,更多相关springboot集成apache kafka内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com