简单概述
spring cloud stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。
应用程序通过inputs或outputs来与spring cloud stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:spring cloud stream能够屏蔽底层消息中间件【rabbitmq,kafka等】的差异,降低切换成本,统一消息的编程模型。
相关概念
channel(通道):channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到destination,消费者通过输入通道从destination接收消息。
在spring cloud stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。
- input(输入):input通道用于消费者从消息代理接收消息。消费者可以通过监听input通道来实时接收传入的消息
- output(输出):output通道用于生产者向消息代理发送消息。生产者可以通过向output通道发送消息来发布新的消息
destination(目标):destination是消息的目的地,通常对应于消息代理中的topic或queue。生产者将消息发送到特定的destination,消费者从其中接收消息。
binder(绑定器):binder是spring cloud stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。spring cloud stream提供了多个可用的binder实现,包括rabbitmq、kafka等。
**消费者组:**在spring cloud stream中,消费组(consumer group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)
分区:spring cloud stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理
spring message
spring message是spring framework的一个模块,其作用就是统一消息的编程模型。
package org.springframework.messaging;
public interface message<t> {
t getpayload();
messageheaders getheaders();
}消息通道 messagechannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:
@functionalinterface
public interface messagechannel {
long indefinite_timeout = -1;
default boolean send(message<?> message) {
return send(message, indefinite_timeout);
}
boolean send(message<?> message, long timeout);
}消息通道里的消息由消息通道的子接口可订阅的消息通道subscribablechannel实现,被messagehandler消息处理器所订阅
public interface subscribablechannel extends messagechannel {
boolean subscribe(messagehandler handler);
boolean unsubscribe(messagehandler handler);
}由messagehandler真正地消费/处理消息
@functionalinterface
public interface messagehandler {
void handlemessage(message<?> message) throws messagingexception;
}spring integration
spring integration 提供了 spring 编程模型的扩展用来支持企业集成模式(enterprise integration patterns),是对 spring messaging 的扩展。
它提出了不少新的概念,包括消息路由messageroute、消息分发messagedispatcher、消息过滤filter、消息转换transformer、消息聚合aggregator、消息分割splitter等等。同时还提供了messagechannel和messagehandler的实现,分别包括 directchannel、executorchannel、publishsubscribechannel和messagefilter、serviceactivatinghandler、methodinvokingsplitter 等内容。
spring-cloud-stream的架构

