背景
实际开发中,不同的 topic 可能来自不同的 Kafka 集群,因此需要配置多个 Kafka 数据源。本文基于 Spring Boot 自动配置的思想,通过配置文件中的配置,自动生成各数据源对应的生产者及消费者配置。
核心配置
自动化配置类
import com.example.kafka.autoconfig.customkafkadatasourceregister;
import com.example.kafka.autoconfig.kafkaconsumerconfig;
import org.springframework.beans.beansexception;
import org.springframework.beans.factory.beanfactory;
import org.springframework.beans.factory.beanfactoryaware;
import org.springframework.beans.factory.config.smartinstantiationawarebeanpostprocessor;
import org.springframework.boot.autoconfigure.condition.conditionalonwebapplication;
import org.springframework.boot.context.properties.enableconfigurationproperties;
import org.springframework.context.annotation.configuration;
import org.springframework.context.annotation.import;
import org.springframework.kafka.annotation.enablekafka;
/**
 * Auto-configuration class for the multi-source Kafka setup described in this article.
 *
 * <p>The annotations on this class enable Kafka listener annotation support, bind the
 * consumer configuration-properties class, and import the custom data-source register.
 * The web-application condition restricts this configuration to web applications.
 *
 * <p>NOTE(review): every identifier in this snippet is lower case, which does not match
 * the real Spring type names (Java is case-sensitive). Presumably the casing was lost when
 * the article was extracted — restore it from the original source before compiling.
 *
 * <p>NOTE(review): only the consumer configuration class is listed in the
 * configuration-properties annotation; the producer configuration read by the
 * registration code is presumably bound elsewhere — confirm.
 */
@enablekafka
@configuration(
proxybeanmethods = false
)
@conditionalonwebapplication
@enableconfigurationproperties({kafkaconsumerconfig.class})
@import({customkafkadatasourceregister.class})
public class mykafkaautoconfiguration implements beanfactoryaware, smartinstantiationawarebeanpostprocessor {
/** No-argument constructor; performs no initialization. */
public mykafkaautoconfiguration() {
}
/**
 * Receives the bean factory that owns this configuration object.
 *
 * <p>Looks up the data-source register bean by type and discards the returned reference.
 * Presumably the lookup exists only to force the register to be created early, before
 * other beans are instantiated — confirm against the register implementation.
 *
 * @param beanfactory the bean factory supplied by the container
 * @throws beansexception if the lookup of the register bean fails
 */
public void setbeanfactory(beanfactory beanfactory) throws beansexception {
beanfactory.getbean(customkafkadatasourceregister.class);
}
}
注册生产者、消费者核心bean到spring
/**
 * Registers the configured consumer listener-container factories and producer templates
 * as beans in the bean factory.
 *
 * <p>For each entry of the consumer factories map, a concurrent listener container factory
 * is built from the entry's properties, listener settings and concurrency, and registered
 * as a singleton under the entry's name unless a bean with that name already exists.
 * For each entry of the producer templates map, a Kafka template bean is registered under
 * the entry's name via {@code registerbeanwithconstructor}.
 *
 * <p>Does nothing for a map that is null or empty.
 *
 * <p>NOTE(review): identifiers appear case-folded in this snippet, so
 * {@code kafkaconsumerconfig} / {@code kafkaproducerconfig} are presumably injected fields
 * rather than static class references — confirm against the original source.
 */
public void afterpropertiesset() {
map<string, consumerconfigwrapper> factories = kafkaconsumerconfig.getfactories();
if (factories != null && !factories.isempty()) {
factories.foreach((factoryname, consumerconfig) -> {
kafkaproperties.listener listener = consumerconfig.getlistener();
integer concurrency = consumerconfig.getconcurrency();
// Create the listener container factory for this named consumer configuration.
concurrentkafkalistenercontainerfactory<string, string> containerfactory = createkafkalistenercontainerfactory(consumerconfig.buildproperties(), listener, concurrency);
// Register it with the container; a bean already registered under this name is kept.
if (!beanfactory.containsbean(factoryname)) {
beanfactory.registersingleton(factoryname, containerfactory);
}
});
}
map<string, kafkaproperties.producer> templates = kafkaproducerconfig.gettemplates();
if (!objectutils.isempty(templates)) {
templates.foreach((templatename, producerconfig) -> {
//registerbean(beanfactory, templatename, kafkatemplate.class, propertyvalues);
// Two ways to register a Spring bean: the commented-out call above is the alternative;
// the constructor-argument variant below is the one in use.
registerbeanwithconstructor(beanfactory, templatename, kafkatemplate.class, producerfactoryvalues(producerconfig.buildproperties()));
});
}
}
配置spring.factories
# Registers the auto-configuration class so Spring Boot loads it from META-INF/spring.factories.
# The key is case-sensitive. The value must be the exact fully qualified class name.
# NOTE(review): the class name is kept as written in this article; its casing was presumably
# lost together with the Java snippet's — confirm against the real class before use.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.mykafkaautoconfiguration
yml配置
# Multi-source Kafka configuration sample.
# NOTE(review): the nesting below is reconstructed from a listing whose indentation was lost.
# Keys that repeat after a named entry are placed one level above that entry
# (presumably shared defaults) — confirm against the configuration-properties classes.
spring:
  kafka:
    multiple:
      consumer:
        # One entry per listener container factory; the entry name becomes the bean name.
        factories:
          test-factory:
            key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
            value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
            bootstrap-servers: 192.168.56.112:9092
            group-id: group_a
            # Read per factory by the registration code, together with the listener settings.
            concurrency: 25
            fetch-min-size: 1048576
            fetch-max-wait: 3000
            listener:
              type: batch
            properties:
              spring-json-trusted-packages: '*'
        key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
        value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
        auto-offset-reset: latest
      producer:
        # One entry per Kafka template; the entry name becomes the bean name.
        templates:
          test-template:
            bootstrap-servers: 192.168.56.112:9092
            key-serializer: org.apache.kafka.common.serialization.stringserializer
            value-serializer: org.apache.kafka.common.serialization.stringserializer
        key-serializer: org.apache.kafka.common.serialization.stringserializer
        value-serializer: org.apache.kafka.common.serialization.stringserializer
使用


到此这篇关于springboot实现kafka多源配置的示例代码的文章就介绍到这了,更多相关springboot kafka多源配置内容请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!
发表评论