证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。
证书处理:
- keystore 用于存储客户端的证书和私钥,用于客户端身份验证。
- truststore 用于存储受信任的根证书或证书链,用于验证服务器的身份。
合并一下证书:
cat your_cert.pem your_key.key > test.pem
- 合并证书和私钥为一个 pkcs12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias
2,将 pkcs12 文件导入到 java keystore 中:
keytool -importkeystore -srckeystore client.p12 -srcstoretype pkcs12 -destkeystore client.jks -deststoretype jks
要生成 truststore.jks
文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 ssl 连接。
下面是生成 truststore.jks
的步骤:
-
获取服务器的根证书或证书链。您可以使用之前提到的
openssl s_client
命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts -
将根证书或证书链保存为
.pem
文件。 -
使用
keytool
命令将根证书或证书链导入到truststore.jks
文件中:keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks
项目集成:
maven集成:
<dependency>
<groupid>org.springframework.kafka</groupid>
<artifactid>spring-kafka</artifactid>
<version>2.5.5.release</version>
</dependency>
nacos配置:
spring:
kafka:
bootstrap-servers: ssl://connectedca.com:443 ##换成你自己的连接
ssl:
protocol: tls
###3这三个密码是你证书配置的时候设置的密码
trust-store-password: a123456
key-store-password: a123456
key-password: a123456
consumer:
group-id:
producer:
topic: *.event ##换成你自己的topic
核心配置:
import lombok.extern.slf4j.slf4j;
import org.apache.kafka.clients.admin.adminclientconfig;
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.core.io.resource;
import org.springframework.core.io.resourceloader;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.core.kafkaadmin;
import org.springframework.kafka.listener.seektocurrenterrorhandler;
import org.springframework.kafka.support.serializer.errorhandlingdeserializer;
import org.springframework.kafka.support.serializer.jsondeserializer;
import java.util.hashmap;
import java.util.map;
@slf4j
@configuration
public class kafkaconfiguration {
@autowired
c3configproperties c3configproperties;
@autowired
private kafkaconfig kafkaproperties;
@autowired
private resourceloader resourceloader;
@bean
public kafkaadmin kafkaadmin() {
map <string, object> configs = new hashmap <>();
configs.put(adminclientconfig.bootstrap_servers_config, kafkaproperties.getbootstrapservers());
return new kafkaadmin(configs);
}
@bean
public defaultkafkaconsumerfactory <string, string> consumerfactory() {
map <string, object> consumerconfig = new hashmap <>();
consumerconfig.put(consumerconfig.bootstrap_servers_config, kafkaproperties.getbootstrapservers());
consumerconfig.put(consumerconfig.group_id_config, "newbie-car-owner-data-sync");
consumerconfig.put(consumerconfig.client_id_config, "newbie-car-owner-data-sync");
consumerconfig.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
consumerconfig.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
// 设置值的反序列化器为 errorhandlingdeserializer2,并配置类型信息
consumerconfig.put(errorhandlingdeserializer.value_deserializer_class, jsondeserializer.class);
consumerconfig.put(jsondeserializer.use_type_info_headers, false); // 启用类型信息头
consumerconfig.put(consumerconfig.auto_offset_reset_config, "earliest");
consumerconfig.put(jsondeserializer.value_default_type, "*.kafkac3msglistener"); // 设置默认类型信息
consumerconfig.put(jsondeserializer.trusted_packages, "*.kafkac3msglistener"); // 替换为你的实际包名
string pemurl = "";
string csrurl = "";
if (c3configproperties.getenvironment().equals("uat")) {
pemurl = "file/uat/kafka/client.jks";
csrurl = "file/uat/kafka/truststore.jks";
} else if (c3configproperties.getenvironment().equals("pre")) {
pemurl = "file/pre/kafka/client.jks";
csrurl = "file/pre/kafka/truststore.jks";
} else if (c3configproperties.getenvironment().equals("prod")) {
pemurl = "file/prod/kafka/client.jks";
csrurl = "file/prod/kafka/truststore.jks";
}
try {
// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载
resource pemresource = resourceloader.getresource("classpath:"+pemurl);
resource csrresource = resourceloader.getresource("classpath:"+csrurl);
// 获取证书文件的路径
string keystorepath = pemresource.getfile().getabsolutepath();
string truststorepath = csrresource.getfile().getabsolutepath();
consumerconfig.put("ssl.keystore.location", keystorepath);
consumerconfig.put("ssl.truststore.location", truststorepath);
}catch (exception e){
log.error("resource file error:{}",e.getmessage());
}
consumerconfig.put("security.protocol", "ssl");
consumerconfig.put("ssl.truststore.password", kafkaproperties.gettruststorepassword());
consumerconfig.put("ssl.keystore.password", kafkaproperties.getkeystorepassword());
consumerconfig.put("ssl.key.password", kafkaproperties.getkeypassword());
return new defaultkafkaconsumerfactory <>(consumerconfig);
}
@bean
public concurrentkafkalistenercontainerfactory <string, string> kafkalistenercontainerfactory() {
concurrentkafkalistenercontainerfactory <string, string> factory = new concurrentkafkalistenercontainerfactory <>();
factory.setconsumerfactory(consumerfactory());
factory.setconcurrency(3); // 设置并发消费者数量
factory.seterrorhandler(new seektocurrenterrorhandler()); // 错误处理器
return factory;
}
@bean
public kafkac3msglistener kafkac3msglistener() {
return new kafkac3msglistener();
}
}
注入配置:
import lombok.data;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.configuration;
@data
@configuration
public class kafkaconfig {
@value("${spring.kafka.bootstrap-servers}")
private string bootstrapservers;
@value("${spring.kafka.consumer.group-id}")
private string groupid;
@value("${spring.kafka.producer.topic}")
private string topic;
@value("${spring.kafka.ssl.trust-store-password}")
private string truststorepassword;
@value("${spring.kafka.ssl.key-store-password}")
private string keystorepassword;
@value("${spring.kafka.ssl.key-password}")
private string keypassword;
}
能够看到这个配置就成功了表示:
然后在监听处理消息即可
————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)
发表评论