
Spring Boot + Kafka: Building a Highly Available Message Queue Cluster


1. Introduction to Kafka

1.1 What is Kafka

Apache Kafka is a distributed streaming platform with the following core features:

  • High throughput: handles millions of messages per second
  • Scalability: scales horizontally; nodes can be added dynamically
  • Durable storage: messages are persisted to disk, with configurable retention policies
  • High availability: replication protects against data loss
  • Distributed architecture: supports many concurrent producers and consumers

1.2 Core Kafka Concepts

  • Broker: a single node in the Kafka cluster
  • Topic: a named category of messages
  • Partition: a subdivision of a topic that enables parallel processing
  • Replica: a copy of a partition, providing high availability
  • Producer: publishes messages
  • Consumer: reads messages
  • Consumer group: a set of consumers that share the consumption of a topic

2. Building a Highly Available Kafka Cluster

Cluster Architecture Planning

A cluster of at least 3 Kafka nodes plus a 3-node ZooKeeper ensemble is recommended (a Docker Compose sketch for local testing follows the list):

  • ZooKeeper cluster: zk1:2181, zk2:2181, zk3:2181
  • Kafka cluster: kafka1:9092, kafka2:9092, kafka3:9092
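
For local experiments, a topology like this can be stood up with Docker Compose. This is only a sketch: the image names, versions, and ports are illustrative assumptions rather than part of the original setup, and zk2/zk3 and kafka2/kafka3 follow the same pattern with their own ids.

# docker-compose.yml (sketch; repeat both services for nodes 2 and 3)
version: "3"
services:
  zk1:
    image: confluentinc/cp-zookeeper:7.4.0   # assumed image/version
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVERS: zk1:2888:3888;zk2:2888:3888;zk3:2888:3888
  kafka1:
    image: confluentinc/cp-kafka:7.4.0       # assumed image/version
    depends_on: [zk1]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3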

3. Integrating Kafka with Spring Boot, Step by Step

3.1 Create the Spring Boot Project

Create the project with Spring Initializr and add the following dependencies:

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
</dependencies>

3.2 Configuration File

# application.yml
spring:
  kafka:
    # Kafka cluster (list all brokers for high availability)
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    
    # Producer configuration
    producer:
      retries: 3  # retry count on send failure
      acks: all   # require acknowledgement from all in-sync replicas
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        compression.type: snappy  # compression codec
        linger.ms: 5  # small batching delay to improve throughput
    
    # Consumer configuration
    consumer:
      group-id: ${spring.application.name}-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.kafka.dto"
        max.poll.records: 500  # maximum records per poll
        session.timeout.ms: 10000  # session timeout
        heartbeat.interval.ms: 3000  # heartbeat interval
    
    # Listener configuration
    listener:
      concurrency: 3  # number of concurrent consumers
      ack-mode: batch  # acknowledge offsets per batch
      missing-topics-fatal: false  # do not fail startup when a topic is missing
      
    # Shared client properties (reliability)
    properties:
      # Producer reliability: idempotence guards against duplicates on retry
      enable.idempotence: true
      max.in.flight.requests.per.connection: 5
      # Note: replication.factor and min.insync.replicas are topic/broker-level
      # settings, not client properties; they are configured in sections 3.3 and 4.1

# Custom application settings
kafka:
  topics:
    order-topic: order-topic
    payment-topic: payment-topic
    retry-topic: retry-topic
  retry:
    max-attempts: 3
    backoff-interval: 1000

3.3 Configuration Class

// KafkaConfig.java
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {
    
    @Value("${kafka.topics.order-topic}")
    private String orderTopic;
    
    @Value("${kafka.topics.payment-topic}")
    private String paymentTopic;
    
    @Value("${kafka.topics.retry-topic}")
    private String retryTopic;
    
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
                   "kafka1:9092,kafka2:9092,kafka3:9092");
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic orderTopic() {
        // Create the topic with 3 partitions and a replication factor of 3
        return new NewTopic(orderTopic, 3, (short) 3);
    }
    
    @Bean
    public NewTopic paymentTopic() {
        return new NewTopic(paymentTopic, 2, (short) 3);
    }
    
    @Bean
    public NewTopic retryTopic() {
        return new NewTopic(retryTopic, 1, (short) 3);
    }
    
    // Dead letter queue: route records that exhaust retries to dlq-topic
    @Bean
    public DeadLetterPublishingRecoverer dlqRecoverer(KafkaTemplate<String, Object> template) {
        return new DeadLetterPublishingRecoverer(template, 
            (record, ex) -> {
                log.error("Message processing failed, routing to the dead letter queue: {}", record.value(), ex);
                return new TopicPartition("dlq-topic", record.partition());
            });
    }
    
    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqRecoverer) {
        // Retry 3 times (1s apart), then hand the record to the DLQ recoverer
        DefaultErrorHandler handler = new DefaultErrorHandler(dlqRecoverer, 
            new FixedBackOff(1000L, 3));
        handler.addNotRetryableExceptions(IllegalArgumentException.class);
        return handler;
    }
    
    // Producer factory with reliability-oriented settings
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                       "kafka1:9092,kafka2:9092,kafka3:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");  // wait for all in-sync replicas
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);    // retry count
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // idempotent producer
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}
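
Spring Boot will typically auto-configure a KafkaTemplate around the ProducerFactory bean above, but declaring the template explicitly keeps the wiring visible. A minimal sketch of a bean that could be added to KafkaConfig:

// KafkaTemplate bean wired to the custom producer factory above
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}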

3.4 Message Entity Classes

// OrderMessage.java
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OrderMessage implements Serializable {
    private String orderId;
    private String userId;
    private BigDecimal amount;
    private String productName;
    private Integer quantity;
    private LocalDateTime createTime;
    private MessageStatus status;
    
    public enum MessageStatus {
        PENDING, PROCESSING, SUCCESS, FAILED
    }
}

// PaymentMessage.java
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class PaymentMessage {
    private String paymentId;
    private String orderId;
    private BigDecimal amount;
    private PaymentMethod paymentMethod;
    private PaymentStatus status;
    private LocalDateTime paymentTime;
    
    public enum PaymentMethod {
        ALIPAY, WECHAT, CREDIT_CARD
    }
    
    public enum PaymentStatus {
        INIT, PROCESSING, SUCCESS, FAILED
    }
}

3.5 Producer Service

// KafkaProducerService.java
@Service
@Slf4j
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Value("${kafka.topics.order-topic}")
    private String orderTopic;
    
    @Value("${kafka.topics.payment-topic}")
    private String paymentTopic;
    
    /**
     * Send an order message (synchronous)
     */
    public SendResult<String, Object> sendOrderSync(OrderMessage orderMessage) {
        try {
            // Build a Spring Message; the topic and key travel as Kafka headers
            // (Spring Kafka 2.x header names; 3.x uses KafkaHeaders.KEY)
            Message<OrderMessage> message = MessageBuilder
                .withPayload(orderMessage)
                .setHeader(KafkaHeaders.TOPIC, orderTopic)
                .setHeader(KafkaHeaders.MESSAGE_KEY, orderMessage.getOrderId())
                .setHeader("message-id", UUID.randomUUID().toString())
                .setHeader("message-time", String.valueOf(System.currentTimeMillis()))
                .build();
            
            // Send and block until the broker acknowledges
            // (Spring Kafka 2.x API; in 3.x send() returns a CompletableFuture)
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
            
            SendResult<String, Object> result = future.get(5, TimeUnit.SECONDS);
            log.info("Order message sent: topic={}, partition={}, offset={}", 
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
            return result;
            
        } catch (Exception e) {
            log.error("Failed to send order message: {}", orderMessage, e);
            throw new RuntimeException("Message sending failed", e);
        }
    }
    
    /**
     * Send an order message (asynchronous)
     */
    public void sendOrderAsync(OrderMessage orderMessage) {
        kafkaTemplate.send(orderTopic, orderMessage.getOrderId(), orderMessage)
            .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    log.info("Async send succeeded: topic={}, offset={}", 
                            result.getRecordMetadata().topic(),
                            result.getRecordMetadata().offset());
                }
                
                @Override
                public void onFailure(Throwable ex) {
                    log.error("Async send failed: {}", orderMessage, ex);
                    // Retry logic or a local fallback store could be added here
                }
            });
    }
    
    /**
     * Send messages in a batch
     */
    public void batchSendOrders(List<OrderMessage> orderMessages) {
        orderMessages.forEach(message -> 
            kafkaTemplate.send(orderTopic, message.getOrderId(), message));
        kafkaTemplate.flush(); // make sure everything buffered is sent
    }
    
    /**
     * Send to a specific partition
     */
    public void sendToPartition(OrderMessage orderMessage, int partition) {
        kafkaTemplate.send(orderTopic, partition, 
                          orderMessage.getOrderId(), orderMessage);
    }
    
    /**
     * Transactional send (requires a KafkaTransactionManager bean; a sketch follows the class)
     */
    @Transactional(transactionManager = "kafkaTransactionManager")
    public void sendTransactionalMessage(OrderMessage orderMessage) {
        // Database operations
        // orderRepository.save(order);
        
        // The Kafka send participates in the same transaction
        kafkaTemplate.send(orderTopic, orderMessage.getOrderId(), orderMessage);
        
        // Other business operations
    }
}
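
The transactional method above refers to a kafkaTransactionManager bean that is never defined in the article. A hedged sketch of that missing piece, to be placed in KafkaConfig; the "order-tx-" transaction-id prefix is an arbitrary choice, and setting it is what switches the producer factory into transactional mode:

// Sketch: enable Kafka transactions (assumes the DefaultKafkaProducerFactory from KafkaConfig)
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory) {
    // A transaction-id prefix is required for transactional producers
    ((DefaultKafkaProducerFactory<String, Object>) producerFactory)
        .setTransactionIdPrefix("order-tx-");  // arbitrary prefix; adjust per service
    return new KafkaTransactionManager<>(producerFactory);
}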

3.6 Consumer Service

// KafkaConsumerService.java
@Service
@Slf4j
public class KafkaConsumerService {
    
    private static final String ORDER_CONTAINER_FACTORY = "orderContainerFactory";
    private static final String PAYMENT_CONTAINER_FACTORY = "paymentContainerFactory";
    
    /**
     * Order message consumer - batch consumption
     */
    @KafkaListener(
        topics = "${kafka.topics.order-topic}",
        containerFactory = ORDER_CONTAINER_FACTORY,
        groupId = "order-consumer-group"
    )
    public void consumeOrderMessages(List<OrderMessage> messages) {
        log.info("Received a batch of order messages, count: {}", messages.size());
        
        for (OrderMessage message : messages) {
            try {
                processOrderMessage(message);
            } catch (Exception e) {
                log.error("Order processing failed: {}", message.getOrderId(), e);
                // Record the failure; the message could also be sent to a retry topic
            }
        }
    }
    
    /**
     * Single order message consumption
     */
    @KafkaListener(
        topics = "${kafka.topics.order-topic}",
        groupId = "order-single-consumer-group"
    )
    public void consumeSingleOrderMessage(
            @Payload OrderMessage message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {
        
        log.info("Received order message: topic={}, partition={}, offset={}, orderId={}", 
                topic, partition, offset, message.getOrderId());
        
        try {
            // Business processing
            processOrderMessage(message);
            
            // On success, notify the downstream payment flow
            sendPaymentMessage(message);
            
        } catch (Exception e) {
            log.error("Order processing failed: {}", message.getOrderId(), e);
            throw e; // rethrowing hands the record to the retry/error handler
        }
    }
    
    /**
     * Payment message consumer
     */
    @KafkaListener(
        topics = "${kafka.topics.payment-topic}",
        containerFactory = PAYMENT_CONTAINER_FACTORY,
        groupId = "payment-consumer-group"
    )
    public void consumePaymentMessage(PaymentMessage message) {
        log.info("Received payment message: {}", message.getPaymentId());
        
        try {
            processPayment(message);
        } catch (Exception e) {
            log.error("Payment processing failed: {}", message.getPaymentId(), e);
        }
    }
    
    private void processOrderMessage(OrderMessage message) {
        // Simulated business processing
        log.info("Processing order: {}, amount: {}", message.getOrderId(), message.getAmount());
        
        // Typical steps:
        // 1. Validate the order
        // 2. Deduct inventory
        // 3. Write audit logs
        // 4. Update the order status
        
        // Simulate processing time
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void sendPaymentMessage(OrderMessage orderMessage) {
        PaymentMessage paymentMessage = PaymentMessage.builder()
            .paymentId(UUID.randomUUID().toString())
            .orderId(orderMessage.getOrderId())
            .amount(orderMessage.getAmount())
            .paymentMethod(PaymentMessage.PaymentMethod.ALIPAY)
            .status(PaymentMessage.PaymentStatus.INIT)
            .paymentTime(LocalDateTime.now())
            .build();
        
        // A KafkaTemplate could be used here to publish the payment message
    }
    
    private void processPayment(PaymentMessage message) {
        // Payment processing logic
        log.info("Processing payment: {}, order: {}", message.getPaymentId(), message.getOrderId());
    }
}
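
The listeners above rely on container-managed batch acknowledgement. If per-record control over offset commits is wanted, Spring Kafka's Acknowledgment parameter can be used instead. A sketch, assuming a hypothetical "manualAckContainerFactory" bean configured with ContainerProperties.AckMode.MANUAL (such a factory is not defined in this article):

// Requires a factory configured with ContainerProperties.AckMode.MANUAL
@KafkaListener(
    topics = "${kafka.topics.order-topic}",
    containerFactory = "manualAckContainerFactory",  // hypothetical factory bean
    groupId = "order-manual-ack-group"
)
public void consumeWithManualAck(OrderMessage message, Acknowledgment ack) {
    try {
        processOrderMessage(message);
        ack.acknowledge();  // commit the offset only after successful processing
    } catch (Exception e) {
        log.error("Order processing failed, offset not committed: {}", message.getOrderId(), e);
        // leaving the offset uncommitted allows the record to be redelivered
    }
}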

3.7 Consumer Container Factory Configuration

// KafkaConsumerConfig.java
// (named KafkaConsumerConfig to avoid clashing with Kafka's own ConsumerConfig class used below)
@Configuration
public class KafkaConsumerConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    // Order consumer container factory (batch consumption)
    @Bean("orderContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, OrderMessage> 
            orderContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(orderConsumerFactory());
        factory.setConcurrency(3); // number of concurrent consumers
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setBatchListener(true); // enable batch consumption
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        
        // Pause between polls to smooth out batch sizes
        factory.getContainerProperties().setIdleBetweenPolls(1000);
        
        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, OrderMessage> orderConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.kafka.dto");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // High availability settings
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
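
The paymentContainerFactory referenced by the payment listener in section 3.6 is never defined in the original. A single-record (non-batch) factory along the same lines could be added to KafkaConsumerConfig; the sketch below mirrors orderConsumerFactory with PaymentMessage as the value type, and the default-type property is an assumption to help the JsonDeserializer when type headers are absent:

@Bean("paymentContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, PaymentMessage> paymentContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PaymentMessage> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(paymentConsumerFactory());
    factory.setConcurrency(2);  // match the payment topic's 2 partitions
    return factory;
}

@Bean
public ConsumerFactory<String, PaymentMessage> paymentConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.kafka.dto");
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, PaymentMessage.class);  // fallback target type
    return new DefaultKafkaConsumerFactory<>(props);
}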

3.8 Monitoring and Management Endpoints

// KafkaMonitorController.java
@RestController
@RequestMapping("/api/kafka")
@Slf4j
public class KafkaMonitorController {
    
    @Autowired
    private KafkaAdmin kafkaAdmin;
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    /**
     * List topics
     */
    @GetMapping("/topics")
    public ResponseEntity<List<String>> getTopics() throws Exception {
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            ListTopicsResult topicsResult = adminClient.listTopics();
            Set<String> topicNames = topicsResult.names().get();
            return ResponseEntity.ok(new ArrayList<>(topicNames));
        }
    }
    
    /**
     * Topic details: replica assignment per partition
     */
    @GetMapping("/topics/{topic}/details")
    public ResponseEntity<Map<Integer, List<Integer>>> getTopicDetails(
            @PathVariable String topic) throws Exception {
        
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            DescribeTopicsResult describeResult = adminClient.describeTopics(Collections.singleton(topic));
            TopicDescription topicDescription = describeResult.values().get(topic).get();
            
            Map<Integer, List<Integer>> partitionInfo = new HashMap<>();
            for (TopicPartitionInfo partition : topicDescription.partitions()) {
                List<Integer> replicas = partition.replicas().stream()
                    .map(Node::id)
                    .collect(Collectors.toList());
                partitionInfo.put(partition.partition(), replicas);
            }
            
            return ResponseEntity.ok(partitionInfo);
        }
    }
    
    /**
     * Send a test message
     */
    @PostMapping("/send-test")
    public ResponseEntity<String> sendTestMessage(@RequestParam String topic) {
        OrderMessage testMessage = OrderMessage.builder()
            .orderId("test-" + System.currentTimeMillis())
            .userId("test-user")
            .amount(new BigDecimal("100.00"))
            .productName("Test product")
            .quantity(1)
            .createTime(LocalDateTime.now())
            .status(OrderMessage.MessageStatus.PENDING)
            .build();
        
        kafkaTemplate.send(topic, testMessage.getOrderId(), testMessage);
        return ResponseEntity.ok("Test message sent");
    }
    
    /**
     * Consumer group information
     */
    @GetMapping("/consumer-groups")
    public ResponseEntity<Map<String, Object>> getConsumerGroups() throws Exception {
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> groups = groupsResult.all().get();
            
            Map<String, Object> result = new HashMap<>();
            result.put("consumerGroups", groups);
            result.put("count", groups.size());
            
            return ResponseEntity.ok(result);
        }
    }
}
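
Section 4.2 lists consumer lag among the metrics worth watching. As a building block, the committed offsets of a group can be read through the same AdminClient; comparing them with the partitions' end offsets then yields the lag. A hedged sketch of such an endpoint, which could sit in the controller above:

/**
 * Committed offsets for a consumer group; compare against partition end
 * offsets to compute the actual lag
 */
@GetMapping("/consumer-groups/{groupId}/offsets")
public ResponseEntity<Map<String, Long>> getGroupOffsets(@PathVariable String groupId) throws Exception {
    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
        Map<TopicPartition, OffsetAndMetadata> offsets =
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        
        Map<String, Long> result = new HashMap<>();
        offsets.forEach((tp, meta) -> result.put(tp.topic() + "-" + tp.partition(), meta.offset()));
        return ResponseEntity.ok(result);
    }
}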

3.9 Exception Handling and Retry Mechanism

// KafkaExceptionHandler.java
@Component
@Slf4j
public class KafkaExceptionHandler {
    
    /**
     * Global handler for listener container failure events
     */
    @EventListener
    public void handleException(ListenerContainerConsumerFailedEvent event) {
        log.error("Kafka consumer failed: {}", event.getContainer().getListenerId(), event.getException());
        
        // Record the failure details
        // Trigger alerting
        // Write to an error log
    }
    
    /**
     * Custom retry strategy
     */
    @Bean
    public RetryTemplate kafkaRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        // Retry policy: at most 3 attempts
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        
        // Backoff policy: 1 second between attempts
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000L);
        
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        return retryTemplate;
    }
}
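
The RetryTemplate above is declared but never invoked in the article. One way it could wrap a fragile step inside a consumer is sketched below; processOrderMessage stands in for any call that may fail transiently:

// Usage sketch: retry a processing step up to 3 times with 1s backoff
kafkaRetryTemplate.execute(context -> {
    processOrderMessage(message);  // any transiently failing call
    return null;
});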

3.10 Health Check

// KafkaHealthIndicator.java
@Component
public class KafkaHealthIndicator implements HealthIndicator {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Override
    public Health health() {
        try {
            // Probe connectivity by sending a small test message
            kafkaTemplate.send("health-check-topic", "health-check", "ping")
                .get(5, TimeUnit.SECONDS);
            
            return Health.up()
                .withDetail("status", "Kafka cluster reachable")
                .withDetail("timestamp", LocalDateTime.now())
                .build();
            
        } catch (Exception e) {
            return Health.down()
                .withDetail("status", "Kafka cluster unreachable")
                .withDetail("error", e.getMessage())
                .withDetail("timestamp", LocalDateTime.now())
                .build();
        }
    }
}

4. High Availability Safeguards

4.1 Recommended Cluster Configuration

# Key settings in kafka server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log storage
log.dirs=/data/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1

# Replication and ISR
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
default.replication.factor=3
min.insync.replicas=2

# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# ZooKeeper
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000

4.2 Production Deployment Recommendations

Hardware

  • At least 3 Kafka nodes plus 3 ZooKeeper nodes
  • SSDs for better I/O performance
  • Ample memory and CPU

Network

  • Use a dedicated network
  • Configure sensible firewall rules

Monitoring and Alerting

  • Use Kafka Manager or Confluent Control Center
  • Track throughput, latency, and replica sync status

5. Test Example

// KafkaIntegrationTest.java
@SpringBootTest
@Slf4j
class KafkaIntegrationTest {
    
    @Autowired
    private KafkaProducerService producerService;
    
    @Test
    void testSendAndReceiveMessage() throws InterruptedException {
        // Build a test message
        OrderMessage orderMessage = OrderMessage.builder()
            .orderId("test-" + UUID.randomUUID())
            .userId("user-001")
            .amount(new BigDecimal("199.99"))
            .productName("Test product")
            .quantity(2)
            .createTime(LocalDateTime.now())
            .status(OrderMessage.MessageStatus.PENDING)
            .build();
        
        // Send the message
        SendResult<String, Object> result = producerService.sendOrderSync(orderMessage);
        
        assertNotNull(result);
        assertNotNull(result.getRecordMetadata());
        
        log.info("Message sent, partition: {}, offset: {}", 
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset());
        
        // Give the consumer time to process
        Thread.sleep(2000);
    }
    
    @Test
    void testBatchSend() {
        List<OrderMessage> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            OrderMessage message = OrderMessage.builder()
                .orderId("batch-" + i)
                .userId("user-" + i)
                .amount(new BigDecimal(i * 10))
                .productName("Product " + i)
                .quantity(1)
                .createTime(LocalDateTime.now())
                .status(OrderMessage.MessageStatus.PENDING)
                .build();
            messages.add(message);
        }
        
        producerService.batchSendOrders(messages);
    }
}
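
These tests assume a reachable external cluster. For self-contained runs, the embedded broker from spring-kafka-test can stand in for it; a sketch, assuming the spring-kafka-test dependency (test scope) has been added to the pom:

// EmbeddedKafkaIntegrationTest.java (sketch)
@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"order-topic", "payment-topic", "retry-topic"},
    bootstrapServersProperty = "spring.kafka.bootstrap-servers"  // point the app at the embedded broker
)
class EmbeddedKafkaIntegrationTest {
    // the same test methods as above now run against the in-memory broker
}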

6. Summary

6.1 High Availability Features Delivered

  • Data redundancy: replication (replication factor = 3) keeps the data safe
  • Failover: leader election switches over automatically when a node fails
  • Load balancing: partitioning enables horizontal scaling and balanced load
  • Fault tolerance: the dead letter queue and retries keep messages from being lost
  • Monitoring and alerting: health checks and monitoring endpoints round out the setup

6.2 Best Practice Recommendations

  • Plan partitions deliberately: size the partition count to throughput and consumer count
  • Watch replica sync: keep enough in-sync replicas (ISR)
  • Configure retries: retry through network blips and transient failures
  • Make consumption idempotent to guard against duplicate processing (see the sketch after this list)
  • Clean up data regularly: set a sensible retention policy
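
A minimal sketch to make the idempotency point concrete. The in-memory set is for illustration only; a Redis set or a unique database constraint would be the usual production-grade deduplication store:

// Deduplicate by business key before processing (in-memory store for illustration)
private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();

public void processIdempotently(OrderMessage message) {
    if (!processedOrderIds.add(message.getOrderId())) {
        log.info("Duplicate order {} skipped", message.getOrderId());
        return; // already handled; skip the duplicate delivery
    }
    processOrderMessage(message);
}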

6.3 Performance Tuning Recommendations

  • Batch operations: batch sends and batch consumption raise throughput
  • Compression: enabling message compression cuts network bandwidth usage
  • Batch sizing: tune batch sizes to the workload
  • Async sends: use asynchronous sends on non-critical paths for faster responses

With the setup above, Spring Boot and Kafka together form a highly available message queue cluster with production-grade reliability, scalability, and fault tolerance, suitable for enterprise applications.