快速入门
引入依赖
<!--stream-->
<dependency>
<groupid>org.springframework.cloud</groupid>
<artifactid>spring-cloud-starter-stream-rabbit</artifactid>
</dependency>增加配置文件
spring:
cloud:
stream:
# 定义消息中间件
binders:
myrabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: root
password: root
vhost: /
bindings:
# 生产者中定义,定义发布对象
myinput:
destination: mystreamexchange
group: mystreamgroup
binder: myrabbit
# 消费者中定义,定义订阅的对象
myoutput-in-0:
destination: mystreamexchange
group: mystreamgroup
binder: myrabbit
# 消费者中定义,定义输出的函数
function:
definition: myoutput生产者
@resource
private streambridge streambridge;
public void sendnormal() {
streambridge.send("myinput", "hello world");
}消费者
@bean("myoutput")
public consumer<message<string>> myoutput() {
return (message) -> {
messageheaders headers = message.getheaders();
system.out.println("myoutput head is : " + headers);
string payload = message.getpayload();
system.out.println("myoutput payload is : " + payload);
};
}如何自定义binder
- 添加spring-cloud-stream依赖
- 提供
provisioningprovider的实现提供 messageproducer的实现提供messagehandler的实现提供binder的实现创建binder的配置- 在
meta-inf/spring.binders中定义绑定器
添加spring-cloud-stream依赖
<dependency>
<groupid>org.springframework.cloud</groupid>
<artifactid>spring-cloud-stream</artifactid>
<version>${spring.cloud.stream.version}</version>
</dependency>提供provisioningprovider的实现
provisioningprovider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。
public class fileprovisioningprovider implements provisioningprovider<
extendedconsumerproperties<fileconsumerproperties>, extendedproducerproperties<fileproducerproperties>> {
public fileprovisioningprovider() {
super();
}
@override
public producerdestination provisionproducerdestination(string name, extendedproducerproperties<fileproducerproperties> properties) throws provisioningexception {
return new filemessagedestination(name);
}
@override
public consumerdestination provisionconsumerdestination(string name, string group, extendedconsumerproperties<fileconsumerproperties> properties) throws provisioningexception {
return new filemessagedestination(name);
}
private static class filemessagedestination implements producerdestination, consumerdestination {
private final string destination;
private filemessagedestination(final string destination) {
this.destination = destination;
}
@override
public string getname() {
return destination.trim();
}
@override
public string getnameforpartition(int partition) {
throw new unsupportedoperationexception("partitioning is not implemented for file messaging.");
}
}
}提供messageproducer的实现
messageproducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。
super.oninit();
executorservice = executors.newscheduledthreadpool(1);
}
@override
public void dostart() {
executorservice.schedulewithfixeddelay(() -> {
string payload = getpayload();
if (payload != null) {
message<string> receivedmessage = messagebuilder.withpayload(payload).build();
sendmessage(receivedmessage);
}
}, 0, 50, timeunit.milliseconds);
}
@override
protected void dostop() {
executorservice.shutdownnow();
}
private string getpayload() {
try {
list<string> alllines = files.readalllines(paths.get(fileextendedbindingproperties.getpath() + file.separator + destination.getname() + ".txt"));
string currentpayload = alllines.get(alllines.size() - 1);
if (!currentpayload.equals(previouspayload)) {
previouspayload = currentpayload;
return currentpayload;
}
} catch (ioexception e) {
fileutil.touch(new file(fileextendedbindingproperties.getpath() + file.separator + destination.getname() + ".txt"));
}
return null;
}
}提供messagehandler的实现
messagehandler提供产生事件所需的逻辑。
public class filemessagehandler extends abstractmessagehandler {
fileextendedbindingproperties fileextendedbindingproperties;
producerdestination destination;
public filemessagehandler(producerdestination destination, fileextendedbindingproperties fileextendedbindingproperties) {
this.destination = destination;
this.fileextendedbindingproperties = fileextendedbindingproperties;
}
@override
protected void handlemessageinternal(message<?> message) {
try {
if (message.getpayload() instanceof byte[]) {
files.write(paths.get(fileextendedbindingproperties.getpath() + file.separator + destination.getname() + ".txt"), (byte[]) message.getpayload());
} else {
throw new runtimeexception("处理消息失败");
}
} catch (ioexception e) {
throw new runtimeexception(e);
}
}
}提供binder的实现
提供自己的binder抽象实现:
- 扩展
abstractmessagechannelbinder类 - 将自定义的 provisioningprovider 指定为 abstractmessagechannelbinder 的通用参数
- 重写
createproducermessagehandler和createconsumerendpoint方法
public class filemessagechannelbinder extends abstractmessagechannelbinder
<extendedconsumerproperties<fileconsumerproperties>, extendedproducerproperties<fileproducerproperties>, fileprovisioningprovider>
implements extendedpropertiesbinder<messagechannel, fileconsumerproperties, fileproducerproperties> {
fileextendedbindingproperties fileextendedbindingproperties;
public filemessagechannelbinder(string[] headerstoembed, fileprovisioningprovider provisioningprovider, fileextendedbindingproperties fileextendedbindingproperties) {
super(headerstoembed, provisioningprovider);
this.fileextendedbindingproperties = fileextendedbindingproperties;
}
@override
protected messagehandler createproducermessagehandler(producerdestination destination, extendedproducerproperties<fileproducerproperties> producerproperties, messagechannel errorchannel) throws exception {
filemessagehandler filemessagehandler = new filemessagehandler(destination, fileextendedbindingproperties);
return filemessagehandler;
}
@override
protected messageproducer createconsumerendpoint(consumerdestination destination, string group, extendedconsumerproperties<fileconsumerproperties> properties) throws exception {
filemessageproduceradapter filemessageproduceradapter = new filemessageproduceradapter(destination, fileextendedbindingproperties);
return filemessageproduceradapter;
}
@override
public fileconsumerproperties getextendedconsumerproperties(string channelname) {
return fileextendedbindingproperties.getextendedconsumerproperties(channelname);
}
@override
public fileproducerproperties getextendedproducerproperties(string channelname) {
return fileextendedbindingproperties.getextendedproducerproperties(channelname);
}
@override
public string getdefaultsprefix() {
return fileextendedbindingproperties.getdefaultsprefix();
}
@override
public class<? extends binderspecificpropertiesprovider> getextendedpropertiesentryclass() {
return fileextendedbindingproperties.getextendedpropertiesentryclass();
}
}创建binder的配置
严格要求创建一个 spring 配置来初始化你的绑定器实现的 bean
@enableconfigurationproperties(fileextendedbindingproperties.class)
@configuration
public class filemessagebinderconfiguration {
@bean
@conditionalonmissingbean
public fileprovisioningprovider filemessagebinderprovisioner() {
return new fileprovisioningprovider();
}
@bean
@conditionalonmissingbean
public filemessagechannelbinder filemessagebinder(fileprovisioningprovider filemessagebinderprovisioner, fileextendedbindingproperties fileextendedbindingproperties) {
return new filemessagechannelbinder(null, filemessagebinderprovisioner, fileextendedbindingproperties);
}
@bean
public fileproducerproperties fileconsumerproperties() {
return new fileproducerproperties();
}
}详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream
到此这篇关于springcloudstream原理和深入使用的文章就介绍到这了,更多相关springcloudstream原理内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论