当前位置: 代码网 > 服务器>网络>网络协议 > Kafka SASL_SSL双重认证

Kafka SASL_SSL双重认证

2024年08月01日 网络协议 我要评论
kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

1. 背景

kafka提供了多种安全认证机制,主要分为sasl和ssl两大类。

  • sasl: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中sasl/plain是基于账号密码的认证方式。
  • ssl: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。

在 kafka 中启用 sasl_ssl 安全协议时,sasl 用于客户端和服务器之间的身份验证,ssl 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。

因工作需要,需要在测试环境搭建一套基于sasl_ssl协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

2. 环境

  • 操作系统:linux
  • kafka版本:kafka_2.13-2.7.1
  • zookeeper版本:apache-zookeeper-3.7.0
  • 应用程序版本:spring-boot-2.6.7、jdk1.8

3. 操作步骤

  1. 生成ssl证书
  2. 配置zookeeper
  3. 配置kafka
  4. 前三步配置完成后kafka就开启了sasl_ssl双重认证,可以使用kafka自带的客户端进行测试(3.4),
  5. 在业务代码中使用请查看(3.5)

3.1 生成ssl证书

按照步骤一步一步操作,生成服务器/客户端的ssl证书。也就是公钥与私钥

参考:【ssl协议】生成ssl证书 - lihewei - 博客园 (cnblogs.com)

3.2 配置zookeeper认证

第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:

server {
  org.apache.zookeeper.server.auth.digestloginmodule required
  user_admin="1qaz@wsx"
  user_kafka="1qaz@wsx";
};

第二步: zookeeper配置文件zoo.cfg中新增sasl认证配置,如下:

authprovider.1=org.apache.zookeeper.server.auth.saslauthenticationprovider
requireclientauthscheme=sasl
jaasloginrenew=3600000

第三步: 在apache-zookeeper-3.7.0/bin/zkserver.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

export server_jvmflags="-xmx${zk_server_heap}m $server_jvmflags -djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证

第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf

kafkaserver {
  org.apache.kafka.common.security.plain.plainloginmodule required
  username="admin"
  password="1qaz@wsx"
  user_admin="1qaz@wsx"
  user_kafka="1qaz@wsx";
};
 
client {
  org.apache.kafka.common.security.plain.plainloginmodule required
  username="kafka"
  password="1qaz@wsx";
};

kafka-client-jaas.conf

kafkaclient {
        org.apache.kafka.common.security.plain.plainloginmodule required
        username="kafka"
        password="1qaz@wsx";
};

第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

增加环境变量: -djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf

...

if [ "x$kafka_heap_opts" = "x" ]; then
    export kafka_heap_opts="-xmx1g -xms1g -djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi

...

**第三步:**修改 kafka 的 server.properties配置文件

#listeners=ssl://10.1.61.121:9092
host.name=node1
#listeners=plaintext://node1:9092,ssl://node1:9093
listeners=sasl_ssl://node1:9093
#advertised.listeners=ssl://node1:9092
advertised.listeners=sasl_ssl://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=q06688
ssl.key.password=q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=q06688
ssl.client.auth=required
ssl.enabled.protocols=tlsv1.2,tlsv1.1,tlsv1
ssl.keystore.type=jks 
ssl.truststore.type=jks 
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了https,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用ssl,默认值为security.inter.broker.protocol=plaintext
security.inter.broker.protocol=sasl_ssl
sasl.enabled.mechanisms=plain
sasl.mechanism.inter.broker.protocol=plain
authorizer.class.name=kafka.security.auth.simpleaclauthorizer
allow.everyone.if.no.acl.found=true

注意:这里有个坑,生成ssl密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname

3.4 使用kafka客户端进行验证

第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置sasl_ssl验证的基本信息。

consumer.properties:

bootstrap.servers=node1:9093
security.protocol=sasl_ssl
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=q06688

sasl.mechanism=plain
sasl.jaas.config=org.apache.kafka.common.security.scram.scramloginmodule required username="kafka" password="1qaz@wsx";

producer.properties:

bootstrap.servers=node1:9093
security.protocol=sasl_ssl
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=q06688

sasl.mechanism=plain
sasl.jaas.config=org.apache.kafka.common.security.scram.scramloginmodule required username="kafka" password="1qaz@wsx";

第二步: 使用命令行操作时,让其找到上述设置的sasl_ssl配置文件( --producer.config …/config/producer.properties)

#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093  --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>

#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc

3.5 使用java端代码进行认证

第一步: yaml 配置文件

spring:
  kafka:
    bootstrap-servers: localhost:9093
    properties:
      sasl:
        mechanism: plain
        jaas:
          #此处填写 sasl登录时分配的用户名密码(注意password结尾;)
          config: org.apache.kafka.common.security.scram.scramloginmodule required username="kafka" password="1qaz@wsx";
      security:
        protocol: sasl_ssl
    ssl:
      trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jks
      trust-store-password: q06688
      key-store-type: jks
    producer:
      key-serializer: org.apache.kafka.common.serialization.stringserializer
      value-serializer: org.apache.kafka.common.serialization.stringserializer
      batch-size: 106384
      acks: -1
      retries: 3
      properties:
        linger-ms: 1
        retry.backoff.ms: 1000
        buffer-memory: 33554432

