当前位置: 代码网 > it编程>编程语言>Java > Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

2024年08月01日 Java 我要评论
证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。命令将根证书或证书链导入到。将根证书或证书链保存为。

证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。

证书处理:

  • keystore 用于存储客户端的证书和私钥,用于客户端身份验证。
  • truststore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:

cat your_cert.pem your_key.key > test.pem

  1. 合并证书和私钥为一个 pkcs12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 pkcs12 文件导入到 java keystore 中:

keytool -importkeystore -srckeystore client.p12 -srcstoretype pkcs12 -destkeystore client.jks -deststoretype jks

要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 ssl 连接。

下面是生成 truststore.jks 的步骤:

  1. 获取服务器的根证书或证书链。您可以使用之前提到的 openssl s_client 命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts

  2. 将根证书或证书链保存为 .pem 文件。

  3. 使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:

    keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

 

项目集成:

maven集成:

  <dependency>
            <groupid>org.springframework.kafka</groupid>
            <artifactid>spring-kafka</artifactid>
            <version>2.5.5.release</version>
        </dependency>

nacos配置:

spring:
  kafka:
    bootstrap-servers: ssl://connectedca.com:443  ##换成你自己的连接
    ssl:
      protocol: tls
###3这三个密码是你证书配置的时候设置的密码
      trust-store-password: a123456
      key-store-password: a123456
      key-password: a123456
    consumer:
      group-id: 
    producer:
      topic: *.event  ##换成你自己的topic

核心配置:


import lombok.extern.slf4j.slf4j;
import org.apache.kafka.clients.admin.adminclientconfig;
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.core.io.resource;
import org.springframework.core.io.resourceloader;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.core.kafkaadmin;
import org.springframework.kafka.listener.seektocurrenterrorhandler;
import org.springframework.kafka.support.serializer.errorhandlingdeserializer;
import org.springframework.kafka.support.serializer.jsondeserializer;


import java.util.hashmap;
import java.util.map;
@slf4j
@configuration
public class kafkaconfiguration {

    @autowired
    c3configproperties c3configproperties;
    @autowired
    private kafkaconfig kafkaproperties;
    @autowired
    private resourceloader resourceloader;
    @bean
    public kafkaadmin kafkaadmin() {
        map <string, object> configs = new hashmap <>();
        configs.put(adminclientconfig.bootstrap_servers_config, kafkaproperties.getbootstrapservers());
        return new kafkaadmin(configs);
    }

    @bean
    public defaultkafkaconsumerfactory <string, string> consumerfactory() {
        map <string, object> consumerconfig = new hashmap <>();
        consumerconfig.put(consumerconfig.bootstrap_servers_config, kafkaproperties.getbootstrapservers());
        consumerconfig.put(consumerconfig.group_id_config, "newbie-car-owner-data-sync");
        consumerconfig.put(consumerconfig.client_id_config, "newbie-car-owner-data-sync");

        consumerconfig.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
        consumerconfig.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);

        // 设置值的反序列化器为 errorhandlingdeserializer2,并配置类型信息
        consumerconfig.put(errorhandlingdeserializer.value_deserializer_class, jsondeserializer.class);
        consumerconfig.put(jsondeserializer.use_type_info_headers, false); // 启用类型信息头
        consumerconfig.put(consumerconfig.auto_offset_reset_config, "earliest");
        consumerconfig.put(jsondeserializer.value_default_type, "*.kafkac3msglistener"); // 设置默认类型信息
        consumerconfig.put(jsondeserializer.trusted_packages, "*.kafkac3msglistener"); // 替换为你的实际包名


        string pemurl = "";
        string csrurl = "";
        if (c3configproperties.getenvironment().equals("uat")) {
            pemurl = "file/uat/kafka/client.jks";
            csrurl = "file/uat/kafka/truststore.jks";
        } else if (c3configproperties.getenvironment().equals("pre")) {
            pemurl = "file/pre/kafka/client.jks";
            csrurl = "file/pre/kafka/truststore.jks";

        } else if (c3configproperties.getenvironment().equals("prod")) {
            pemurl = "file/prod/kafka/client.jks";
            csrurl = "file/prod/kafka/truststore.jks";
        }

       try {
          
           // 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载
           resource pemresource = resourceloader.getresource("classpath:"+pemurl);
           resource csrresource = resourceloader.getresource("classpath:"+csrurl);
// 获取证书文件的路径
           string keystorepath = pemresource.getfile().getabsolutepath();
           string truststorepath = csrresource.getfile().getabsolutepath();
           consumerconfig.put("ssl.keystore.location", keystorepath);
           consumerconfig.put("ssl.truststore.location", truststorepath);

       }catch (exception e){
           log.error("resource file error:{}",e.getmessage());
       }
        consumerconfig.put("security.protocol", "ssl");
   consumerconfig.put("ssl.truststore.password", kafkaproperties.gettruststorepassword());
        consumerconfig.put("ssl.keystore.password", kafkaproperties.getkeystorepassword());
        consumerconfig.put("ssl.key.password", kafkaproperties.getkeypassword());

        return new defaultkafkaconsumerfactory <>(consumerconfig);
    }


    @bean
    public concurrentkafkalistenercontainerfactory <string, string> kafkalistenercontainerfactory() {
        concurrentkafkalistenercontainerfactory <string, string> factory = new concurrentkafkalistenercontainerfactory <>();
        factory.setconsumerfactory(consumerfactory());
        factory.setconcurrency(3); // 设置并发消费者数量
        factory.seterrorhandler(new seektocurrenterrorhandler()); // 错误处理器

        return factory;
    }

    @bean
    public kafkac3msglistener kafkac3msglistener() {
        return new kafkac3msglistener();
    }


}

注入配置:


import lombok.data;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.configuration;

@data
@configuration
public class kafkaconfig {

    @value("${spring.kafka.bootstrap-servers}")
    private string bootstrapservers;

    @value("${spring.kafka.consumer.group-id}")
    private string groupid;

    @value("${spring.kafka.producer.topic}")
    private string topic;

    @value("${spring.kafka.ssl.trust-store-password}")
    private string truststorepassword;

    @value("${spring.kafka.ssl.key-store-password}")
    private string keystorepassword;

    @value("${spring.kafka.ssl.key-password}")
    private string keypassword;

}

能够看到这个配置就成功了表示:

然后在监听处理消息即可

 ————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com