
Spring-Kafka 3.0: Handling Consumer Consumption Failures

1. Background

When using Kafka, we inevitably have to decide how many times a message that fails to be consumed should be retried, and what to do when it still fails after all retries: block, discard, or save it somewhere.

2. Setting the Number of Retries for Failed Consumption

1 Where to see the default retry count

Spring-Kafka 3.0 attempts a failed message 10 times by default, or more precisely 1 normal delivery plus 9 retries. You can see this in the class org.springframework.kafka.listener.SeekUtils.
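If you want to confirm this yourself, the default is exposed as the public constant SeekUtils.DEFAULT_MAX_FAILURES; a minimal sketch (the wrapper class below is just illustrative, not part of the original article):

import org.springframework.kafka.listener.SeekUtils;

public class ShowDefaultRetries {
    public static void main(String[] args) {
        // Prints 10: 1 initial delivery + 9 retries
        System.out.println("DEFAULT_MAX_FAILURES = " + SeekUtils.DEFAULT_MAX_FAILURES);
    }
}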

2 How to change the retry count

In my experiments, configuring this through application.yml does not work in spring-kafka 3.0, and I could not find any property that changes the retry count. (Many posts online say spring.kafka.consumer.retries does it; I tried, and at least in 3.0 it does not work. If anyone has managed to configure the consumer retry count through application.yml, please leave a comment and let me know, thanks.)

After much trial and error, I found it can only be done through Java configuration, which is in any case more flexible and fine-grained than application.yml. Here is the code:

    public CommonErrorHandler commonErrorHandler() {
        BackOff backOff = new FixedBackOff(5000L, 3L);
        return new DefaultErrorHandler(backOff);
    }

Then set this handler on the ConcurrentKafkaListenerContainerFactory and you are done.
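For context, the wiring looks like this; it is a condensed version of the factory bean shown in full in section 4 (consumerFactory() is defined there):

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Register the error handler so failed records are retried per the BackOff
        factory.setCommonErrorHandler(commonErrorHandler());
        return factory;
    }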

3. Setting How Failed Messages Are Handled After Retries

1 Save to the database for later retry

All we need to do is pass a ConsumerAwareRecordRecoverer when constructing the DefaultErrorHandler; a record that still fails after the 3 retries is then handed to the recoverer and saved to the database. Note that once the save-to-db succeeds, I see no need to call consumer.commitSync() here. First, the no-argument consumer.commitSync() commits the largest offset of the current batch by default, which can lose messages. Second, even without the commit, the consumer keeps consuming the messages that follow; as soon as a later message is consumed successfully its offset is committed anyway, covering this one.

    public CommonErrorHandler commonErrorHandler() {
        // Create the FixedBackOff object
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
            log.info("save to db " + record.value().toString());
        }, backOff);
        return defaultErrorHandler;
    }

If you insist on committing, you can try the variant below, which commits the offset for the current record explicitly:

    public CommonErrorHandler commonErrorHandler() {
        // Create the FixedBackOff object
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
            log.info("save to db " + record.value().toString());
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            // Kafka expects the *next* offset to consume, so commit offset + 1;
            // committing record.offset() itself would redeliver this record after a restart
            offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
            consumer.commitSync(offsets);
        }, backOff);
        return defaultErrorHandler;
    }

2 Send to a Kafka dead-letter topic

Again, just pass a DeadLetterPublishingRecoverer when constructing the DefaultErrorHandler. By default it uses the given KafkaTemplate to publish the failed record to a topic named your_topic.DLT (the original topic name plus the .DLT suffix).

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;


    public CommonErrorHandler commonErrorHandler() {
        // Create the FixedBackOff object
        BackOff backOff = new FixedBackOff(5000L, 3L);

        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);

        return defaultErrorHandler;
    }
All in all, these are the two ways of implementing the ConsumerRecordRecoverer interface: a hand-written ConsumerAwareRecordRecoverer, or the framework-provided DeadLetterPublishingRecoverer.
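If the default your_topic.DLT naming does not suit you, DeadLetterPublishingRecoverer also accepts a destination-resolver function in its constructor. A minimal sketch, where the ".failed" suffix is my own example (requires an import of org.apache.kafka.common.TopicPartition):

    public CommonErrorHandler commonErrorHandler() {
        BackOff backOff = new FixedBackOff(5000L, 3L);
        // Resolve each dead letter to a custom topic, keeping the original partition
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, exception) -> new TopicPartition(record.topic() + ".failed", record.partition()));
        return new DefaultErrorHandler(recoverer, backOff);
    }

If the target topic has fewer partitions than the source, resolve to a fixed partition instead of record.partition().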

4. Complete Consumer Code

1 application.yml

kafka-consumer:
  bootstrapServers: 192.168.31.114:9092
  groupId: goods-center
  # The background heartbeat thread must send a heartbeat within 30 seconds, otherwise a rebalance is triggered
  sessionTimeout: 30000
  autoOffsetReset: latest
  # Disable auto-commit; even so, Spring will commit offsets for us
  enableAutoCommit: false
  # Auto-commit interval
  autoCommitInterval: 1000
  # Minimum bytes to fetch
  fetchMinSize: 1
  # Maximum wait time before the minimum fetch size is reached
  fetchMaxWait: 500
  maxPollRecords: 50
  # 300-second max poll interval; if processing takes longer than 300 seconds before the next poll, an error occurs
  maxPollInterval: 300000
  # Heartbeat interval
  heartbeatInterval: 10000
  keyDeserializer: org.apache.kafka.common.serialization.LongDeserializer
  valueDeserializer: org.springframework.kafka.support.serializer.JsonDeserializer
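Note that these keys live under the custom kafka-consumer prefix and are bound by the KafkaListenerProperties class below; they are not Spring Boot's standard spring.kafka.* properties, which is consistent with the earlier point that the retry count cannot be set from application.yml.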

2 KafkaListenerProperties

package com.ychen.goodscenter.fafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
// Prefix of the configuration keys to bind
@ConfigurationProperties(prefix = "kafka-consumer")
@Getter
@Setter
public class KafkaListenerProperties {

    private String groupId;

    private String sessionTimeout;

    private String bootstrapServers;

    private String autoOffsetReset;

    private boolean enableAutoCommit;

    private String autoCommitInterval;

    private String fetchMinSize;

    private String fetchMaxWait;

    private String maxPollRecords;

    private String maxPollInterval;

    private String heartbeatInterval;

    private String keyDeserializer;

    private String valueDeserializer;

}

3 KafkaConsumerConfig

package com.ychen.goodscenter.fafka;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(KafkaListenerProperties.class)
@Slf4j
public class KafkaConsumerConfig {
    @Autowired
    private KafkaListenerProperties kafkaListenerProperties;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Concurrency; partitions are shared evenly across service instances
        factory.setConcurrency(2);
//        factory.setBatchListener(true);
        factory.setCommonErrorHandler(commonErrorHandler());

        ContainerProperties containerProperties = factory.getContainerProperties();
        // Manual, immediately-committed acknowledgments
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }


    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("Consumer configuration: {}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }


    public CommonErrorHandler commonErrorHandler() {
        // Create the FixedBackOff object
        BackOff backOff = new FixedBackOff(5000L, 3L);

        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
//        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
//            log.info("save to db " + record.value().toString());
//            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
//            offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
//            consumer.commitSync(offsets);
//        }, backOff);
        return defaultErrorHandler;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Broker addresses
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers());
        // Whether offsets are committed automatically
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit());
        // Auto-commit interval
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval());

        // Session timeout
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeout());
        // Key deserializer
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer());
        // Value deserializer
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer());
        // Heartbeat interval
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval());

        // Consumer group id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId());
        // Offset reset policy
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset());
        // Max records per poll
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords());
        // Max interval between polls
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval());

        propsMap.put("spring.json.trusted.packages", "com.ychen.**");

        return propsMap;
    }


}

4 MessageListener

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic", containerFactory = "kafkaListenerContainerFactory")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        log.info("order-message-topic message listener, thread id: " + Thread.currentThread().getId());

        try {
            log.info("order-message-topic message received, orderId: {}", record.value().getOrderId());

            orderService.submitOrder(record.value());
            // Synchronous commit
            acknowledgment.acknowledge();
            log.info("order-message-topic message acked, orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // Handle the error case
            log.error("order-message-topic message error DuplicateKeyException", dupe);
            // Duplicate record: ignore it and commit synchronously
            acknowledgment.acknowledge();
        }
    }
}
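To round things off: the dead letters published in section 3.2 land on order-message-topic.DLT by default and can be drained with an ordinary listener. A minimal sketch reusing the same container factory (manual ack mode, hence the Acknowledgment parameter); this DeadLetterListener class is my own illustration, not part of the original project:

package com.ychen.goodscenter.fafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DeadLetterListener {

    // DeadLetterPublishingRecoverer publishes to <original topic> + ".DLT" by default
    @KafkaListener(topics = "order-message-topic.DLT", containerFactory = "kafkaListenerContainerFactory")
    public void processDeadLetter(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        log.error("dead letter received, topic: {}, offset: {}, value: {}",
                record.topic(), record.offset(), record.value());
        // Ack so the dead letter is not redelivered; alert or persist it here as needed
        acknowledgment.acknowledge();
    }
}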
