文章目录
1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
2、创建生产者发送消息
[root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097 --topic my_topic1
>1
>2
>3
>
[
[
{
"partition": 1,
"offset": 0,
"msg": "1",
"timespan": 1717592203289,
"date": "2024-06-05 12:56:43"
},
{
"partition": 1,
"offset": 1,
"msg": "2",
"timespan": 1717592204046,
"date": "2024-06-05 12:56:44"
},
{
"partition": 1,
"offset": 2,
"msg": "3",
"timespan": 1717592204473,
"date": "2024-06-05 12:56:44"
}
]
]
3、application.yml配置
server:
  port: 8120
# v1
spring:
  kafka:
    # Comma-separated list of Kafka brokers used for the initial connection
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed: only read messages from committed transactions (avoids dirty reads)
      isolation-level: read-committed # consumer transaction isolation level; read-uncommitted allows dirty reads, i.e. reading messages from a producer transaction that has not been committed yet
      # Whether the consumer acks automatically: true = the consumer offset is committed to Kafka automatically after messages are fetched
      enable-auto-commit: true
      # How often the consumer batches its automatic offset commits (a bare number is interpreted as milliseconds)
      auto-commit-interval: 1000
      # Where to start reading when the consumer consumes the topic for the first time
      auto-offset-reset: earliest # offset to start from: earliest | latest | none
      # Class names are case-sensitive: they must match the Kafka client classes exactly
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
4、创建消费者监听器
package com.atguigu.spring.kafka.consumer.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer that listens on topic {@code my_topic1} as a member of
 * consumer group {@code my_group1} and prints every record it receives.
 */
@Component
public class MyKafkaListener {

    /**
     * Prints the topic, partition, offset, key and value of a consumed record
     * to standard output.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = {"my_topic1"}, groupId = "my_group1")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("消费者获取到消息:topic = " + record.topic()
                + ",partition:" + record.partition()
                + ",offset = " + record.offset()
                + ",key = " + record.key()
                + ",value = " + record.value());
    }
}
5、创建springboot启动类
package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Generated by https://start.springboot.io
// High-quality Chinese documentation for the Spring / Boot / Data / Security / Cloud frameworks => https://springdoc.cn
/**
 * Spring Boot entry point for the Kafka consumer application.
 */
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }
}
6、屏蔽 kafka debug 日志 logback.xml
<configuration>
    <!-- If the IDEA console shows too many log lines, create logback.xml under
         src\main\resources to suppress the Kafka client DEBUG output.
         The level must be above DEBUG (here: info) for DEBUG lines to be filtered out. -->
    <logger name="org.apache.kafka.clients" level="info" />
</configuration>
7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the spring-kafka-consumer demo. Element names and the POM
     namespace are case-sensitive and must be spelled exactly as below. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Generated by https://start.springboot.io -->
    <!-- High-quality Chinese documentation for the Spring / Boot / Data / Security / Cloud frameworks => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Version is managed by the Spring Boot parent POM -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: spring boot :: (v3.0.5)
消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = null,value = 1
消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = null,value = 2
消费者获取到消息:topic = my_topic1,partition:1,offset = 2,key = null,value = 3
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: spring boot :: (v3.0.5)
发表评论