当前位置: 代码网 > it编程>编程语言>Java > 消息中间件:Kafka

消息中间件:Kafka

2024年07月31日 Java 我要评论
消息中间件:Kafka

(1)zookeeper安装

https://downloads.apache.org/zookeeper/

(1)更新系统的包管理器

sudo yum update

(2)安装jdk

sudo yum install java-1.8.0-openjdk-devel

(3)下载zookeeper

cd /usr/local/

wget https://downloads.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz

(4)解压zookeeper

tar -xvf apache-zookeeper-3.7.1-bin.tar.gz

(5)重命名为”zookeeper”

mv apache-zookeeper-3.7.1-bin zookeeper

(6)创建zookeeper数据目录

mkdir /usr/local/zookeeper/data

mkdir /usr/local/zookeeper/logs

(7)创建zookeeper配置文件:

zookeeper的滴答时间(以毫秒为单位)、zookeeper存储数据的数据目录、zookeeper监听的客户端端口

vim /usr/local/zookeeper/conf/zoo.cfg

ticktime=2000

datadir=/usr/local/zookeeper/data

datalogdir=/usr/local/zookeeper/logs

clientport=2181

(8)启动zookeeper

权限不足解决方案:su root、chmod a+xwr zkserver.sh

/usr/local/zookeeper/bin/zkserver.sh start

(9)使用如下命令检查zookeeper是否正在运行:

/usr/local/zookeeper/bin/zkserver.sh status

(2)kafka安装

https://kafka.apache.org/

权限不足解决方案:

chmod a+xwr kafka-topics.sh

chmod a+xwr kafka-console-producer.sh

chmod a+xwr kafka-console-consumer.sh

chmod a+xwr kafka-consumer-groups.sh

(1)安装kafka

cd /usr/local/

tar -zxvf /usr/local/kafka_2.11-2.4.0.tgz

mv kafka_2.11-2.4.0 kafka

(2)配置kafka

vim /usr/local/kafka/config/server.properties

broker.id=0

listeners=plaintext://192.168.19.131:9092

log.dirs=/usr/local/kafka/data/kafka-logs

zookeeper.connect=192.168.19.131:2181

(3)启动kafka

cd /usr/local/kafka/bin

./kafka-server-start.sh -daemon ../config/server.properties

ps -aux | grep server.properties

(4)使用kafka

创建主题

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 1 --topic test

查看主题

./kafka-topics.sh --list --zookeeper 192.168.19.131:2181

发送消息

./kafka-console-producer.sh --broker-list 192.168.19.131:9092 --topic test

接收消息方式一:从最后一条消息的偏移量+1开始消费

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --topic test

接收消息方式二:从头开始消费

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --from-beginning --topic test

(5)单播消息:一个消费组只有一个消费者能消费

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

(6)多播消息:不同的消费者处于不同的消费组

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group2 --topic test

(7)查看消费组

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --list

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group1

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group2

group          topic          partition  current-offset  log-end-offset  lag            consumer-id                                            host            client-id

group1          test            0          22              22              0              consumer-group1

current-offset:当前消费组已经消费的偏移量

log-end-offset:主题对应分区消息的结束偏移量(hw)

lag:当前消费组未消费的消息数

(8)主题分区

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 2 --topic topic1

./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic1

cd /usr/local/kafka/data/kafka-logs/topic1-0

cd /usr/local/kafka/data/kafka-logs/topic1-1

说明:定期将自己消费分区的offset提交给kafka内部topic、key是consumergroupid+topic+分区、value是当前offset值

说明:kafka会定期清理topic里的消息、默认保存7天、7天后消息会被删除

说明:通过此公式可以选出consumer消费的offset要提交到哪个分区:hash(consumergroupid)%__consumer_offsets主题分区数

__consumer_offsets-0

__consumer_offsets-49

(3)kafka集群

(1)kafka集群、3个broker

3个server.properties

vim server0.properties

broker.id=0

listeners=plaintext://192.168.19.131:9092

log.dirs=/usr/local/kafka/data/kafka-logs-0

vim server1.properties

broker.id=1

listeners=plaintext://192.168.19.131:9093

log.dirs=/usr/local/kafka/data/kafka-logs-1

vim server2.properties

broker.id=2

listeners=plaintext://192.168.19.131:9094

log.dirs=/usr/local/kafka/data/kafka-logs-2

./kafka-server-start.sh -daemon ../config/server0.properties

./kafka-server-start.sh -daemon ../config/server1.properties

./kafka-server-start.sh -daemon ../config/server2.properties

(2)副本:1个主题、2个分区、3个副本

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 3 --partitions 2 --topic topic2

./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic2

topic: topic2 partitioncount: 2 replicationfactor: 3 configs:

