当前位置: 代码网 > it编程>编程语言>Java > 【Java】SpringBoot快速整合Kafka

【Java】SpringBoot快速整合Kafka

2024年07月31日 Java 我要评论
Kafka是一个开源的流式平台,用于构建实时数据流应用程序和实时数据管道。Kafka旨在处理大规模的数据流,具有高吞吐量、可扩展性、持久性和容错性的特点。

目录

1.什么是kafka?

主要特点和概念:

主要组成部分:

2.kafka可以用来做什么?

3.springboot整合kafka步骤:

1. 添加依赖:

2. 配置 kafka:

3. 创建 kafka 生产者:

4. 创建 kafka 消费者:

5. 发布消息:

6. 使用postman进行测试:


如果你没有kafka,可以参考这篇文章进行安装【docker】手把手教你使用docker搭建kafka【详细教程】_docker 安装kafka-csdn博客

1.什么是kafka?

        kafka是一个开源的流式平台,用于构建实时数据流应用程序和实时数据管道。kafka旨在处理大规模的数据流,具有高吞吐量、可扩展性、持久性和容错性的特点。

主要特点和概念:

  1. 发布-订阅模型: kafka采用发布-订阅模型,数据生产者将消息发布到一个或多个主题(topics),而数据消费者则订阅这些主题以接收消息。

  2. 分布式架构: kafka是一个分布式系统,允许横向扩展,通过分布式存储和分区机制来实现高吞吐量和可扩展性。

  3. 持久性存储: kafka使用持久性存储来保留消息,可以在消息发送后保留一定的时间,确保消费者可以在需要时检索历史消息。

  4. 数据分区: 主题被划分为多个分区,每个分区可以在不同的服务器上,以实现并行处理和提高性能。

  5. 流式处理: kafka提供了流处理功能,允许应用程序实时处理和分析数据流,执行复杂的事件处理操作。

  6. 高可用性: kafka在集群中的多个节点之间复制数据,提高了系统的容错性和可用性。

  7. 数据保证: kafka提供了不同级别的数据传递保证,包括至多一次、至少一次和精确一次语义。

  8. 生态系统: kafka生态系统丰富,包括连接器(connectors)、kafka streams、mirrormaker等组件,用于与各种外部系统集成和实现各种应用场景。

主要组成部分:

  • producer(生产者): 负责向kafka主题发布消息。

  • broker(代理): kafka集群中的服务器,负责存储和管理消息。

  • consumer(消费者): 订阅并处理kafka主题中的消息。

  • topic(主题): 消息的类别或标签,生产者将消息发布到主题,而消费者从主题订阅消息。

  • partition(分区): 主题可以划分为多个分区,每个分区独立存储和处理消息。

2.kafka可以用来做什么?

  1. 消息队列:

    场景: 在电子商务平台上,订单服务产生订单消息,并将其发布到kafka主题。支付服务、物流服务等通过订阅相应主题,异步处理订单信息,实现订单处理的解耦和异步通信。

  2. 实时数据流处理:

    场景: 在在线广告平台上,使用kafka streams处理实时产生的广告点击数据。可以实时计算点击率、过滤无效点击、将数据与用户信息连接,以实现实时广告效果分析。

  3. 日志收集与分析:

    场景: 在一个大规模的云服务中,使用kafka收集分布在不同服务器上的应用程序日志。日志分析服务通过消费kafka主题,实时分析日志以监控系统性能、检测异常和进行故障排除。

  4. 事件溯源(event sourcing):

    场景: 在金融领域的交易系统中,使用kafka追踪交易事件。每笔交易引发一个事件,将其发布到kafka主题,以便在需要时进行审计、回溯和重新处理。

  5. 数据同步:

    场景: 在企业的分布式系统中,使用kafka同步用户信息。用户服务在用户数据变更时将事件发布到kafka主题,其他服务通过消费主题以保持用户数据同步。

  6. 消息广播:

    场景: 在社交媒体应用中,使用kafka将用户发布的状态更新广播给其关注者。关注者通过订阅用户状态的kafka主题,实现实时消息广播。

  7. 分布式应用解耦:

    场景: 在电子商务微服务架构中,购物车服务、订单服务、支付服务等通过kafka进行异步通信。例如,购物车服务可以通过kafka发布购物车更新的事件,订单服务通过订阅事件来处理相关订单逻辑。

  8. 大数据集成:

    场景: 在一个大数据处理流水线中,使用kafka将产生的数据传输到spark进行实时分析。生产者将数据发布到kafka主题,而spark应用程序通过订阅主题来接收实时数据。

  9. 实时推荐系统:

    场景: 在在线视频平台上,使用kafka收集用户观看记录。推荐引擎通过消费kafka主题,实时更新用户的个性化推荐列表,提高用户体验。

  10. 异步通信:

    场景: 在电商平台中,使用kafka实现异步订单处理。当订单支付成功时,订单服务通过kafka发布订单处理完成的消息,而邮件服务通过订阅该主题来异步发送订单确认邮件。

下面就使用springboot整合kafka的发布订阅机制,实现消息的发布和订阅。

3.springboot整合kafka步骤:

1. 添加依赖:

确保在你的pom.xml文件中包含了spring boot和spring kafka的依赖。

<dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

        <!-- spring kafka starter -->
        <dependency>
            <groupid>org.springframework.kafka</groupid>
            <artifactid>spring-kafka</artifactid>
        </dependency>
</dependencies>

2. 配置 kafka:

在application.properties或application.yml中配置 kafka 连接信息。

spring:
  kafka:
    bootstrap-servers: your-kafka-server:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.stringserializer
      value-serializer: org.apache.kafka.common.serialization.stringserializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.stringdeserializer
      value-deserializer: org.apache.kafka.common.serialization.stringdeserializer
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.stringserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.stringserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.stringdeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.stringdeserializer

3. 创建 kafka 生产者:

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.service;

@service
public class kafkamessageproducer {

    private static final string topic = "admin-messages";

    @autowired
    private kafkatemplate<string, string> kafkatemplate;

    public void sendadminmessage(string message) {
        kafkatemplate.send(topic, message);
    }
}

4. 创建 kafka 消费者:

import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.service;

@service
public class kafkamessageconsumer {

    @kafkalistener(topics = "admin-messages")
    public void receiveadminmessage(string message) {
        system.out.println("received message: " + message);
        // ...
    }
}

5. 发布消息:

在管理员需要发布消息的地方调用kafkamessageproducer的 sendadminmessage 方法。

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.*;

@restcontroller
@requestmapping("/message")
public class admincontroller {

    @autowired
    private kafkamessageproducer kafkamessageproducer;

    @getmapping("/publish")
    public void publishadminmessage(@requestparam("message") string message) {
        kafkamessageproducer.sendadminmessage(message);
    }
}

        当调用 publishadminmessage方法时,所有监听 admin-messages 主题的用户将会接收到相应的消息。

6. 使用postman进行测试:

控制台输出结果:

这样就使用springboot整合了kafka并写了一个简单的案例。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com