当前位置: 代码网 > it编程>编程语言>Java > 吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你!

吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你!

2024年08月02日 Java 我要评论
Kafka是当下非常流行的消息中间件,据官网透露,已有成千上万的公司在使用它。最近实践了一波Kafka,确实很好很强大。今天我们来从三个方面学习下Kafka:Kafaka在Linux下的安装,Kafka的可视化工具,Kafka和SpringBoot结合使用。希望大家看完后能快速入门Kafka,掌握这个流行的消息中间件!Kafka是由LinkedIn公司开发的一款开源分布式消息流平台,由Scala和Java编写。主要作用是为处理实时数据提供一个统一、高吞吐、低延迟的平台,其本质是基于发布订阅模式。

摘要

kafka是当下非常流行的消息中间件,据官网透露,已有成千上万的公司在使用它。最近实践了一波kafka,确实很好很强大。今天我们来从三个方面学习下kafka:kafaka在linux下的安装,kafka的可视化工具,kafka和springboot结合使用。希望大家看完后能快速入门kafka,掌握这个流行的消息中间件!

kafka简介

kafka是由linkedin公司开发的一款开源分布式消息流平台,由scala和java编写。主要作用是为处理实时数据提供一个统一、高吞吐、低延迟的平台,其本质是基于发布订阅模式的消息引擎系统。

kafka具有以下特性:

  • 高吞吐、低延迟:kafka收发消息非常快,使用集群处理消息延迟可低至2ms。
  • 高扩展性:kafka可以弹性地扩展和收缩,可以扩展到上千个broker,数十万个partition,每天处理数万亿条消息。
  • 永久存储:kafka可以将数据安全地存储在分布式的,持久的,容错的群集中。
  • 高可用性:kafka在可用区上可以有效地扩展群集,某个节点宕机,集群照样能够正常工作。

kafka安装

img

  • 下载完成后将kafka解压到指定目录:
bash复制代码cd /mydata/kafka/
tar -xzf kafka_2.13-2.8.0.tgz
  • 解压完成后进入到解压目录:
bash
复制代码cd kafka_2.13-2.8.0
  • 虽然有消息称kafka即将移除zookeeper,但是在kafka最新版本中尚未移除,所以启动kafka前还是需要先启动zookeeper;

img

  • 启动zookeeper服务,服务将运行在2181端口;
bash复制代码# 后台运行服务,并把日志输出到当前文件夹下的zookeeper-out.file文件中
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-out.file 2>&1 &
  • 由于目前kafka是部署在linux服务器上的,外网如果想要访问,需要修改kafka的配置文件config/server.properties,修改下kafka的监听地址,否则会无法连接;
properties复制代码############################# socket server settings #############################

# the address the socket server listens on. it will get the value returned from
# java.net.inetaddress.getcanonicalhostname() if not configured.
#   format:
#     listeners = listener_name://host_name:port
#   example:
#     listeners = plaintext://your.host.name:9092
listeners=plaintext://192.168.5.78:9092
  • 最后启动kafka服务,服务将运行在9092端口。
bash复制代码# 后台运行服务,并把日志输出到当前文件夹下的kafka-out.file文件中
nohup bin/kafka-server-start.sh config/server.properties > kafka-out.file 2>&1 &

kafka命令行操作

  • 首先创建一个叫consoletopic的topic;
bash
复制代码bin/kafka-topics.sh --create --topic consoletopic --bootstrap-server 192.168.5.78:9092
  • 接下来查看topic;
bash
复制代码bin/kafka-topics.sh --describe --topic consoletopic --bootstrap-server 192.168.5.78:9092
  • 会显示如下topic信息;
bash复制代码topic: consoletopic	topicid: tjmxuq8qrjglhcsf2ojugw	partitioncount: 1	replicationfactor: 1	configs: segment.bytes=1073741824
	topic: consoletopic	partition: 0	leader: 0	replicas: 0	isr: 0
  • 向topic中发送消息:
bash
复制代码bin/kafka-console-producer.sh --topic consoletopic --bootstrap-server 192.168.5.78:9092
  • 直接在命令行中输入信息即可发送;

img

  • 重新打开一个窗口,通过如下命令可以从topic中获取消息:
bash
复制代码bin/kafka-console-consumer.sh --topic consoletopic --from-beginning --bootstrap-server 192.168.5.78:9092

img

kafka可视化

安装jdk

img

  • 下载完成后将jdk解压到指定目录;
bash复制代码cd /mydata/java
tar -zxvf openjdk8u-jdk_x64_linux_xxx.tar.gz
mv openjdk8u-jdk_x64_linux_xxx.tar.gz jdk1.8
  • /etc/profile文件中添加环境变量java_home
bash复制代码vi /etc/profile
# 在profile文件中添加
export java_home=/mydata/java/jdk1.8
export path=$path:$java_home/bin
# 使修改后的profile文件生效
. /etc/profile

安装kafka-eagle

img

  • 下载完成后将kafka-eagle解压到指定目录;
bash复制代码cd /mydata/kafka/
tar -zxvf kafka-eagle-web-2.0.5-bin.tar.gz
  • /etc/profile文件中添加环境变量ke_home