topic: topic2 partition: 0 leader: 2 replicas: 2,0,1 isr: 2,0,1

topic: topic2 partition: 1 leader: 0 replicas: 0,1,2 isr: 0,1,2

leader:

写和读的操作都在leader上、leader负责把数据同步到follower、当leader挂了、经过主从选举、从多个follower中选举产生一个新leader

follower:

接收leader的同步的数据

isr:

可以同步的broker节点和已同步的broker节点、存放在isr集合中、如果isr节点中的性能较差、会被踢出isr集合

总结:broker、主题、分区、副本

[root@web-server data]# ls

kafka-logs  kafka-logs-0  kafka-logs-1  kafka-logs-2

[root@web-server kafka-logs-1]# ls

cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint  topic2-0  topic2-1

[root@web-server kafka-logs-2]# ls

cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint  topic2-0  topic2-1

(3)kafka集群消息的发送

./kafka-console-producer.sh --broker-list 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --topic topic2

(4)kafka集群消息的发送

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group1 --from-beginning --topic topic2

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group2 --from-beginning --topic topic2

难点:一个partition只能被一个组中的一个consumer消费、一个consumer可以消费多个partition。

注意:kafka只在partition分区的范围内保证消息消费的局部顺序性、不能在同一个topic主题中的多个partition中保证总的消费顺序性。

(4)kafka-eagle监控

(1)安装jdk

yum install java-1.8.0-openjdk-devel

/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64

java -version

(2)解压

tar -zxvf kafka-eagle-bin-3.0.1.tar.gz

tar -zxvf efak-web-3.0.1-bin.tar.gz

mv efak-web-3.0.1 efak-web

mv efak-web ../

cd /usr/local/efak-web

(3)配置环境变量

vim /etc/profile

export java_home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64

export ke_home=/usr/local/efak-web

export path=$path:$java_home/bin:$ke_home/bin

source /etc/profile

(4)kafka-eagle内部配置问

vim /usr/local/efak-web/conf/system-config.properties

efak.zk.cluster.alias=cluster1

cluster1.zk.list=192.168.19.131:2181

efak.driver=com.mysql.cj.jdbc.driver

efak.url=jdbc:mysql://192.168.3.53:3306/ke?useunicode=true&characterencoding=utf-8&zerodatetimebehavior=converttonull

efak.username=root

efak.password=root

(5)启动

./ke.sh start

http://192.168.19.131:8048

admin/123456

windows版本启动问题解决方案:

例如:如果原来希望输入的命令为:

c:\program files\java\jdk-11.0.12\bin\java.exe -jar xxx.jar

1

现在应改为:

"c:\program files\java\jdk-11.0.12\bin\java.exe" -jar xxx.jar

(5)案例:zookeeper+kafka

一、安装docker

1、docker 要求 centos 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的centos 版本是否支持 docker 。

通过 uname -r 命令查看你当前的内核版本

$ uname -r

2、使用 root 权限登录 centos。确保 yum 包更新到最新。

$ sudo yum update

3、卸载旧版本(如果安装过旧版本的话)

$ sudo yum remove docker  docker-common docker-selinux docker-engine

4、安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的

$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2

5、设置yum源

$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

6、可以查看所有仓库中所有docker版本,并选择特定版本安装

$ yum list docker-ce --showduplicates | sort -r

7、安装docker

#$ sudo yum install docker-ce  #没有版本默认安装最新版本、由于repo中默认只开启stable仓库,故这里安装的是最新稳定版17.12.0

$ sudo yum install <fqpn>  # 例如:sudo yum install docker-ce-17.12.0.ce

8、启动并加入开机启动

$ sudo systemctl start docker

$ sudo systemctl enable docker

9、验证安装是否成功(有client和service两部分表示docker安装启动都成功了)

$ docker version

##############################################################################################

拉取zookeeker

docker pull wurstmeister/zookeeper

拉取kafka版本为2.12-2.2.0,不填写版本好则安装最新,但是个别系统会报错

docker pull wurstmeister/kafka:2.12-2.2.0

启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

启动kafka

docker run --name kafka01 \

-p 9092:9092 \

-e kafka_broker_id=0 \

-e kafka_zookeeper_connect=127.0.0.1:2181 \

-e kafka_advertised_listeners=plaintext://127.0.0.1:9092 \

-e kafka_listeners=plaintext://0.0.0.0:9092 \

-d  wurstmeister/kafka

进入kafka容器类kafka01是容器名称,也可以填写成容器id

docker exec -it kafka01 /bin/bash

创建my_log topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.3.191:2181 --replication-factor 1 --partitions 1 --topic my_log

查询创建的主题

/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.3.191:2181

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com