当前位置: 代码网 > it编程>编程语言>Java > SpringBoot实现kafka多源配置的示例代码

SpringBoot实现kafka多源配置的示例代码

2024年06月17日 Java 我要评论
背景实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。核心配置自动化配

背景

实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。

核心配置

自动化配置类

import com.example.kafka.autoconfig.customkafkadatasourceregister;
import com.example.kafka.autoconfig.kafkaconsumerconfig;
import org.springframework.beans.beansexception;
import org.springframework.beans.factory.beanfactory;
import org.springframework.beans.factory.beanfactoryaware;
import org.springframework.beans.factory.config.smartinstantiationawarebeanpostprocessor;
import org.springframework.boot.autoconfigure.condition.conditionalonwebapplication;
import org.springframework.boot.context.properties.enableconfigurationproperties;
import org.springframework.context.annotation.configuration;
import org.springframework.context.annotation.import;
import org.springframework.kafka.annotation.enablekafka;

@enablekafka
@configuration(
        proxybeanmethods = false
)
@conditionalonwebapplication
@enableconfigurationproperties({kafkaconsumerconfig.class})
@import({customkafkadatasourceregister.class})
public class mykafkaautoconfiguration implements beanfactoryaware, smartinstantiationawarebeanpostprocessor {
    public mykafkaautoconfiguration() {
    }

    public void setbeanfactory(beanfactory beanfactory) throws beansexception {
        beanfactory.getbean(customkafkadatasourceregister.class);
    }
}

注册生产者、消费者核心bean到spring

public void afterpropertiesset() {
        map<string, consumerconfigwrapper> factories = kafkaconsumerconfig.getfactories();
        if (factories != null && !factories.isempty()) {
            factories.foreach((factoryname, consumerconfig) -> {
                kafkaproperties.listener listener = consumerconfig.getlistener();
                integer concurrency = consumerconfig.getconcurrency();
                // 创建监听容器工厂
                concurrentkafkalistenercontainerfactory<string, string> containerfactory = createkafkalistenercontainerfactory(consumerconfig.buildproperties(), listener, concurrency);
                // 注册到容器
                if (!beanfactory.containsbean(factoryname)) {
                    beanfactory.registersingleton(factoryname, containerfactory);
                }
            });
        }
        map<string, kafkaproperties.producer> templates = kafkaproducerconfig.gettemplates();
        if (!objectutils.isempty(templates)) {
            templates.foreach((templatename, producerconfig) -> {
                //registerbean(beanfactory, templatename, kafkatemplate.class, propertyvalues);
                //注册spring bean的两种方式
                registerbeanwithconstructor(beanfactory, templatename, kafkatemplate.class, producerfactoryvalues(producerconfig.buildproperties()));
            });
        }
    }

配置spring.factories

org.springframework.boot.autoconfigure.enableautoconfiguration=\
com.example.kafka.mykafkaautoconfiguration

yml配置

spring:
  kafka:
    multiple:
      consumer:
        factories:
          test-factory:
            key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
            value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
            bootstrap-servers: 192.168.56.112:9092
            group-id: group_a
            concurrency: 25
            fetch-min-size: 1048576
            fetch-max-wait: 3000
            listener:
              type: batch
            properties:
              spring-json-trusted-packages: '*'
        key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
        value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
        auto-offset-reset: latest
      producer:
        templates:
          test-template:
            bootstrap-servers: 192.168.56.112:9092
            key-serializer: org.apache.kafka.common.serialization.stringserializer
            value-serializer: org.apache.kafka.common.serialization.stringserializer
        key-serializer: org.apache.kafka.common.serialization.stringserializer
        value-serializer: org.apache.kafka.common.serialization.stringserializer

使用

在这里插入图片描述

在这里插入图片描述

到此这篇关于springboot实现kafka多源配置的示例代码的文章就介绍到这了,更多相关springboot kafka多源配置内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com