当前位置: 代码网 > it编程>编程语言>Java > kafka-消费者服务搭建&配置&简单消费(SpringBoot整合Kafka)

kafka-消费者服务搭建&配置&简单消费(SpringBoot整合Kafka)

2024年07月31日 Java 我要评论
kafka-消费者服务搭建&配置(SpringBoot整合Kafka)

1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

2、创建生产者发送消息

[root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9095,192.168.748:9096,192.168.74.148:9097 --topic my_topic1
>1
>2
>3
>

在这里插入图片描述

[
  [
    {
      "partition": 1,
      "offset": 0,
      "msg": "1",
      "timespan": 1717592203289,
      "date": "2024-06-05 12:56:43"
    },
    {
      "partition": 1,
      "offset": 1,
      "msg": "2",
      "timespan": 1717592204046,
      "date": "2024-06-05 12:56:44"
    },
    {
      "partition": 1,
      "offset": 2,
      "msg": "3",
      "timespan": 1717592204473,
      "date": "2024-06-05 12:56:44"
    }
  ]
]

3、application.yml配置

server:
  port: 8120

# v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      enable-auto-commit: true 
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      auto-offset-reset: earliest  #指定offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
      value-deserializer: org.apache.kafka.common.serialization.stringdeserializer

4、创建消费者监听器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.component;
@component
public class mykafkalistener {

    @kafkalistener(topics ={"my_topic1"},groupid = "my_group1")
    public void onmessage(consumerrecord<string, string> record) {
        system.out.println("消费者获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
    }

}

5、创建springboot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;


// generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@springbootapplication
public class springkafkaconsumerapplication {

    public static void main(string[] args) {
        springapplication.run(springkafkaconsumerapplication.class, args);
    }

}

6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

7、引入spring-kafka依赖

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>3.0.5</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>

    <!-- generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupid>com.atguigu</groupid>
    <artifactid>spring-kafka-consumer</artifactid>
    <version>0.0.1-snapshot</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.kafka</groupid>
            <artifactid>spring-kafka</artifactid>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

</project>

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: spring boot ::                (v3.0.5)

消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = null,value = 1
消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = null,value = 2
消费者获取到消息:topic = my_topic1,partition:1,offset = 2,key = null,value = 3
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: spring boot ::                (v3.0.5)

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com