第二步: 使用 kafkatemplate 的方式,配置一个 config

import lombok.extern.slf4j.slf4j;
import org.apache.commons.io.fileutils;
import org.apache.kafka.clients.admin.adminclientconfig;
import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.config.saslconfigs;
import org.apache.kafka.common.config.sslconfigs;
import org.apache.kafka.common.serialization.stringserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;

import java.util.hashmap;
import java.util.map;

@slf4j
@configuration
public class kafkaproducerconfig {

    @value("${spring.kafka.bootstrap-servers}")
    private string bootstrapservers;
    @value("${spring.kafka.producer.acks}")
    private string acks;
    @value("${spring.kafka.producer.retries}")
    private string retries;
    @value("${spring.kafka.producer.batch-size}")
    private string batchsize;
    @value("${spring.kafka.producer.properties.linger-ms}")
    private int lingerms;
    @value("${spring.kafka.producer.properties.buffer-memory}")
    private int buffermemory;
    @value("${spring.kafka.producer.key-serializer}")
    private string keyserializer;
    @value("${spring.kafka.producer.value-serializer}")
    private string valueserializer;
    @value("${spring.kafka.properties.security.protocol}")
    private string protocol;
    @value("${spring.kafka.properties.sasl.mechanism}")
    private string mechanism;
    @value("${spring.kafka.ssl.trust-store-location}")
    private string truststorelocation;
    @value("${spring.kafka.ssl.trust-store-password}")
    private string truststorepassword;
    @value("${spring.kafka.ssl.key-store-type}")
    private string keystoretype;
    @value("${spring.kafka.properties.sasl.jaas.config}")
    private string jaasconfig;

    @bean
    public kafkatemplate<string, string> kafkatemplate() {
        return new kafkatemplate<>(producerfactory());
    }

    /**
     * the producer factory config
     */
    @bean
    public producerfactory<string, string> producerfactory() {
        map<string, object> props = new hashmap<string, object>();
        props.put(producerconfig.bootstrap_servers_config, bootstrapservers);
        props.put(producerconfig.acks_config, acks);
        props.put(producerconfig.retries_config, retries);
        props.put(producerconfig.batch_size_config, batchsize);
        props.put(producerconfig.linger_ms_config, lingerms);
        props.put(producerconfig.buffer_memory_config, buffermemory);
        props.put(producerconfig.key_serializer_class_config, keyserializer);
        props.put(producerconfig.value_serializer_class_config, valueserializer);
        props.put("security.protocol", protocol);
        props.put(saslconfigs.sasl_mechanism, mechanism);
        props.put(sslconfigs.ssl_truststore_location_config, truststorelocation);
        props.put(sslconfigs.ssl_truststore_password_config, truststorepassword);
        props.put(sslconfigs.default_ssl_keystore_type, keystoretype);
        props.put(saslconfigs.sasl_jaas_config, jaasconfig);
        return new defaultkafkaproducerfactory<string, string>(props);
    }
}
(0)

相关文章:

  • HTTP/UDP/TCP/IP网络协议

    同时呢,4位值的是4个bit位,四个比特位的取值范围0-15.因此,这里的首部长度的单位也是4字节,比如:4个bit位 1111 == 15,实际表示的首部长就是60(4*15)字…

    2024年08月01日 网络
  • 网络协议层

    表示层(Presentation Layer):表示层负责数据的格式化、加密和压缩,以确保数据的可读性和可靠性。表示层的功能包括数据格式的转换、数据的加密解密和数据的压缩解压缩等,…

    2024年08月01日 网络
  • Nginx 配置 SSL(HTTPS)详解

    Nginx作为一款高性能的HTTP和反向代理,自然支持SSL/TLS加密通信。本文将详细介绍如何在Nginx中配置SSL,实现HTTPS的访问。随着互联网安全性的日益重要,HTTP…

    2024年08月01日 网络
  • day40—选择题

    day40—选择题

    思路:DNS劫持又称域名劫持,是指在劫持的网络范围内拦截域名解析的请求,分析请求的域名,把审查范围以外的请求放行,否则返回假的IP地址或者什么都不做使请求失去响... [阅读全文]
  • 网络协议:ICMP协议及实用工具介绍

    网络协议:ICMP协议及实用工具介绍

    ICMP(Internet Control Message Protocol)协议是TCP/IP协议簇中的一个子协议,用于在IP主机和路由器之间传递控制消息。控... [阅读全文]
  • 测试人必备技能:如何进行WebSocket接口测试?

    随着Web应用的日益普及,WebSocket作为一种全双工通信协议,在移动端、游戏、视频会议等方面得到广泛应用。而对于需要实时通信的Web应用来说,WebSocket接口测试是非常…

    2024年08月01日 网络

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com