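Background
In real projects, different topics often live on different Kafka clusters, so an application needs several Kafka data sources. Following Spring Boot's auto-configuration approach, the setup below generates the producer and consumer configuration automatically from entries in the configuration file.
Core configuration
Auto-configuration class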
package com.example.kafka;

import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.KafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication
@EnableConfigurationProperties({KafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {

    public MyKafkaAutoConfiguration() {
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        // Eagerly fetch the registrar so it is instantiated and initialized
        // as soon as the bean factory is available.
        beanFactory.getBean(CustomKafkaDataSourceRegister.class);
    }
}
Registering the core producer and consumer beans with Spring
// kafkaConsumerConfig, kafkaProducerConfig and beanFactory are assumed to be
// fields of the enclosing class, which is not shown here.
public void afterPropertiesSet() {
    Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();
    if (factories != null && !factories.isEmpty()) {
        factories.forEach((factoryName, consumerConfig) -> {
            KafkaProperties.Listener listener = consumerConfig.getListener();
            Integer concurrency = consumerConfig.getConcurrency();
            // Create the listener container factory
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                    createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
            // Register it with the container unless a bean of that name already exists
            if (!beanFactory.containsBean(factoryName)) {
                beanFactory.registerSingleton(factoryName, containerFactory);
            }
        });
    }
    Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();
    if (!ObjectUtils.isEmpty(templates)) {
        templates.forEach((templateName, producerConfig) -> {
            // There are two ways to register a Spring bean; the alternative is:
            // registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);
            registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class,
                    producerFactoryValues(producerConfig.buildProperties()));
        });
    }
}
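The consumer half of the method above boils down to "one object per named config entry, skipping names that are already taken". A minimal plain-JDK sketch of that rule follows. Here `NamedRegistrationSketch`, `registerAll`, and the `Map` standing in for the bean factory are all hypothetical and not part of the article's code. Unlike the original, the sketch checks the name before building the object.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of the registration loop; a Map stands in for the Spring bean factory. */
class NamedRegistrationSketch {

    /**
     * Builds one object per config entry and stores it under the entry's name.
     * Names already present in the registry are left untouched.
     * Returns the number of entries that were actually added.
     */
    static <C> int registerAll(Map<String, Object> registry,
                               Map<String, C> configs,
                               Function<C, Object> builder) {
        if (configs == null || configs.isEmpty()) {
            return 0; // nothing configured
        }
        int added = 0;
        for (Map.Entry<String, C> entry : configs.entrySet()) {
            if (registry.containsKey(entry.getKey())) {
                continue; // an existing registration wins
            }
            registry.put(entry.getKey(), builder.apply(entry.getValue()));
            added++;
        }
        return added;
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("test-factory", "192.168.56.112:9092");
        int added = registerAll(registry, configs, servers -> "factory for " + servers);
        System.out.println(added + " registered: " + registry.keySet());
    }
}
```

Skipping existing names means a bean that the application defines itself under the same name is not replaced by the generated one.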
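Configuring spring.factories
Register the auto-configuration class in META-INF/spring.factories. Spring Boot 3 no longer reads auto-configurations from this file; there the class name goes in META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports instead.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.MyKafkaAutoConfiguration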
YAML configuration
spring:
  kafka:
    multiple:
      consumer:
        factories:
          test-factory:
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            bootstrap-servers: 192.168.56.112:9092
            group-id: group_a
            concurrency: 25
            fetch-min-size: 1048576
            fetch-max-wait: 3000
            listener:
              type: batch
            properties:
              spring-json-trusted-packages: '*'
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: latest
      producer:
        templates:
          test-template:
            bootstrap-servers: 192.168.56.112:9092
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
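Each child of `factories` (and of `templates`) becomes one named entry, and that name is later used as the bean name. The sketch below models this grouping with plain JDK collections. It is not Spring Boot's actual binder, and `FactoryKeyGrouping` and `groupByFactory` are hypothetical names. The prefix assumes the consumer properties are bound under `spring.kafka.multiple.consumer`, which the configuration suggests but the article does not state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of how flattened property keys group by factory name. */
class FactoryKeyGrouping {

    // Assumed prefix, taken from the structure of the configuration.
    static final String PREFIX = "spring.kafka.multiple.consumer.factories.";

    /** Turns "PREFIX + name + '.' + setting" keys into name -> (setting -> value). */
    static Map<String, Map<String, String>> groupByFactory(Map<String, String> flat) {
        Map<String, Map<String, String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(PREFIX)) {
                continue; // not a per-factory setting
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0) {
                continue; // no setting name after the factory name
            }
            String factoryName = rest.substring(0, dot);
            String setting = rest.substring(dot + 1);
            grouped.computeIfAbsent(factoryName, n -> new LinkedHashMap<>())
                   .put(setting, entry.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put(PREFIX + "test-factory.group-id", "group_a");
        flat.put(PREFIX + "test-factory.concurrency", "25");
        System.out.println(groupByFactory(flat));
    }
}
```

Adding a second cluster then means adding another child next to `test-factory` with its own `bootstrap-servers`.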
Usage
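A minimal sketch of how the generated beans might be used, assuming they are registered under the names given in the YAML (`test-factory` and `test-template`), as the registration code suggests. The class `DemoKafkaClient` and the topic `demo-topic` are hypothetical. The listener takes a list because `listener.type` is `batch`, assuming the container factory applies that setting.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class DemoKafkaClient {

    private final KafkaTemplate<String, String> template;

    // Select the template by bean name, since several templates may exist.
    public DemoKafkaClient(@Qualifier("test-template") KafkaTemplate<String, String> template) {
        this.template = template;
    }

    /** Sends a message through the cluster configured for test-template. */
    public void send(String payload) {
        template.send("demo-topic", payload); // hypothetical topic name
    }

    /** Consumes batches through the container factory configured as test-factory. */
    @KafkaListener(topics = "demo-topic", containerFactory = "test-factory")
    public void onMessages(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.topic() + " -> " + record.value());
        }
    }
}
```

Each additional cluster would get its own factory and template name, and a listener or sender picks one by that name.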