bash复制代码vi /etc/profile
# 在profile文件中添加
export ke_home=/mydata/kafka/kafka-eagle-web-2.0.5
export path=$path:$ke_home/bin
# 使修改后的profile文件生效
. /etc/profile
  • 安装mysql并添加数据库kekafka-eagle之后会用到它;
  • 修改配置文件$ke_home/conf/system-config.properties,主要是修改zookeeper的配置和数据库配置,注释掉sqlite配置,改为使用mysql;
properties复制代码######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka sqlite jdbc driver address
######################################
# kafka.eagle.driver=org.sqlite.jdbc
# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# kafka.eagle.username=root
# kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useunicode=true&characterencoding=utf-8&zerodatetimebehavior=converttonull
kafka.eagle.username=root
kafka.eagle.password=root
  • 使用如下命令启动kafka-eagle
bash
复制代码$ke_home/bin/ke.sh start
  • 命令执行完成后会显示如下信息,但并不代表服务已经启动成功,还需要等待一会;

img

  • 再介绍几个有用的kafka-eagle命令:
bash复制代码# 停止服务
$ke_home/bin/ke.sh stop
# 重启服务
$ke_home/bin/ke.sh restart
# 查看服务运行状态
$ke_home/bin/ke.sh status
# 查看服务状态
$ke_home/bin/ke.sh stats
# 动态查看服务输出日志
tail -f $ke_home/logs/ke_console.out

img

  • 登录成功后可以访问到dashboard,界面还是很棒的!

img

可视化工具使用

  • 之前我们使用命令行创建了topic,这里可以直接通过界面来创建;

img

  • 我们还可以直接通过kafka-eagle来发送消息;

img

  • 我们可以通过命令行来消费topic中的消息;
bash
复制代码bin/kafka-console-consumer.sh --topic testtopic --from-beginning --bootstrap-server 192.168.5.78:9092
  • 控制台获取到信息显示如下;

img

  • 还有一个很有意思的功能叫ksql,可以通过sql语句来查询topic中的消息;

img

  • 可视化工具自然少不了监控,如果你想开启kafka-eagle对kafka的监控功能的话,需要修改kafka的启动脚本,暴露jmx的端口;
bash复制代码vi kafka-server-start.sh
# 暴露jmx端口
if [ "x$kafka_heap_opts" = "x" ]; then
    export kafka_heap_opts="-server -xms2g -xmx2g -xx:permsize=128m -xx:+useg1gc -xx:maxgcpausemillis=200 -xx:parallelgcthreads=8 -xx:concgcthreads=5 -xx:initiatingheapoccupancypercent=70"
    export jmx_port="9999"
fi
  • 来看下监控图表界面;

img

  • 还有一个很骚气的监控大屏功能;

img

  • 还有zookeeper的命令行功能,总之功能很全,很强大!

img

springboot整合kafka

  • 首先在应用的pom.xml中添加spring kafka依赖;
xml复制代码<!--spring整合kafka-->
<dependency>
    <groupid>org.springframework.kafka</groupid>
    <artifactid>spring-kafka</artifactid>
    <version>2.7.1</version>
</dependency>
  • 修改应用配置文件application.yml,配置kafka服务地址及consumer的group-id
yaml复制代码server:
  port: 8088
spring:
  kafka:
    bootstrap-servers: '192.168.5.78:9092'
    consumer:
      group-id: "bootgroup"
  • 创建一个生产者,用于向kafka的topic中发送消息;
java复制代码/**
 * kafka消息生产者
 * created by macro on 2021/5/19.
 */
@component
public class kafkaproducer {
    @autowired
    private kafkatemplate kafkatemplate;

    public void send(string message){
        kafkatemplate.send("boottopic",message);
    }
}
  • 创建一个消费者,用于从kafka中获取消息并消费;
java复制代码/**
 * kafka消息消费者
 * created by macro on 2021/5/19.
 */
@slf4j
@component
public class kafkaconsumer {

    @kafkalistener(topics = "boottopic")
    public void processmessage(string content) {
        log.info("consumer processmessage : {}",content);
    }

}
  • 创建一个发送消息的接口,调用生产者去发送消息;
java复制代码/**
 * kafka功能测试
 * created by macro on 2021/5/19.
 */
@api(tags = "kafkacontroller", description = "kafka功能测试")
@controller
@requestmapping("/kafka")
public class kafkacontroller {

    @autowired
    private kafkaproducer kafkaproducer;

    @apioperation("发送消息")
    @requestmapping(value = "/sendmessage", method = requestmethod.get)
    @responsebody
    public commonresult sendmessage(@requestparam string message) {
        kafkaproducer.send(message);
        return commonresult.success(null);
    }
}
  • 直接在swagger中调用接口进行测试;

img

  • 项目控制台会输出如下信息,表明消息已经被接收并消费掉了。
bash
复制代码2021-05-19 16:59:21.016  info 2344 --- [ntainer#0-0-c-1] c.m.mall.tiny.component.kafkaconsumer    : consumer processmessage : spring boot message!

总结

通过本文的一波实践,大家基本就能入门kafka了。安装、可视化工具、结合springboot,这些基本都是和开发者相关的操作,也是学习kafka的必经之路。

参考资料

项目源码地址

github.com/macrozheng/…

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com