1. What is Spring Cloud Stream?
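Many answers describe it as "a way to hide the differences between message queues, so that we can use one unified API without caring about the specific message queue implementation." That understanding is incomplete. The core of Spring Cloud Stream is the stream: more precisely, Spring Cloud Stream provides a complete API for describing where data flows. Its ultimate goal is to let us stop worrying about how data comes in and how it is written out, and focus only on the business processing of that data.

Take an example: your company has a system made up of several modules, and you are responsible for one of them. Data enters at the first module and, once processed, is handed to the next. From the point of view of your module, its job is to receive the data that the previous module has finished with, do its own processing, and pass the result on to the next module.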
It is easy to summarize the flow of each module:
1. Pull data from the previous module
2. Process the data
3. Send the processed data to the next module
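Steps 1 and 3 are data exchanges between two modules, and such exchanges usually go through some middleware. For example, module 1 and module 2 might exchange data through Kafka: module 1 pushes data to Kafka and module 2 polls it from Kafka. Module 2 and module 3 might use RabbitMQ instead. Clearly both serve the same purpose: **they give data a path, so that it can flow into a module and flow out of it to others.** But because the middleware differs, different APIs are needed. Spring Cloud Stream exists to remove this implementation difference between data flowing in (input) and data flowing out (output).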
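The idea can be sketched in plain Java, without any Spring dependency. This is only an illustration under stated assumptions: the class `ModulePipeline`, the method `runOnce`, and the sample payload are hypothetical names invented here, not part of Spring Cloud Stream. The point is that the module's logic is a `Function`, while the source and the sink are passed in and could be backed by any middleware.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only: none of these names come from Spring Cloud Stream.
final class ModulePipeline {

    // The business logic of "your" module: it only transforms data.
    static final Function<String, String> PROCESS = payload -> payload.trim().toUpperCase();

    // Pulls one item from the source, processes it, and hands the result to the sink.
    // The source and sink could be backed by Kafka, RabbitMQ, or anything else.
    static void runOnce(Supplier<String> source,
                        Function<String, String> process,
                        Consumer<String> sink) {
        sink.accept(process.apply(source.get()));
    }

    public static void main(String[] args) {
        // An in-memory list stands in for the downstream module.
        List<String> downstream = new ArrayList<>();
        runOnce(() -> " order-created ", PROCESS, downstream::add);
        System.out.println(downstream); // prints [ORDER-CREATED]
    }
}
```

Swapping the middleware then means swapping the `source` and `sink` arguments; `PROCESS` stays untouched, which is roughly the separation Spring Cloud Stream aims for.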
2. Environment setup
Set up a Kafka environment with docker-compose:
    version: '3'
    networks:
      kafka:
        ipam:
          driver: default
          config:
            - subnet: "172.22.6.0/24"
    services:
      zookepper:
        image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
        container_name: zookeeper-server
        restart: unless-stopped
        volumes:
          - "/etc/localtime:/etc/localtime"
        environment:
          # Quoted so that YAML does not parse the value as a boolean
          ALLOW_ANONYMOUS_LOGIN: "yes"
        ports:
          - "2181:2181"
        networks:
          kafka:
            ipv4_address: 172.22.6.11
      kafka:
        image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
        container_name: kafka
        restart: unless-stopped
        volumes:
          - "/etc/localtime:/etc/localtime"
        environment:
          ALLOW_PLAINTEXT_LISTENER: "yes"
          KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
          # Must be an address that clients can reach; replace with the IP of your Docker host
          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
        ports:
          - "9092:9092"
        depends_on:
          - zookepper
        networks:
          kafka:
            ipv4_address: 172.22.6.12
      kafka-map:
        image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
        container_name: kafka-map
        restart: unless-stopped
        volumes:
          - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
        environment:
          DEFAULT_USERNAME: admin
          DEFAULT_PASSWORD: 123456
        ports:
          - "9080:8080"
        depends_on:
          - kafka
        networks:
          kafka:
            ipv4_address: 172.22.6.13
Start the containers:

    docker-compose -f docker-compose-kafka.yml -p kafka up -d
The compose file also starts kafka-map (https://github.com/dushixiang/kafka-map), a web UI for managing Kafka:
- URL: http://127.0.0.1:9080
- Username/password: admin/123456
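Before starting the application, it can be useful to confirm that the ports published by the compose file accept connections. The following is a minimal sketch using only the JDK; the class `PortCheck` and the method `isReachable` are hypothetical names, and the host `127.0.0.1` is an assumption that the containers run on the local machine. An open TCP port only shows that something is listening, not that Kafka itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for a quick connectivity check; not part of any Kafka client API.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the compose file: ZooKeeper, Kafka, kafka-map.
        int[] ports = {2181, 9092, 9080};
        for (int port : ports) {
            System.out.println("127.0.0.1:" + port + " reachable: "
                    + isReachable("127.0.0.1", port, 1000));
        }
    }
}
```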
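3. Code project
Goals
- Generate a UUID and send it to the Kafka topic `batch-in`.
- Receive batches of UUID messages from the `batch-in` topic, remove the digits from each, and send the results to the `batch-out` topic.
- Listen on the `batch-out` topic and print the messages received.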
pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>springcloud-demo</artifactId>
            <groupId>com.et</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>spring-cloud-stream-kafaka</artifactId>

        <properties>
            <maven.compiler.source>17</maven.compiler.source>
            <maven.compiler.target>17</maven.compiler.target>
        </properties>

        <dependencies>
            <!-- spring boot starter web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- spring boot starter test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
        </dependencies>
    </project>
Stream processing
    /*
     * Copyright 2023 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      https://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.et;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    import java.util.List;
    import java.util.UUID;
    import java.util.function.Function;
    import java.util.function.Supplier;

    /**
     * @author Steven Gantz
     */
    @SpringBootApplication
    public class CloudStreamsFunctionBatch {

        public static void main(String[] args) {
            SpringApplication.run(CloudStreamsFunctionBatch.class, args);
        }

        @Bean
        public Supplier<UUID> stringSupplier() {
            return () -> {
                var uuid = UUID.randomUUID();
                System.out.println(uuid + " -> batch-in");
                return uuid;
            };
        }

        @Bean
        public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
            return idBatch -> {
                System.out.println("removed digits from batch of " + idBatch.size());
                return idBatch.stream()
                        .map(UUID::toString)
                        // remove all digits from the uuid
                        .map(uuid -> uuid.replaceAll("\\d", ""))
                        .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
                        .toList();
            };
        }

        @KafkaListener(id = "batch-out", topics = "batch-out")
        public void listen(String in) {
            System.out.println("batch-out -> " + in);
        }
    }
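- The bean `stringSupplier` implements `Supplier<UUID>`. It generates a random UUID and prints it to the console to show that this UUID is about to be sent to the `batch-in` topic. Spring Cloud Stream invokes a `Supplier` on a polling schedule (by default once per second), so a new UUID is produced repeatedly.
- The bean `digitRemovingConsumer` implements `Function<List<UUID>, List<Message<String>>>`. It receives a list of UUIDs, prints how many it is processing, converts each UUID to a string, removes all digits from it, and finally wraps each result in a message and returns the list.
- The `@KafkaListener` annotation defines a Kafka listener on the `batch-out` topic. When a message arrives, the `listen` method is called and prints the message content.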
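Because the processing step is an ordinary `java.util.function.Function`, its logic can be checked without a broker. The sketch below reproduces the same digit-removing transformation in plain Java. It is a simplified variant, not the bean from the article: the class name `DigitRemovingSketch` is hypothetical, and it returns plain strings instead of `Message<String>` so that no Spring classes are needed.

```java
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-alone variant of the digit-removing step, for local experimentation.
final class DigitRemovingSketch {

    // Converts each UUID to its string form and strips every digit (0-9) from it.
    static final Function<List<UUID>, List<String>> REMOVE_DIGITS = idBatch ->
            idBatch.stream()
                    .map(UUID::toString)
                    .map(id -> id.replaceAll("\\d", ""))
                    .collect(Collectors.toList());

    public static void main(String[] args) {
        List<UUID> batch = List.of(UUID.fromString("291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277"));
        System.out.println(REMOVE_DIGITS.apply(batch)); // prints [eacc-ee-dfb-b-dead]
    }
}
```

The hyphens survive because `\\d` matches only digits, which is why the results still contain dashes.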
Configuration file
    spring:
      cloud:
        function:
          definition: stringSupplier;digitRemovingConsumer
        stream:
          bindings:
            stringSupplier-out-0:
              destination: batch-in
            digitRemovingConsumer-in-0:
              destination: batch-in
              group: batch-in
              consumer:
                batch-mode: true
            digitRemovingConsumer-out-0:
              destination: batch-out
          kafka:
            binder:
              brokers: localhost:9092
            bindings:
              digitRemovingConsumer-in-0:
                consumer:
                  configuration:
                    # forces consumer to wait 5 seconds before polling for messages
                    fetch.max.wait.ms: 5000
                    fetch.min.bytes: 1000000000
                    max.poll.records: 10000000
Parameter explanation
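    spring:
      cloud:
        function:
          definition: stringSupplier;digitRemovingConsumer

- `spring.cloud.function.definition`: declares two functions, `stringSupplier` and `digitRemovingConsumer`, separated by a semicolon. These are the functions that the application binds to message destinations.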
    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in

- `stream.bindings.stringSupplier-out-0.destination`: binds the output of the `stringSupplier` function to the Kafka topic `batch-in`.
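        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true

- `stream.bindings.digitRemovingConsumer-in-0.destination`: binds the input of the `digitRemovingConsumer` function to the Kafka topic `batch-in`.
- `group: batch-in`: sets the consumer group to `batch-in`, which means that multiple instances can share this group to process messages.
- `consumer.batch-mode: true`: enables batch mode, so the function receives several messages at once as a list.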
        digitRemovingConsumer-out-0:
          destination: batch-out

- `stream.bindings.digitRemovingConsumer-out-0.destination`: binds the output of the `digitRemovingConsumer` function to the Kafka topic `batch-out`.
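The binding names in the configuration follow the functional naming convention of Spring Cloud Stream: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The sketch below only illustrates how those names are composed. The class `BindingNames` and its methods are hypothetical helpers written for this explanation; they are not an API of the framework.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the binding naming convention; not a Spring API.
final class BindingNames {

    // Builds a binding name following the <functionName>-<in|out>-<index> convention.
    static String bindingName(String functionName, boolean input, int index) {
        return functionName + (input ? "-in-" : "-out-") + index;
    }

    // Splits a spring.cloud.function.definition value such as "a;b" into function names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            if (!part.isBlank()) {
                names.add(part.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Every function in this example has an output binding with index 0.
        for (String name : functionNames("stringSupplier;digitRemovingConsumer")) {
            System.out.println(bindingName(name, false, 0));
        }
        // Only the Function (not the Supplier) also has an input binding.
        System.out.println(bindingName("digitRemovingConsumer", true, 0));
    }
}
```

A `Supplier` has only an output binding, while a `Function` has both an input and an output binding, which is why the configuration contains three binding entries for two functions.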
The above shows only the key code.
4. Testing
Start the Spring Boot application. The console prints logs like the following:
    291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
    c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
    a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
    db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
    b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
    removed digits from batch of 5
    batch-out -> eacc-ee-dfb-b-dead
    batch-out -> cbae-e-f-c-acfb
    batch-out -> ab-dd---adade
    batch-out -> db-fb-f-bbb-bfdec
    batch-out -> bdb--d-ad-bbbb