一、背景
我们作为kafka在使用kafka是,必然考虑消息消费失败的重试次数,重试后仍然失败如何处理,要么阻塞,要么丢弃,或者保存
二、设置消费失败重试次数
1 默认重试次数在哪里看
kafka3.0 版本默认失败重试次数为10次,准确讲应该是1次正常调用+9次重试,这个在这个类可以看到 org.springframework.kafka.listener.seekutils
2 如何修改重试次数
据我的实验,spring-kafka3.0版本通过application.yml 配置是行不通的,也没有找到任何一项配置可以改重试次数的(网上很多说的通过配置spring.kafka.consumer.retries 可以配置,我尝试过了,至少3.0版本是不行的,如果有人成功试过可以通过application.yml 配置消费者的消费的重试次数可以留言通知我,谢谢)
经过我不懈努力和尝试,只能通过java代码配置的方式才可以,并且这种方式相对于application.yml配置更加灵活细致,上代码
public commonerrorhandler commonerrorhandler() {
backoff backoff = new fixedbackoff(5000l, 3l);
return new defaulterrorhandler(backoff);
}
然后把这个handler 添加到concurrentkafkalistenercontainerfactory中就行了
三、设置消费失败处理方式
1 保存到数据库重试
我们需要在创建defaulterrorhandler类时加入一个consumerawarerecordrecoverer参数就可以了,这样在重试3次后仍然失败就会保存到数据库中,注意这里save to db成功之后,我认为没有必要执行consumer.commitsync方法,首先这个consumer.commitsync这个方法默认是提交当前批次的最大的offset(可能会导致丢失消息),其次不提交kafka的消费者仍然回去消费后面的消息,只要后面的消息,消费成功了,那么依然会提交offset,覆盖了这个offset
public commonerrorhandler commonerrorhandler() {
// 创建 fixedbackoff 对象
backoff backoff = new fixedbackoff(5000l, 3l);
defaulterrorhandler defaulterrorhandler = new defaulterrorhandler((consumerawarerecordrecoverer) (record, consumer, exception) -> {
log.info("save to db " + record.value().tostring());
}, backoff);
return defaulterrorhandler;
}
如果你硬要提交也可以试试下面这种,指定提交当前的offset
public commonerrorhandler commonerrorhandler() {
// 创建 fixedbackoff 对象
backoff backoff = new fixedbackoff(5000l, 3l);
defaulterrorhandler defaulterrorhandler = new defaulterrorhandler((consumerawarerecordrecoverer) (record, consumer, exception) -> {
log.info("save to db " + record.value().tostring());
map<topicpartition, offsetandmetadata> offsets = new hashmap<>();
offsets.put(new topicpartition(record.topic(),record.partition()),new offsetandmetadata(record.offset()));
consumer.commitsync(offsets);
}, backoff);
return defaulterrorhandler;
}
2 发送到kafka死信队列
仍然在创建defaulterrorhandler类时加入一个deadletterpublishingrecoverer 类就行了,默认会把消息发到kafkatemplate 配置的topic名字为your_topic+.dlt
@autowired
private kafkatemplate<string, string> kafkatemplate;
public commonerrorhandler commonerrorhandler() {
// 创建 fixedbackoff 对象
backoff backoff = new fixedbackoff(5000l, 3l);
defaulterrorhandler defaulterrorhandler = new defaulterrorhandler(new deadletterpublishingrecoverer(kafkatemplate), backoff);
return defaulterrorhandler;
}
consumerrecordrecoverer 接口总共就这2种实现方式
四、整体消费者代码粘贴
1 application.yml
kafka-consumer:
bootstrapservers: 192.168.31.114:9092
groupid: goods-center
#后台的心跳线程必须在30秒之内提交心跳,否则会rebalance
sessiontimeout: 30000
autooffsetreset: latest
#取消自动提交,即便如此 spring会帮助我们自动提交
enableautocommit: false
#自动提交间隔
autocommitinterval: 1000
#拉取的最小字节
fetchminsize: 1
#拉去最小字节的最大等待时间
fetchmaxwait: 500
maxpollrecords: 50
#300秒的提交间隔,如果程序大于300秒提交,会报错
maxpollinterval: 300000
#心跳间隔
heartbeatinterval: 10000
keydeserializer: org.apache.kafka.common.serialization.longdeserializer
valuedeserializer: org.springframework.kafka.support.serializer.jsondeserializer
2 kafkalistenerproperties
package com.ychen.goodscenter.fafka;
import lombok.getter;
import lombok.setter;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.configuration;
@configuration
//指定配置文件的前缀
@configurationproperties(prefix = "kafka-consumer")
@getter
@setter
public class kafkalistenerproperties {
private string groupid;
private string sessiontimeout;
private string bootstrapservers;
private string autooffsetreset;
private boolean enableautocommit;
private string autocommitinterval;
private string fetchminsize;
private string fetchmaxwait;
private string maxpollrecords;
private string maxpollinterval;
private string heartbeatinterval;
private string keydeserializer;
private string valuedeserializer;
}
3 kafkaconsumerconfig
package com.ychen.goodscenter.fafka;
import com.alibaba.fastjson2.jsonobject;
import lombok.extern.slf4j.slf4j;
import org.apache.kafka.clients.consumer.consumerconfig;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.context.properties.enableconfigurationproperties;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.config.kafkalistenercontainerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.backoff;
import org.springframework.util.backoff.fixedbackoff;
import java.util.hashmap;
import java.util.map;
@configuration
@enableconfigurationproperties(kafkalistenerproperties.class)
@slf4j
public class kafkaconsumerconfig {
@autowired
private kafkalistenerproperties kafkalistenerproperties;
@autowired
private kafkatemplate<string, string> kafkatemplate;
@bean
public kafkalistenercontainerfactory<concurrentmessagelistenercontainer<string, string>> kafkalistenercontainerfactory() {
concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>();
factory.setconsumerfactory(consumerfactory());
// 并发数 多个微服务实例会均分
factory.setconcurrency(2);
// factory.setbatchlistener(true);
factory.setcommonerrorhandler(commonerrorhandler());
containerproperties containerproperties = factory.getcontainerproperties();
// 是否设置手动提交
containerproperties.setackmode(containerproperties.ackmode.manual_immediate);
return factory;
}
private consumerfactory<string, string> consumerfactory() {
map<string, object> consumerconfigs = consumerconfigs();
log.info("消费者的配置信息:{}", jsonobject.tojsonstring(consumerconfigs));
return new defaultkafkaconsumerfactory<>(consumerconfigs);
}
public commonerrorhandler commonerrorhandler() {
// 创建 fixedbackoff 对象
backoff backoff = new fixedbackoff(5000l, 3l);
defaulterrorhandler defaulterrorhandler = new defaulterrorhandler(new deadletterpublishingrecoverer(kafkatemplate), backoff);
// defaulterrorhandler defaulterrorhandler = new defaulterrorhandler((consumerawarerecordrecoverer) (record, consumer, exception) -> {
// log.info("save to db " + record.value().tostring());
// map<topicpartition, offsetandmetadata> offsets = new hashmap<>();
// offsets.put(new topicpartition(record.topic(),record.partition()),new offsetandmetadata(record.offset()));
// consumer.commitsync(offsets);
// }, backoff);
return defaulterrorhandler;
}
public map<string, object> consumerconfigs() {
map<string, object> propsmap = new hashmap<>();
// 服务器地址
propsmap.put(consumerconfig.bootstrap_servers_config, kafkalistenerproperties.getbootstrapservers());
// 是否自动提交
propsmap.put(consumerconfig.enable_auto_commit_config, kafkalistenerproperties.isenableautocommit());
// 自动提交间隔
propsmap.put(consumerconfig.auto_commit_interval_ms_config, kafkalistenerproperties.getautocommitinterval());
//会话时间
propsmap.put(consumerconfig.session_timeout_ms_config, kafkalistenerproperties.getsessiontimeout());
//key序列化
propsmap.put(consumerconfig.key_deserializer_class_config, kafkalistenerproperties.getkeydeserializer());
//value序列化
propsmap.put(consumerconfig.value_deserializer_class_config, kafkalistenerproperties.getvaluedeserializer());
// 心跳时间
propsmap.put(consumerconfig.heartbeat_interval_ms_config, kafkalistenerproperties.getheartbeatinterval());
// 分组id
propsmap.put(consumerconfig.group_id_config, kafkalistenerproperties.getgroupid());
//消费策略
propsmap.put(consumerconfig.auto_offset_reset_config, kafkalistenerproperties.getautooffsetreset());
// poll记录数
propsmap.put(consumerconfig.max_poll_records_config, kafkalistenerproperties.getmaxpollrecords());
//poll时间
propsmap.put(consumerconfig.max_poll_interval_ms_config, kafkalistenerproperties.getmaxpollinterval());
propsmap.put("spring.json.trusted.packages", "com.ychen.**");
return propsmap;
}
}
4 messagelistener
package com.ychen.goodscenter.fafka;
import com.ychen.goodscenter.service.orderservice;
import com.ychen.goodscenter.vo.req.submitorderreq;
import lombok.extern.slf4j.slf4j;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.dao.duplicatekeyexception;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.kafka.support.acknowledgment;
import org.springframework.stereotype.component;
@component
@slf4j
public class messagelistener {
@autowired
private orderservice orderservice;
@kafkalistener(topics = "order-message-topic", containerfactory = "kafkalistenercontainerfactory")
public void processmessage(consumerrecord<long, submitorderreq> record, acknowledgment acknowledgment) {
log.info("order-message-topic message listener, thread id: " + thread.currentthread().getid());
try {
log.info("order-message-topic message received, orderid: {}", record.value().getorderid());
orderservice.submitorder(record.value());
// 同步提交
acknowledgment.acknowledge();
log.info("order-message-topic message acked: orderid: {}", record.value().getorderid());
} catch (duplicatekeyexception dupe) {
// 处理异常情况
log.error("order-message-topic message error duplicatekeyexception", dupe);
// 重复数据,忽略掉,同步提交
acknowledgment.acknowledge();
}
}
}
发表评论