当前位置: 代码网 > it编程>编程语言>Java > SpringCloudStream原理和深入使用小结

SpringCloudStream原理和深入使用小结

2024年07月03日 Java 我要评论
简单概述spring cloud stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。应用程序通过inputs或outputs来与spring cloud strea

简单概述

spring cloud stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。

应用程序通过inputs或outputs来与spring cloud stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:spring cloud stream能够屏蔽底层消息中间件【rabbitmq,kafka等】的差异,降低切换成本,统一消息的编程模型

相关概念

channel(通道):channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到destination,消费者通过输入通道从destination接收消息。

在spring cloud stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  • input(输入):input通道用于消费者从消息代理接收消息。消费者可以通过监听input通道来实时接收传入的消息
  • output(输出):output通道用于生产者向消息代理发送消息。生产者可以通过向output通道发送消息来发布新的消息

destination(目标):destination是消息的目的地,通常对应于消息代理中的topic或queue。生产者将消息发送到特定的destination,消费者从其中接收消息。

binder(绑定器):binder是spring cloud stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。spring cloud stream提供了多个可用的binder实现,包括rabbitmq、kafka等。

**消费者组:**在spring cloud stream中,消费组(consumer group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)

分区:spring cloud stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

spring message

spring message是spring framework的一个模块,其作用就是统一消息的编程模型。

package org.springframework.messaging;
public interface message<t> {
    t getpayload();
    messageheaders getheaders();
}

消息通道 messagechannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

@functionalinterface
public interface messagechannel {
	long indefinite_timeout = -1;
	default boolean send(message<?> message) {
		return send(message, indefinite_timeout);
	}
	boolean send(message<?> message, long timeout);
}

消息通道里的消息由消息通道的子接口可订阅的消息通道subscribablechannel实现,被messagehandler消息处理器所订阅

public interface subscribablechannel extends messagechannel {
	boolean subscribe(messagehandler handler);
	boolean unsubscribe(messagehandler handler);
}

messagehandler真正地消费/处理消息

@functionalinterface
public interface messagehandler {
    void handlemessage(message<?> message) throws messagingexception;
}

spring integration

spring integration 提供了 spring 编程模型的扩展用来支持企业集成模式(enterprise integration patterns),是对 spring messaging 的扩展。

它提出了不少新的概念,包括消息路由messageroute、消息分发messagedispatcher、消息过滤filter、消息转换transformer、消息聚合aggregator、消息分割splitter等等。同时还提供了messagechannel和messagehandler的实现,分别包括 directchannel、executorchannel、publishsubscribechannel和messagefilter、serviceactivatinghandler、methodinvokingsplitter 等内容。

spring-cloud-stream的架构

img

快速入门

引入依赖

        <!--stream-->
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-rabbit</artifactid>
        </dependency>

增加配置文件

spring:
    cloud:
        stream:
            # 定义消息中间件
            binders:
              myrabbit:
                  type: rabbit
                  environment:
                    spring:
                        rabbitmq:
                            host: localhost
                            port: 5672
                            username: root
                            password: root
                            vhost: /
            bindings:
            # 生产者中定义,定义发布对象
              myinput:
                destination: mystreamexchange
                group: mystreamgroup
                binder: myrabbit
            # 消费者中定义,定义订阅的对象
              myoutput-in-0:
                destination: mystreamexchange
                group: mystreamgroup
                binder: myrabbit
        # 消费者中定义,定义输出的函数
        function:
            definition: myoutput

生产者

@resource
	private streambridge streambridge;
	public void sendnormal() {
		streambridge.send("myinput", "hello world");
	}

消费者

@bean("myoutput")
	public consumer<message<string>> myoutput() {
		return (message) -> {
			messageheaders headers = message.getheaders();
			system.out.println("myoutput head is : " + headers);
			string payload = message.getpayload();
			system.out.println("myoutput payload is : " + payload);
		};
	}

如何自定义binder

  • 添加spring-cloud-stream依赖
  • 提供provisioningprovider的实现提供
  • messageproducer的实现提供
  • messagehandler的实现提供
  • binder的实现创建binder的配置
  • meta-inf/spring.binders中定义绑定器

添加spring-cloud-stream依赖

<dependency>
    <groupid>org.springframework.cloud</groupid>
    <artifactid>spring-cloud-stream</artifactid>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供provisioningprovider的实现

provisioningprovider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

public class fileprovisioningprovider implements provisioningprovider<
	extendedconsumerproperties<fileconsumerproperties>, extendedproducerproperties<fileproducerproperties>> {
	public fileprovisioningprovider() {
		super();
	}
	@override
	public producerdestination provisionproducerdestination(string name, extendedproducerproperties<fileproducerproperties> properties) throws provisioningexception {
		return new filemessagedestination(name);
	}
	@override
	public consumerdestination provisionconsumerdestination(string name, string group, extendedconsumerproperties<fileconsumerproperties> properties) throws provisioningexception {
		return new filemessagedestination(name);
	}
	private static class filemessagedestination implements producerdestination, consumerdestination {
		private final string destination;
		private filemessagedestination(final string destination) {
			this.destination = destination;
		}
		@override
		public string getname() {
			return destination.trim();
		}
		@override
		public string getnameforpartition(int partition) {
			throw new unsupportedoperationexception("partitioning is not implemented for file messaging.");
		}
	}
}

