当前位置: 代码网 > it编程>编程语言>Java > Spring Cloud Stream实现数据流处理

Spring Cloud Stream实现数据流处理

2024年11月26日 Java 我要评论
1.什么是spring cloud stream?我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套api,无需关心具体的消息队列实现”。 这

1.什么是spring cloud stream?

我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套api,无需关心具体的消息队列实现”。 这样理解是有些不全面的,spring cloud stream的核心是stream,准确来讲spring cloud stream提供了一整套数据流走向(流向)的api, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

我们很容易总结出每个模块的流程:

  • 从上一个模块拉取数据
  • 处理数据
  • 将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitmq。很明显,它们的功能都是一样的:**提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。**但由于中间件的不同,需要使用不同的api。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了spring cloud stream。

2.环境准备

采用docker-compose搭建kafaka环境

version: '3'

networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"

services:
  zookepper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
    container_name: zookeeper-server
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      allow_anonymous_login: yes
    ports:
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11

  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
    container_name: kafka
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      allow_plaintext_listener: yes
      kafka_cfg_zookeeper_connect: zookepper:2181
      kafka_cfg_advertised_listeners: plaintext://10.11.68.77:9092
    ports:
      - "9092:9092"
    depends_on:
      - zookepper
    networks:
      kafka:
        ipv4_address: 172.22.6.12

  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
    container_name: kafka-map
    restart: unless-stopped
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      default_username: admin
      default_password: 123456
    ports:
      - "9080:8080"
    depends_on:                         
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 访问:http://127.0.0.1:9080
  • 账号密码:admin/123456

3.代码工程

实验目标

  • 生成uuid并将其发送到kafka主题batch-in
  • batch-in主题接收uuid的批量消息,移除其中的数字,并将结果发送到batch-out主题。
  • 监听batch-out主题并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactid>springcloud-demo</artifactid>
        <groupid>com.et</groupid>
        <version>1.0-snapshot</version>
    </parent>
    <modelversion>4.0.0</modelversion>

    <artifactid>spring-cloud-stream-kafaka</artifactid>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- spring boot starter web -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <!-- spring boot starter test -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-kafka</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter</artifactid>
        </dependency>
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
        </dependency>

    </dependencies>

</project>

处理流

/*
 * copyright 2023 the original author or authors.
 *
 * licensed under the apache license, version 2.0 (the "license");
 * you may not use this file except in compliance with the license.
 * you may obtain a copy of the license at
 *
 *      https://www.apache.org/licenses/license-2.0
 *
 * unless required by applicable law or agreed to in writing, software
 * distributed under the license is distributed on an "as is" basis,
 * without warranties or conditions of any kind, either express or implied.
 * see the license for the specific language governing permissions and
 * limitations under the license.
 */

package com.et;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.annotation.bean;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.messaging.message;
import org.springframework.messaging.support.messagebuilder;

import java.util.list;
import java.util.uuid;
import java.util.function.function;
import java.util.function.supplier;

/**
 * @author steven gantz
 */
@springbootapplication
public class cloudstreamsfunctionbatch {

   public static void main(string[] args) {
      springapplication.run(cloudstreamsfunctionbatch.class, args);
   }

   @bean
   public supplier<uuid> stringsupplier() {
      return () -> {
         var uuid = uuid.randomuuid();
         system.out.println(uuid + " -> batch-in");
         return uuid;
      };
   }

   @bean
   public function<list<uuid>, list<message<string>>> digitremovingconsumer() {
      return idbatch -> {
         system.out.println("removed digits from batch of " + idbatch.size());
         return idbatch.stream()
            .map(uuid::tostring)
            // remove all digits from the uuid
            .map(uuid -> uuid.replaceall("\\d",""))
            .map(nodigitstring -> messagebuilder.withpayload(nodigitstring).build())
            .tolist();
      };
   }

   @kafkalistener(id = "batch-out", topics = "batch-out")
   public void listen(string in) {
      system.out.println("batch-out -> " + in);
   }

}
  • 定义一个名为stringsupplier的bean,它实现了supplier<uuid>接口。这个方法生成一个随机的uuid,并打印到控制台,表示这个uuid将被发送到batch-in主题。

  • 定义一个名为digitremovingconsumer的bean,它实现了function<list<uuid>, list<message<string>>>接口。这个方法接受一个uuid的列表,打印出处理的uuid数量,然后将每个uuid转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。

  • 使用@kafkalistener注解定义一个kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

配置文件

spring:
  cloud:
    function:
      definition: stringsupplier;digitremovingconsumer
    stream:
      bindings:
        stringsupplier-out-0:
          destination: batch-in
        digitremovingconsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
        digitremovingconsumer-out-0:
          destination: batch-out
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          digitremovingconsumer-in-0:
            consumer:
              configuration:
                # forces consumer to wait 5 seconds before polling for messages
                fetch.max.wait.ms: 5000
                fetch.min.bytes: 1000000000
                max.poll.records: 10000000

参数解释

spring:
  cloud:
    function:
      definition: stringsupplier;digitremovingconsumer

spring.cloud.function.definition:定义了两个函数,stringsupplierdigitremovingconsumer。这两个函数将在应用程序中被使用。

stream:
  bindings:
    stringsupplier-out-0:
      destination: batch-in

stream.bindings.stringsupplier-out-0.destination:将stringsupplier函数的输出绑定到kafka主题batch-in

    digitremovingconsumer-in-0:
      destination: batch-in
      group: batch-in
      consumer:
        batch-mode: true
  • stream.bindings.digitremovingconsumer-in-0.destination:将digitremovingconsumer函数的输入绑定到kafka主题batch-in

  • group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。

  • consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。

    digitremovingconsumer-out-0:
      destination: batch-out
  • stream.bindings.digitremovingconsumer-out-0.destination:将digitremovingconsumer函数的输出绑定到kafka主题batch-out

以上只是一些关键代码

4.测试

启动弄spring boot应用,可以看到控制台输出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

以上就是spring cloud stream实现数据流处理的详细内容,更多关于spring cloud stream数据流处理的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com