一、kafka简介
1.1 什么是kafka
apache kafka是一个分布式流处理平台,具有以下核心特性:
- 高吞吐量:支持每秒百万级消息处理
- 可扩展性:支持水平扩展,可动态添加节点
- 持久化存储:消息可持久化到磁盘,支持数据保留策略
- 高可用性:通过副本机制保证数据不丢失
- 分布式架构:支持多生产者和消费者
1.2 kafka核心概念
- broker:kafka集群中的单个节点
- topic:消息的分类主题
- partition:topic的分区,实现并行处理
- replica:分区副本,保证高可用
- producer:消息生产者
- consumer:消息消费者
- consumer group:消费者组
二、搭建kafka高可用集群
集群架构规划
建议至少3个节点的kafka集群 + 3个节点的zookeeper集群:
- zookeeper集群:zk1:2181, zk2:2181, zk3:2181
- kafka集群:kafka1:9092, kafka2:9092, kafka3:9092
三、springboot整合kafka详细步骤
3.1 创建springboot项目
使用spring initializr创建项目,添加依赖:
<!-- pom.xml -->
<dependencies>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.kafka</groupid>
<artifactid>spring-kafka</artifactid>
</dependency>
<dependency>
<groupid>org.projectlombok</groupid>
<artifactid>lombok</artifactid>
<optional>true</optional>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-validation</artifactid>
</dependency>
</dependencies>
3.2 配置文件
# application.yml
spring:
kafka:
# kafka集群配置(高可用)
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
# 生产者配置
producer:
retries: 3 # 发送失败重试次数
acks: all # 所有副本确认才认为发送成功
key-serializer: org.apache.kafka.common.serialization.stringserializer
value-serializer: org.springframework.kafka.support.serializer.jsonserializer
properties:
compression.type: snappy # 压缩类型
linger.ms: 5 # 等待时间,批量发送提高吞吐量
# 消费者配置
consumer:
group-id: ${spring.application.name}-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
value-deserializer: org.springframework.kafka.support.serializer.jsondeserializer
properties:
spring.json.trusted.packages: "com.example.kafka.dto"
max.poll.records: 500 # 一次拉取最大记录数
session.timeout.ms: 10000 # 会话超时时间
heartbeat.interval.ms: 3000 # 心跳间隔
# 监听器配置
listener:
concurrency: 3 # 并发消费者数量
ack-mode: batch # 批量确认
missing-topics-fatal: false # 主题不存在时不报错
# 高可用配置
properties:
# 分区副本配置
replication.factor: 3
min.insync.replicas: 2
# 生产者的高可用配置
enable.idempotence: true # 幂等性
max.in.flight.requests.per.connection: 5
# 自定义配置
kafka:
topics:
order-topic: order-topic
payment-topic: payment-topic
retry-topic: retry-topic
retry:
max-attempts: 3
backoff-interval: 1000
3.3 配置类
// kafkaconfig.java
@configuration
@enablekafka
@slf4j
public class kafkaconfig {
@value("${kafka.topics.order-topic}")
private string ordertopic;
@value("${kafka.topics.payment-topic}")
private string paymenttopic;
@value("${kafka.topics.retry-topic}")
private string retrytopic;
@bean
public kafkaadmin kafkaadmin() {
map<string, object> configs = new hashmap<>();
configs.put(adminclientconfig.bootstrap_servers_config,
"kafka1:9092,kafka2:9092,kafka3:9092");
return new kafkaadmin(configs);
}
@bean
public newtopic ordertopic() {
// 创建topic:3个分区,3个副本
return new newtopic(ordertopic, 3, (short) 3);
}
@bean
public newtopic paymenttopic() {
return new newtopic(paymenttopic, 2, (short) 3);
}
@bean
public newtopic retrytopic() {
return new newtopic(retrytopic, 1, (short) 3);
}
// 死信队列配置
@bean
public deadletterpublishingrecoverer dlqrecoverer(kafkatemplate<string, object> template) {
return new deadletterpublishingrecoverer(template,
(record, ex) -> {
log.error("消息处理失败,发送到死信队列: {}", record.value(), ex);
return new topicpartition("dlq-topic", record.partition());
});
}
@bean
public defaulterrorhandler errorhandler(deadletterpublishingrecoverer dlqrecoverer) {
// 重试3次后进入死信队列
defaulterrorhandler handler = new defaulterrorhandler(dlqrecoverer,
new fixedbackoff(1000l, 3));
handler.addnotretryableexceptions(illegalargumentexception.class);
return handler;
}
// 生产者工厂增强配置
@bean
public producerfactory<string, object> producerfactory() {
map<string, object> configprops = new hashmap<>();
configprops.put(producerconfig.bootstrap_servers_config,
"kafka1:9092,kafka2:9092,kafka3:9092");
configprops.put(producerconfig.key_serializer_class_config,
stringserializer.class);
configprops.put(producerconfig.value_serializer_class_config,
jsonserializer.class);
configprops.put(producerconfig.acks_config, "all"); // 所有副本确认
configprops.put(producerconfig.retries_config, 3); // 重试次数
configprops.put(producerconfig.enable_idempotence_config, true); // 幂等性
configprops.put(producerconfig.max_in_flight_requests_per_connection, 5);
return new defaultkafkaproducerfactory<>(configprops);
}
}
3.4 消息实体类
// ordermessage.java
@data
@noargsconstructor
@allargsconstructor
@builder
public class ordermessage implements serializable {
private string orderid;
private string userid;
private bigdecimal amount;
private string productname;
private integer quantity;
private localdatetime createtime;
private messagestatus status;
public enum messagestatus {
pending, processing, success, failed
}
}
// paymentmessage.java
@data
@noargsconstructor
@allargsconstructor
@builder
public class paymentmessage {
private string paymentid;
private string orderid;
private bigdecimal amount;
private paymentmethod paymentmethod;
private paymentstatus status;
private localdatetime paymenttime;
public enum paymentmethod {
alipay, wechat, credit_card
}
public enum paymentstatus {
init, processing, success, failed
}
}
3.5 生产者服务
// kafkaproducerservice.java
@service
@slf4j
public class kafkaproducerservice {
@autowired
private kafkatemplate<string, object> kafkatemplate;
@value("${kafka.topics.order-topic}")
private string ordertopic;
@value("${kafka.topics.payment-topic}")
private string paymenttopic;
/**
* 发送订单消息(同步)
*/
public sendresult<string, object> sendordersync(ordermessage ordermessage) {
try {
// 设置消息头
messageheaders headers = new messageheaders(map.of(
"message-id", uuid.randomuuid().tostring(),
"message-time", string.valueof(system.currenttimemillis())
));
message<ordermessage> message = messagebuilder
.withpayload(ordermessage)
.copyheaders(headers)
.build();
// 同步发送,等待确认
listenablefuture<sendresult<string, object>> future =
kafkatemplate.send(ordertopic, ordermessage.getorderid(), message);
// 等待发送结果
sendresult<string, object> result = future.get(5, timeunit.seconds);
log.info("订单消息发送成功: topic={}, partition={}, offset={}",
result.getrecordmetadata().topic(),
result.getrecordmetadata().partition(),
result.getrecordmetadata().offset());
return result;
} catch (exception e) {
log.error("订单消息发送失败: {}", ordermessage, e);
throw new runtimeexception("消息发送失败", e);
}
}
/**
* 发送订单消息(异步)
*/
public void sendorderasync(ordermessage ordermessage) {
kafkatemplate.send(ordertopic, ordermessage.getorderid(), ordermessage)
.addcallback(new listenablefuturecallback<sendresult<string, object>>() {
@override
public void onsuccess(sendresult<string, object> result) {
log.info("异步发送成功: topic={}, offset={}",
result.getrecordmetadata().topic(),
result.getrecordmetadata().offset());
}
@override
public void onfailure(throwable ex) {
log.error("异步发送失败: {}", ordermessage, ex);
// 可以添加重试逻辑或写入本地文件
}
});
}
/**
* 批量发送消息
*/
public void batchsendorders(list<ordermessage> ordermessages) {
ordermessages.foreach(message -> {
kafkatemplate.send(ordertopic, message.getorderid(), message);
});
kafkatemplate.flush(); // 确保所有消息都发送
}
/**
* 发送到指定分区
*/
public void sendtopartition(ordermessage ordermessage, int partition) {
kafkatemplate.send(ordertopic, partition,
ordermessage.getorderid(), ordermessage);
}
/**
* 事务消息发送
*/
@transactional(transactionmanager = "kafkatransactionmanager")
public void sendtransactionalmessage(ordermessage ordermessage) {
// 数据库操作
// orderrepository.save(order);
// kafka消息发送(与数据库操作在同一个事务中)
kafkatemplate.send(ordertopic, ordermessage.getorderid(), ordermessage);
// 其他业务操作
}
}
3.6 消费者服务
// kafkaconsumerservice.java
@service
@slf4j
public class kafkaconsumerservice {
private static final string order_container_factory = "ordercontainerfactory";
private static final string payment_container_factory = "paymentcontainerfactory";
/**
* 订单消息消费者 - 批量消费
*/
@kafkalistener(
topics = "${kafka.topics.order-topic}",
containerfactory = order_container_factory,
groupid = "order-consumer-group"
)
public void consumeordermessages(list<ordermessage> messages) {
log.info("收到批量订单消息,数量: {}", messages.size());
for (ordermessage message : messages) {
try {
processordermessage(message);
} catch (exception e) {
log.error("订单处理失败: {}", message.getorderid(), e);
// 记录失败消息,可以发送到重试队列
}
}
}
/**
* 单个订单消息消费
*/
@kafkalistener(
topics = "${kafka.topics.order-topic}",
groupid = "order-single-consumer-group"
)
public void consumesingleordermessage(
@payload ordermessage message,
@header(kafkaheaders.received_topic) string topic,
@header(kafkaheaders.received_partition) int partition,
@header(kafkaheaders.offset) long offset) {
log.info("收到单个订单消息: topic={}, partition={}, offset={}, orderid={}",
topic, partition, offset, message.getorderid());
try {
// 业务处理逻辑
processordermessage(message);
// 处理成功后,可以发送确认消息到下游
sendpaymentmessage(message);
} catch (exception e) {
log.error("订单处理失败: {}", message.getorderid(), e);
throw e; // 抛出异常会触发重试机制
}
}
/**
* 支付消息消费者
*/
@kafkalistener(
topics = "${kafka.topics.payment-topic}",
containerfactory = payment_container_factory,
groupid = "payment-consumer-group"
)
public void consumepaymentmessage(paymentmessage message) {
log.info("收到支付消息: {}", message.getpaymentid());
// 支付处理逻辑
try {
processpayment(message);
} catch (exception e) {
log.error("支付处理失败: {}", message.getpaymentid(), e);
}
}
private void processordermessage(ordermessage message) {
// 模拟业务处理
log.info("处理订单: {},金额: {}", message.getorderid(), message.getamount());
// 业务逻辑,如:
// 1. 验证订单
// 2. 扣减库存
// 3. 记录日志
// 4. 更新订单状态
// 模拟处理时间
try {
thread.sleep(100);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
}
}
private void sendpaymentmessage(ordermessage ordermessage) {
paymentmessage paymentmessage = paymentmessage.builder()
.paymentid(uuid.randomuuid().tostring())
.orderid(ordermessage.getorderid())
.amount(ordermessage.getamount())
.paymentmethod(paymentmessage.paymentmethod.alipay)
.status(paymentmessage.paymentstatus.init)
.paymenttime(localdatetime.now())
.build();
// 这里可以使用kafkatemplate发送支付消息
}
private void processpayment(paymentmessage message) {
// 支付处理逻辑
log.info("处理支付: {},订单: {}", message.getpaymentid(), message.getorderid());
}
}
3.7 消费者容器工厂配置
// consumerconfig.java
@configuration
public class consumerconfig {
@value("${spring.kafka.bootstrap-servers}")
private string bootstrapservers;
// 订单消费者容器工厂(批量消费)
@bean(order_container_factory)
public concurrentkafkalistenercontainerfactory<string, ordermessage>
ordercontainerfactory() {
concurrentkafkalistenercontainerfactory<string, ordermessage> factory =
new concurrentkafkalistenercontainerfactory<>();
factory.setconsumerfactory(orderconsumerfactory());
factory.setconcurrency(3); // 并发消费者数量
factory.getcontainerproperties().setpolltimeout(3000);
factory.setbatchlistener(true); // 启用批量消费
factory.getcontainerproperties().setackmode(containerproperties.ackmode.batch);
// 设置批量消费参数
factory.getcontainerproperties().setidlebetweenpolls(1000);
return factory;
}
@bean
public consumerfactory<string, ordermessage> orderconsumerfactory() {
map<string, object> props = new hashmap<>();
props.put(consumerconfig.bootstrap_servers_config, bootstrapservers);
props.put(consumerconfig.group_id_config, "order-consumer-group");
props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
props.put(consumerconfig.value_deserializer_class_config, jsondeserializer.class);
props.put(jsondeserializer.trusted_packages, "com.example.kafka.dto");
props.put(consumerconfig.max_poll_records_config, 500); // 批量拉取数量
props.put(consumerconfig.enable_auto_commit_config, false);
props.put(consumerconfig.auto_offset_reset_config, "earliest");
// 高可用配置
props.put(consumerconfig.session_timeout_ms_config, 10000);
props.put(consumerconfig.heartbeat_interval_ms_config, 3000);
return new defaultkafkaconsumerfactory<>(props);
}
}
3.8 监控和管理端点
// kafkamonitorcontroller.java
@restcontroller
@requestmapping("/api/kafka")
@slf4j
public class kafkamonitorcontroller {
@autowired
private kafkaadmin kafkaadmin;
@autowired
private kafkatemplate<string, object> kafkatemplate;
/**
* 获取topic列表
*/
@getmapping("/topics")
public responseentity<list<string>> gettopics() throws exception {
try (adminclient adminclient = adminclient.create(kafkaadmin.getconfigurationproperties())) {
listtopicsresult topicsresult = adminclient.listtopics();
set<string> topicnames = topicsresult.names().get();
return responseentity.ok(new arraylist<>(topicnames));
}
}
/**
* 获取topic详情
*/
@getmapping("/topics/{topic}/details")
public responseentity<map<integer, list<integer>>> gettopicdetails(
@pathvariable string topic) throws exception {
try (adminclient adminclient = adminclient.create(kafkaadmin.getconfigurationproperties())) {
describetopicsresult describeresult = adminclient.describetopics(collections.singleton(topic));
topicdescription topicdescription = describeresult.values().get(topic).get();
map<integer, list<integer>> partitioninfo = new hashmap<>();
for (topicpartitioninfo partition : topicdescription.partitions()) {
list<integer> replicas = partition.replicas().stream()
.map(node::id)
.collect(collectors.tolist());
partitioninfo.put(partition.partition(), replicas);
}
return responseentity.ok(partitioninfo);
}
}
/**
* 发送测试消息
*/
@postmapping("/send-test")
public responseentity<string> sendtestmessage(@requestparam string topic) {
ordermessage testmessage = ordermessage.builder()
.orderid("test-" + system.currenttimemillis())
.userid("test-user")
.amount(new bigdecimal("100.00"))
.productname("测试商品")
.quantity(1)
.createtime(localdatetime.now())
.status(ordermessage.messagestatus.pending)
.build();
kafkatemplate.send(topic, testmessage.getorderid(), testmessage);
return responseentity.ok("测试消息发送成功");
}
/**
* 获取消费者组信息
*/
@getmapping("/consumer-groups")
public responseentity<map<string, object>> getconsumergroups() throws exception {
try (adminclient adminclient = adminclient.create(kafkaadmin.getconfigurationproperties())) {
listconsumergroupsresult groupsresult = adminclient.listconsumergroups();
collection<consumergrouplisting> groups = groupsresult.all().get();
map<string, object> result = new hashmap<>();
result.put("consumergroups", groups);
result.put("count", groups.size());
return responseentity.ok(result);
}
}
}
3.9 异常处理和重试机制
// kafkaexceptionhandler.java
@component
@slf4j
public class kafkaexceptionhandler {
/**
* 全局kafka监听器异常处理
*/
@eventlistener
public void handleexception(listenercontainerconsumerfailedevent event) {
log.error("kafka消费者异常: {}", event.getcontainer().getlistenerid(), event.getexception());
// 记录异常信息
// 发送告警
// 写入错误日志
}
/**
* 自定义重试策略
*/
@bean
public retrytemplate kafkaretrytemplate() {
retrytemplate retrytemplate = new retrytemplate();
// 重试策略:最多重试3次,每次间隔1秒
simpleretrypolicy retrypolicy = new simpleretrypolicy();
retrypolicy.setmaxattempts(3);
// 退避策略
fixedbackoffpolicy backoffpolicy = new fixedbackoffpolicy();
backoffpolicy.setbackoffperiod(1000l);
retrytemplate.setretrypolicy(retrypolicy);
retrytemplate.setbackoffpolicy(backoffpolicy);
return retrytemplate;
}
}
3.10 健康检查
// kafkahealthindicator.java
@component
public class kafkahealthindicator implements healthindicator {
@autowired
private kafkatemplate<string, object> kafkatemplate;
@override
public health health() {
try {
// 尝试发送一个测试消息来检查kafka连接
kafkatemplate.send("health-check-topic", "health-check", "ping")
.get(5, timeunit.seconds);
return health.up()
.withdetail("status", "kafka集群连接正常")
.withdetail("timestamp", localdatetime.now())
.build();
} catch (exception e) {
return health.down()
.withdetail("status", "kafka集群连接异常")
.withdetail("error", e.getmessage())
.withdetail("timestamp", localdatetime.now())
.build();
}
}
}
四、高可用性保障措施
4.1 集群配置建议
# kafka-server.properties 关键配置 broker.id=1 listeners=plaintext://:9092 advertised.listeners=plaintext://kafka1:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 日志配置 log.dirs=/data/kafka-logs num.partitions=3 num.recovery.threads.per.data.dir=1 # 副本和isr配置 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 default.replication.factor=3 min.insync.replicas=2 # 日志保留 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 # zookeeper配置 zookeeper.connect=zk1:2181,zk2:2181,zk3:2181 zookeeper.connection.timeout.ms=6000
4.2 生产环境部署建议
硬件配置:
- 至少3个kafka节点 + 3个zookeeper节点
- ssd磁盘提高io性能
- 充足的内存和cpu资源
网络配置:
- 使用专用网络
- 配置合理的防火墙规则
监控告警:
- 使用kafka manager或confluent control center
- 监控指标:吞吐量、延迟、副本同步状态
五、测试示例
// kafkaintegrationtest.java
@springboottest
@slf4j
class kafkaintegrationtest {
@autowired
private kafkaproducerservice producerservice;
@test
void testsendandreceivemessage() throws interruptedexception {
// 创建测试消息
ordermessage ordermessage = ordermessage.builder()
.orderid("test-" + uuid.randomuuid())
.userid("user-001")
.amount(new bigdecimal("199.99"))
.productname("测试商品")
.quantity(2)
.createtime(localdatetime.now())
.status(ordermessage.messagestatus.pending)
.build();
// 发送消息
sendresult<string, object> result = producerservice.sendordersync(ordermessage);
assertnotnull(result);
assertnotnull(result.getrecordmetadata());
log.info("消息发送成功,分区: {}, offset: {}",
result.getrecordmetadata().partition(),
result.getrecordmetadata().offset());
// 等待消费者处理
thread.sleep(2000);
}
@test
void testbatchsend() {
list<ordermessage> messages = new arraylist<>();
for (int i = 0; i < 100; i++) {
ordermessage message = ordermessage.builder()
.orderid("batch-" + i)
.userid("user-" + i)
.amount(new bigdecimal(i * 10))
.productname("商品" + i)
.quantity(1)
.createtime(localdatetime.now())
.status(ordermessage.messagestatus.pending)
.build();
messages.add(message);
}
producerservice.batchsendorders(messages);
}
}
六、总结
6.1 实现的高可用特性
- 数据冗余:通过副本机制(replication factor=3)保证数据安全
- 故障转移:leader选举机制确保节点故障时自动切换
- 负载均衡:分区机制实现水平扩展和负载均衡
- 容错处理:死信队列和重试机制保障消息不丢失
- 监控告警:完善的健康检查和监控体系
6.2 最佳实践建议
- 合理规划分区:根据业务吞吐量和消费者数量设置分区数
- 监控副本同步:确保isr(in-sync replicas)数量足够
- 配置重试机制:针对网络波动和临时故障进行重试
- 实施消息幂等:避免重复消费问题
- 定期清理数据:设置合理的消息保留策略
6.3 性能优化建议
- 批量操作:使用批量发送和批量消费提高吞吐量
- 压缩传输:启用消息压缩减少网络带宽消耗
- 合理批大小:根据业务场景调整批量大小
- 异步确认:非关键业务使用异步发送提高响应速度
通过以上方案,springboot整合kafka实现了高可用的消息队列集群,具备生产级的可靠性、可扩展性和容错能力,能够满足企业级应用的需求。
以上就是springboot整合kafka实现高可用消息队列集群详解的详细内容,更多关于springboot kafka实现消息队列集群的资料请关注代码网其它相关文章!
发表评论