提供messageproducer的实现

messageproducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。

super.oninit();
		executorservice = executors.newscheduledthreadpool(1);
	}
	@override
	public void dostart() {
		executorservice.schedulewithfixeddelay(() -> {
			string payload = getpayload();
			if (payload != null) {
				message<string> receivedmessage = messagebuilder.withpayload(payload).build();
				sendmessage(receivedmessage);
			}
		}, 0, 50, timeunit.milliseconds);
	}
	@override
	protected void dostop() {
		executorservice.shutdownnow();
	}
	private string getpayload() {
		try {
			list<string> alllines = files.readalllines(paths.get(fileextendedbindingproperties.getpath() + file.separator + destination.getname() + ".txt"));
			string currentpayload = alllines.get(alllines.size() - 1);
			if (!currentpayload.equals(previouspayload)) {
				previouspayload = currentpayload;
				return currentpayload;
			}
		} catch (ioexception e) {
			fileutil.touch(new file(fileextendedbindingproperties.getpath() + file.separator + destination.getname() + ".txt"));
		}
		return null;
	}
}

提供messagehandler的实现

messagehandler提供产生事件所需的逻辑。

public class filemessagehandler extends abstractmessagehandler {
	fileextendedbindingproperties fileextendedbindingproperties;
	producerdestination destination;
	public filemessagehandler(producerdestination destination, fileextendedbindingproperties fileextendedbindingproperties) {
		this.destination = destination;
		this.fileextendedbindingproperties = fileextendedbindingproperties;
	}
	@override
	protected void handlemessageinternal(message<?> message) {
		try {
			if (message.getpayload() instanceof byte[]) {
				files.write(paths.get(fileextendedbindingproperties.getpath() + file.separator + destination.getname() + ".txt"), (byte[]) message.getpayload());
			} else {
				throw new runtimeexception("处理消息失败");
			}
		} catch (ioexception e) {
			throw new runtimeexception(e);
		}
	}
}

提供binder的实现

提供自己的binder抽象实现:

  • 扩展abstractmessagechannelbinder
  • 将自定义的 provisioningprovider 指定为 abstractmessagechannelbinder 的通用参数
  • 重写createproducermessagehandlercreateconsumerendpoint方法
public class filemessagechannelbinder extends abstractmessagechannelbinder
	<extendedconsumerproperties<fileconsumerproperties>, extendedproducerproperties<fileproducerproperties>, fileprovisioningprovider>
	implements extendedpropertiesbinder<messagechannel, fileconsumerproperties, fileproducerproperties> {
	fileextendedbindingproperties fileextendedbindingproperties;
	public filemessagechannelbinder(string[] headerstoembed, fileprovisioningprovider provisioningprovider, fileextendedbindingproperties fileextendedbindingproperties) {
		super(headerstoembed, provisioningprovider);
		this.fileextendedbindingproperties = fileextendedbindingproperties;
	}
	@override
	protected messagehandler createproducermessagehandler(producerdestination destination, extendedproducerproperties<fileproducerproperties> producerproperties, messagechannel errorchannel) throws exception {
		filemessagehandler filemessagehandler = new filemessagehandler(destination, fileextendedbindingproperties);
		return filemessagehandler;
	}
	@override
	protected messageproducer createconsumerendpoint(consumerdestination destination, string group, extendedconsumerproperties<fileconsumerproperties> properties) throws exception {
		filemessageproduceradapter filemessageproduceradapter = new filemessageproduceradapter(destination, fileextendedbindingproperties);
		return filemessageproduceradapter;
	}
	@override
	public fileconsumerproperties getextendedconsumerproperties(string channelname) {
		return fileextendedbindingproperties.getextendedconsumerproperties(channelname);
	}
	@override
	public fileproducerproperties getextendedproducerproperties(string channelname) {
		return fileextendedbindingproperties.getextendedproducerproperties(channelname);
	}
	@override
	public string getdefaultsprefix() {
		return fileextendedbindingproperties.getdefaultsprefix();
	}
	@override
	public class<? extends binderspecificpropertiesprovider> getextendedpropertiesentryclass() {
		return fileextendedbindingproperties.getextendedpropertiesentryclass();
	}
}

创建binder的配置

严格要求创建一个 spring 配置来初始化你的绑定器实现的 bean

@enableconfigurationproperties(fileextendedbindingproperties.class)
@configuration
public class filemessagebinderconfiguration {
	@bean
	@conditionalonmissingbean
	public fileprovisioningprovider filemessagebinderprovisioner() {
		return new fileprovisioningprovider();
	}
	@bean
	@conditionalonmissingbean
	public filemessagechannelbinder filemessagebinder(fileprovisioningprovider filemessagebinderprovisioner, fileextendedbindingproperties fileextendedbindingproperties) {
		return new filemessagechannelbinder(null, filemessagebinderprovisioner, fileextendedbindingproperties);
	}
	@bean
	public fileproducerproperties fileconsumerproperties() {
		return new fileproducerproperties();
	}
}

详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream

到此这篇关于springcloudstream原理和深入使用的文章就介绍到这了,更多相关springcloudstream原理内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com