当前位置: 代码网 > it编程>编程语言>Java > Spring Boot 集成 Kafka的详细步骤

Spring Boot 集成 Kafka的详细步骤

2024年07月26日 Java 我要评论
spring boot 与 kafka 集成是实现高效消息传递和数据流处理的常见方式。spring boot 提供了简化 kafka 配置和使用的功能,使得集成过程变得更加直观和高效。以下是 spri

spring boot 与 kafka 集成是实现高效消息传递和数据流处理的常见方式。spring boot 提供了简化 kafka 配置和使用的功能,使得集成过程变得更加直观和高效。以下是 spring boot 集成 kafka 的详细步骤,包括配置、生产者和消费者的实现以及一些高级特性。

1. 添加依赖

首先,你需要在 spring boot 项目的 pom.xml 文件中添加 kafka 相关的依赖。使用 spring boot 的起步依赖可以简化配置。

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-kafka</artifactid>
</dependency>

2. 配置 kafka

2.1. 配置文件

application.propertiesapplication.yml 文件中配置 kafka 相关属性。

application.properties:

# kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092
# kafka 消费者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.stringdeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.stringdeserializer
# kafka 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.stringserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.stringserializer

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
      value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.stringserializer
      value-serializer: org.apache.kafka.common.serialization.stringserializer

2.2. kafka 配置类

在 spring boot 中,你可以使用 @configuration 注解创建一个配置类,来定义 kafka 的生产者和消费者配置。

import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.apache.kafka.common.serialization.stringserializer;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.listener.concurrentmessagelistenercontainer;
import org.springframework.kafka.listener.config.containerproperties;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.producerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import java.util.hashmap;
import java.util.map;
@configuration
@enablekafka
public class kafkaconfig {
    @bean
    public producerfactory<string, string> producerfactory() {
        map<string, object> configprops = new hashmap<>();
        configprops.put(producerconfig.bootstrap_servers_config, "localhost:9092");
        configprops.put(producerconfig.key_serializer_class_config, stringserializer.class);
        configprops.put(producerconfig.value_serializer_class_config, stringserializer.class);
        return new defaultkafkaproducerfactory<>(configprops);
    }
    @bean
    public kafkatemplate<string, string> kafkatemplate() {
        return new kafkatemplate<>(producerfactory());
    }
    @bean
    public consumerfactory<string, string> consumerfactory() {
        map<string, object> configprops = new hashmap<>();
        configprops.put(consumerconfig.bootstrap_servers_config, "localhost:9092");
        configprops.put(consumerconfig.group_id_config, "my-group");
        configprops.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
        configprops.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
        return new defaultkafkaconsumerfactory<>(configprops);
    }
    @bean
    public concurrentkafkalistenercontainerfactory<string, string> kafkalistenercontainerfactory() {
        concurrentkafkalistenercontainerfactory<string, string> factory =
                new concurrentkafkalistenercontainerfactory<>();
        factory.setconsumerfactory(consumerfactory());
        return factory;
    }
}

3. 实现 kafka 生产者

3.1. 生产者服务

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.service;
@service
public class kafkaproducerservice {
    @autowired
    private kafkatemplate<string, string> kafkatemplate;
    private static final string topic = "my_topic";
    public void sendmessage(string message) {
        kafkatemplate.send(topic, message);
    }
}

3.2. 控制器示例

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.postmapping;
import org.springframework.web.bind.annotation.requestbody;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
public class kafkacontroller {
    @autowired
    private kafkaproducerservice kafkaproducerservice;
    @postmapping("/send")
    public void sendmessage(@requestbody string message) {
        kafkaproducerservice.sendmessage(message);
    }
}

4. 实现 kafka 消费者

4.1. 消费者服务

import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.service;
@service
public class kafkaconsumerservice {
    @kafkalistener(topics = "my_topic", groupid = "my-group")
    public void listen(string message) {
        system.out.println("received message: " + message);
    }
}

5. 高级特性

5.1. 消息事务

kafka 支持消息事务,确保消息的原子性。

生产者配置

spring.kafka.producer.enable-idempotence=true
spring.kafka.producer.transaction-id-prefix=my-transactional-id

使用事务

import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;
import org.springframework.kafka.core.transactiontemplate;
import org.springframework.stereotype.service;
import org.springframework.transaction.annotation.transactional;
@service
public class kafkatransactionalservice {
    private final kafkatemplate<string, string> kafkatemplate;
    private final transactiontemplate transactiontemplate;
    public kafkatransactionalservice(kafkatemplate<string, string> kafkatemplate, transactiontemplate transactiontemplate) {
        this.kafkatemplate = kafkatemplate;
        this.transactiontemplate = transactiontemplate;
    }
    @transactional
    public void sendmessageintransaction(string message) {
        kafkatemplate.executeintransaction(t -> {
            kafkatemplate.send("my_topic", message);
            return true;
        });
    }
}

5.2. 异步发送与回调

异步发送

public void sendmessageasync(string message) {
    kafkatemplate.send("my_topic", message).addcallback(
        result -> system.out.println("sent message: " + message),
        ex -> system.err.println("failed to send message: " + ex.getmessage())
    );
}

总结

spring boot 与 kafka 的集成使得消息队列的使用变得更加简单和高效。通过上述步骤,你可以轻松地配置 kafka、实现生产者和消费者,并利用 spring boot 提供的强大功能来处理消息流。了解 kafka 的高级特性(如事务和异步处理)能够帮助你更好地满足业务需求,确保系统的高可用性和数据一致性。

到此这篇关于spring boot 集成 kafka的详细步骤的文章就介绍到这了,更多相关spring boot 集成 kafka内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com