当前位置: 代码网 > it编程>编程语言>Java > Kafka SASL认证与ACL配置

Kafka SASL认证与ACL配置

2024年07月31日 Java 我要评论
SASL/PLAIN,这种方式其实就是一个的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,等等!建议大家用SASL/SCRAM的方式 ,这种方式 用户名/密码是存储在zookeeper中,能够。该种认证方式还会使用sha256或sha512对,安全性相对会高一些。本文主要介绍SASL/PLAIN。

​ kafka版本 2.12-2.2.0,zookeeper版本:3.4.14,认证方式:sasl/plain,这种方式其实就是一个账号/密码的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,不能动态添加,密码明文等等!建议大家用sasl/scram的方式 ,这种方式 用户名/密码是存储在zookeeper中,能够支持动态添加用户。该种认证方式还会使用sha256或sha512对密码加密,安全性相对会高一些。本文主要介绍sasl/plain方式:

一、ssl证书生成

(1)创建证书目录

#kafka部署目录为/usr/local/kafka_2.12-2.2.0
mkdir /usr/local/kafka_2.12-2.2.0/ssl

(2)生成服务端的keystore文件(server.keystore.jks

​ 每个broker节点执行

keytool -keystore server.keystore.jks -alias kafka -validity 3650 -genkey -storepass 123456 -keypass 123456 -dname "cn=192.168.11.35,ou=hc,o=hw,l=shenzhen,st=guangdong,c=cn"

#cn=名字与姓氏/域名,ou=组织单位名称,o=组织名称,l=城市或区域名称,st=州或省份名称,c=单位的两字母国家代码

#查看证书命令
keytool -list -v -keystore ./server.keystore.jks

(3)生成ca认证证书(ca-cert、ca-key

​ 在任一broker节点执行,只需要执行一次,执行完成后生成了两个文件cat-key、ca-cert,将这两个文件分别拷贝到所有broker节点上。

openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650 -passout pass:123456 -subj "/c=cn/st=guangdong/l=shenzhen/o=hw/ou=hc/cn=192.168.11.35"

(4)通过ca证书创建一个客户端信任证书(client.truststore.jks

​ 每个broker节点执行

keytool -keystore client.truststore.jks -alias cakafka -import -file ca-cert -storepass 123456 -noprompt

(5)通过ca证书创建一个服务端信任证书(server.truststore.jks

​ 每个broker节点执行

keytool -keystore server.truststore.jks -alias cakafka -import -file ca-cert -storepass 123456 -noprompt

以下为给服务端证书签名

(6)从密钥库导出证书服务端证书(cert-file

​ 每个broker节点执行

keytool -keystore server.keystore.jks -alias kafka -certreq -file cert-file -storepass 123456

(7)用ca给服务端证书进行签名处理(cert-signed

​ 每个broker节点执行

openssl x509 -req -ca ca-cert -cakey ca-key -in cert-file -out cert-signed -days 3650 -cacreateserial -caserial ca-cert.srl -passin pass:123456

(8)将ca证书导入到服务端keystore

​ 每个broker节点执行

keytool -keystore server.keystore.jks -alias cakafka -import -file ca-cert -storepass 123456 -noprompt

(9)将已签名的服务器证书导入到服务器keystore

​ 每个broker节点执行

keytool -keystore server.keystore.jks -alias kafka -import -file cert-signed -storepass 123456

客户端ssl证书签发

(10)导出客户端证书(client.keystore.jks

keytool -keystore client.keystore.jks -alias kafka -validity 3650 -genkey -storepass 123456 -keypass 123456 -dname "cn=192.168.11.35,ou=hc,o=hw,l=shenzhen,st=guangdong,c=cn"

(11)将证书文件导入到客户端keystore(client.cert-file

keytool -keystore client.keystore.jks -alias kafka -certreq -file client.cert-file -storepass 123456

(12)用ca给客户端证书进行签名处理(client.cert-signed

openssl x509 -req -ca ca-cert -cakey ca-key -in client.cert-file -out client.cert-signed -days 3650 -cacreateserial -caserial ca-cert.srl -passin pass:123456

(13)将ca证书导入到客户端keystore

keytool -keystore client.keystore.jks -alias cakafka -import -file ca-cert -storepass 123456 -noprompt

(14)将已签名的证书导入到客户端keystore

keytool -keystore client.keystore.jks -alias kafka -import -file client.cert-signed -storepass 123456

二、kafka配置

2.1、服务端配置

(1)修改kafka配置文件 server.properties

/usr/local/kafka_2.12-2.2.0/config/server.properties
#修改listeners,添加sasl_plaintext://192.168.11.35:9093
listeners=sasl_plaintext://192.168.11.35:9093,plaintext://192.168.1.35:9092
#对外发布地址,添加sasl_plaintext://39.108.132.165:9093,39.108.132.165为公网ip
advertised.listeners=sasl_plaintext://39.108.132.165:9093,plaintext://192.168.11.35:9092
#添加以下为sasl安全配置
security.inter.broker.protocol=sasl_plaintext	#表示broker间通信使用sasl_plaintext
sasl.enabled.mechanisms=plain	#表示开启plain认证机制
sasl.mechanism.inter.broker.protocol=plain	#表示broker间通信也启用plain机制
authorizer.class.name=kafka.security.auth.simpleaclauthorizer	#设置身份验证使用的类
super.users=user:admin	#设置超级用户,如果是多个需要分号分割,例如:user:admin;user:root
allow.everyone.if.no.acl.found=false	#默认为true,对所有用户topic可见,要禁用。
ssl.keystore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.truststore.jks
ssl.truststore.password=123456
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了https,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
#日志目录
log.dirs=/usr/local/kafka_2.12-2.2.0/logs
#连接zookeeper的配置
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
#其它配置使用默认即可

(2)添加sasl配置文件

​ 在kafka配置目录 **/usr/local/kafka_2.12-2.2.0/config **下创建一个 kafka_server_jaas.conf 文件。

vim /usr/local/kafka_2.12-2.2.0/config/kafka_server_jaas.conf 
kafkaserver {
    org.apache.kafka.common.security.plain.plainloginmodule required
    username="admin"
    password="1qaz@wsx"
    user_admin="1qaz@wsx"
    user_kafka="1qaz@wsx";
};
client {
org.apache.zookeeper.server.auth.digestloginmodule required
username="admin"
password="1qaz@wsx";
};

(3)在kafka启动脚本中配置环境变量

​ 在 /usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh 配置中增加环境变量:

-djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_server_jaas.conf

vim /usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh
...
if [ "x$kafka_heap_opts" = "x" ]; then
    export kafka_heap_opts="-xmx1g -xms1g -djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_server_jaas.conf"
fi

(4)启动kafka

/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.2.0/config/server.properties

2.2、客户端配置

(1)添加sasl配置文件

​ 在kafka配置目录 **/usr/local/kafka_2.12-2.2.0/config **下创建一个 kafka-client-jaas.conf 文件。

vim /usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf 
kafkaclient {
    org.apache.kafka.common.security.plain.plainloginmodule required
    username="kafka"
    password="1qaz@wsx";
};

(2)在kafka-console-producer.sh脚本中配置环境变量

​ 找到 “x$kafka_heap_opts”,添加以下参数:

-djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf

vim /usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh
...
if [ "x$kafka_heap_opts" = "x" ]; then
    export kafka_heap_opts="-xmx512m -djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf"
fi
...

(3)在kafka-console-consumer.sh脚本中配置环境变量

​ 找到 “x$kafka_heap_opts”,添加以下参数:

-djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf

...
if [ "x$kafka_heap_opts" = "x" ]; then
    export kafka_heap_opts="-xmx512m -djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf"
fi
...

(4)配置sasl_plaintext验证信息

​ 修改 /usr/local/kafka_2.12-2.2.0/config 下的 producer.propertiesconsumer.properties配置sasl_ssl验证的基本信息:

producer.properties:

vim /usr/local/kafka_2.12-2.2.0/config/producer.properties
#添加以下内容
security.protocol=sasl_plaintext
sasl.mechanism=plain
sasl.jaas.config=org.apache.kafka.common.security.plain.plainloginmodule required username="kafka" password="1qaz@wsx";

#如果是sasl_ssl方式,则添加以下内容
security.protocol=sasl_ssl
ssl.truststore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.truststore.jks
ssl.truststore.password=123456
ssl.endpoint.identification.algorithm=
sasl.mechanism=plain
sasl.jaas.config=org.apache.kafka.common.security.plain.plainloginmodule required username="kafka" password="1qaz@wsx";

consumer.properties:

vim /usr/local/kafka_2.12-2.2.0/config/consumer.properties
#添加以下内容
security.protocol=sasl_plaintext
sasl.mechanism=plain
sasl.jaas.config=org.apache.kafka.common.security.plain.plainloginmodule required username="kafka" password="1qaz@wsx";

#如果是sasl_ssl方式,则添加以下内容
security.protocol=sasl_ssl
ssl.truststore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.truststore.jks
ssl.truststore.password=123456
ssl.endpoint.identification.algorithm=
sasl.mechanism=plain
sasl.jaas.config=org.apache.kafka.common.security.plain.plainloginmodule required username="kafka" password="1qaz@wsx";

(5)使用命令行操作测试

如果使用 sasl_plaintext 方式,则连接9092端口,如下命令

/usr/local/kafka_2.12-2.2.0/bin/kafka-console-producer.sh --broker-list 192.168.11.35:9092 --topic first --producer.config /usr/local/kafka_2.12-2.2.0/config/producer.properties
>123
>adb
>qwe


/usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.35:9092 --topic first -consumer.config /usr/local/kafka_2.12-2.2.0/config/consumer.properties
123
adb
qwe

​ 如果使用 sasl_ssl 方式,则连接9093端口,如下命令:

/usr/local/kafka_2.12-2.2.0/bin/kafka-console-producer.sh --broker-list 192.168.11.35:9093 --topic first --producer.config /usr/local/kafka_2.12-2.2.0/config/producer.properties
>123
>adb
>qwe


/usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.35:9093 --topic first -consumer.config /usr/local/kafka_2.12-2.2.0/config/consumer.properties
123
adb
qwe

三、配置acl

3.1、zookeeper 配置 sasl

​ 若只关注 kafka 的安全认证,不需要配置 zookeeper 的 sasl,但 kafka 会在 zookeeper 中存储一些必要的信息,因此 zk 的安全认证也会影响到 kafka。

(1)新建 zoo_jaas.conf 文件

​ 在 zookeeper 的 conf 目录下新建一个 zoo_jaas.conf 文件:

vim /usr/local/zookeeper-3.4.14/conf/zoo_jaas.conf  
server {
  org.apache.kafka.common.security.plain.plainloginmodule required
  username="admin"
  password="1qaz@wsx"
  user_admin="1qaz@wsx"
  user_kafka="1qaz@wsx";
};

(2)修改 zoo.conf 文件

vim /usr/local/zookeeper-3.4.14/conf/zoo.conf 
#添加以下内容
authprovider.1=org.apache.zookeeper.server.auth.saslauthenticationprovider
requireclientauthscheme=sasl
jaasloginrenew=3600000
zookeeper.sasl.client=true

(3)导入依赖包

​ 因为使用的权限验证类为:org.apache.kafka.common.security.plain.plainloginmodule,所以需要 kafka 相关 jar 包,将 kafka 的 jar 包复制到 zookeeper 的 lib 目录:

cp /usr/local/kafka_2.12-2.2.0/libs/kafka-clients-2.2.0.jar /usr/local/zookeeper-3.4.14/lib/
cp /usr/local/kafka_2.12-2.2.0/libs/snappy-java-1.1.7.2.jar /usr/local/zookeeper-3.4.14/lib/
cp /usr/local/kafka_2.12-2.2.0/libs/lz4-java-1.5.0.jar /usr/local/zookeeper-3.4.14/lib/

​ 如果使用这个类 org.apache.zookeeper.server.auth.digestloginmodule 则不用以上操作。

(4)修改 zkenv.sh 文件

vim /usr/local/zookeeper-3.4.14/bin/zkenv.sh
#在zookeeper_prefix="${zoobindir}/.."行下面添加以下内容
server_jvmflags="-djava.security.auth.login.config=/usr/local/zookeeper-3.4.14/conf/zoo_jaas.conf"

(5)重启 zookeeper

/usr/local/zookeeper-3.4.14/bin/zkserver.sh restart

3.2、kafka配置acl

​ 在配置好sasl后,启动zookeeper和kafka之后,就可以使用 kafka-acls.sh 脚本来操作acl机制。 kafka-acls.sh 脚本位于kafka安装目录下的bin目录,如 /usr/local/kafka_2.12-2.2.0/bin/,进入此目录执行以下相关命令:

(1)查看:在kafka-acls.sh脚本中传入list参数来查看acl授权

./kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181

(2)配置acl来让writer用户有权限写入test这个topic

./kafka-acls.sh --authorizer kafka.security.auth.simpleaclauthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:writer --operation write --topic test

(3)为reader用户设置test topic的读权限

./kafka-acls.sh --authorizer kafka.security.auth.simpleaclauthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:reader --operation read --topic test

(4)设置访问group的权限

./kafka-acls.sh --authorizer kafka.security.auth.simpleaclauthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:reader --operation read --group test-group

–operation支持的操作有:read、write、delete、create、alter、describe、all,所以一般我们快捷方式授权:

  • 授权给某个消费者组读取和订阅权限就直接使用--consumer参数而不使用--operation来指定,前者就包含了允许消费者在主题上read、describe以及在消费者组在主题上read。
  • 授权给某个消费者组写和订阅权限就直接使用--producer参数,它包括了生产者对主题有写和订阅权限以及在集群上建立主题的权限。
# 快捷设置允许某个组使用某用户对某主题读操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:alice --consumer --topic test --group groupa

# 快捷设置允许某用户对某主题写操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:alice --producer --topic test

如果授权用户读写那么就要手动设置--operation参数,其它一些授权实例如下:

# 允许 alice 用户读取和写入test主题
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:alice --operation read --operation write --topic test

# 允许groupa组使用alice用户对主题test读写操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:alice --operation read --operation write --topic test --group groupa

# 允许所有用户对某个主题有读写操作
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:* --consumer --topic test

# 拒绝某个主机使用alice账号读写,允许其他所有的主机使用任何账号进行读写某个主题
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal user:* --allow-host * --deny-principal user:alice --deny-host 198.51.100.3 --consumer --topic test
(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com