当前位置: 代码网 > it编程>编程语言>Java > SpringCloud Stream 快速入门实例教程

SpringCloud Stream 快速入门实例教程

2025年11月26日 Java 我要评论
1.scs 组件的出现的背景和作用在分布式系统中,可能使用到的消息队列让人眼花缭乱,可能有使用(rabbitmq rroketmq kafka....),他们提供的客户端各不相同,使用的方式也让人眼花

1.scs 组件的出现的背景和作用

在分布式系统中,可能使用到的消息队列让人眼花缭乱,可能有使用(rabbitmq rroketmq kafka....),他们提供的客户端各不相同,使用的方式也让人眼花缭乱,此时就需要一个能够统一消息队列的客户端,通过更高级的抽象来实现更通用和更简单的集成不同的消息队列中间件,此时也就诞生了这个scs 组件

2.scs 集成srping boot项目

我们在这个演示项目中所使用的spring boot版本为 2.7.18、springcloud alibaba版本为 2021.0.6.0

    <dependencymanagement>
        <dependencies>
            <dependency>
                <groupid>com.alibaba.cloud</groupid>
                <artifactid>spring-cloud-alibaba-dependencies</artifactid>
                <version>2021.0.6.0</version>
                <type>pom</type>
                <scope> import</scope>
            </dependency>
            <dependency>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-dependencies</artifactid>
                <version>2.7.18</version>
                <type>pom</type>
                <scope> import</scope>
            </dependency>
        </dependencies>
    </dependencymanagement>

使用的scs 组件版本为 3.2.10

        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-stream-binder-kafka</artifactid>
            <version>3.2.10</version>
        </dependency>

3.yml 配置

scs 的使用难点主要就是在yml 的配置上,配置完成使用很方便

spring:
  cloud:
    function:
      definition: mytaskconsumer;ackconsumer #你注册的 consumer 方法名 或者 function 方法名 中间使用 ;分割 (生产者一般是动态发送消息 不需要注册)
    stream:
      binders:
        kafka-binder-1: # 绑定器名称
          type: kafka  # 消息队列的类型类型
          environment: #绑定器环境配置
            spring:
              kafka:
                bootstrap-servers: 172.22.134.135:9092 # kafka地址 可以设置多个
                properties:
                  security.protocol: plaintext # kafka协议
        #rabbit-binde-1r:
         # type: rabbit
          # ... rabbitmq配置
      # 全局生产者可靠性配置(推荐)
      binder:
        producer-properties:
          acks: all                # 👈 生产者 ack = all 所有副本同步完成才ack;ack=1写入leader副本返回ack;ack=0 生产者发送消息立马ack
          retries: 100          # 最大重试 当发送失败时(如网络抖动、leader 切换),producer 自动重试的最大次数。
          retry.backoff.ms: 1000 #每次重试之间的等待时间(毫秒)。
          enable-idempotence: true      # 幂等生产者(防重复)
      bindings:
        mytaskconsumer-in-0: #命名规则 ${方法名}-${消费者:in/生产者:out}-${数字:不能与其他相同}
          destination: test-kraft # topic
          group: my-consumer-group #消费者组
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          consumer: # 消费者配置
            autostartup: true # 是否自动启动
            concurrency: 1  #启动消费者实例数  (同属于一个消费者组)
        mytaskproducer-out-0:
          destination: test-kraft # topic
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          producer: # 生产者配置
              partitioncount: 1 # 应与目标 topic 的实际分区数一致。
                                # - 若小于实际分区数:仅使用部分分区,浪费并行能力;
                                # - 若大于实际分区数:发送时会因访问不存在的分区而失败!
              #使用消息头中的 headers的 partitionkey 作为 key进行分区
              partition-key-expression: headers.partitionkey # 分区键(分区规则根据key进行hash落到分区 有助于落到指定分区顺序消费)

4.springcloud stream 3.x新特性函数编程

4.1.编写 消费者

        mesage 的包别导错

import org.springframework.messaging.message;
@configuration
public class kafkaconsumer
{
	@bean
	public consumer<message<string>> mytaskconsumer ()
	{
		system.out.println ("[初始化] mytaskconsumer bean 已创建");
		return message -> system.out.println ("[mytaskconsumer] 收到消息: " + message.getpayload ());
	}
}

4.2.编写动态生产者

@restcontroller
public class sendcontroller
{
	@autowired
	streambridge streambridge;
	@getmapping ("/sendmytaskproducer/{msg}")
	public string send (@pathvariable ("msg") string msg)
	{
		//构建消息
		message<string> message = messagebuilder.withpayload (msg)
				.setheader ("partitionkey", msg) // 添加分区键partitionkey 作为分区键
				.build ();
		//参数1为发送的通道名称(在yml中配置),参数2为消息
		boolean mytaskproducer = streambridge.send ("mytaskproducer-out-0", message);
		system.out.println ("发送结果:" + mytaskproducer);
		return "发送结果:" + mytaskproducer;
	}
}

5.进行测试

访问发送消息的接口,发送成功,并且消费者进行了消费

6.进行消费者手动ack

消费者手动ack 比自动ack 要安全得多,默认scs 是实行自动ack,自动ack只要消息被投递到消费者,不论是否消费成功或者失败,都会被视为消费成功

6.1yml 配置

        #========================================消费者ack kafka 专属配置========================================
        #演示消费者ack机制
        ackconsumer-in-0: #命名规则 ${方法名}-${消费者:in/生产者:out}-${数字:不能与其他相同}
          destination: topicone # topic
          group: ack-consumer-group #消费者组 (修改为独立的消费者组,避免与mytaskconsumer冲突)
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          consumer: # 消费者配置
            autostartup: true # 是否自动启动
            concurrency: 1  #启动消费者实例数  (同属于一个消费者组)
        ackproducer-out-0:
          destination: topicone # topic
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          producer: # 生产者配置
            partitioncount: 1 # 应与目标 topic 的实际分区数一致。
              # - 若小于实际分区数:仅使用部分分区,浪费并行能力;
            # - 若大于实际分区数:发送时会因访问不存在的分区而失败!
            #使用消息头中的 headers的 partitionkey 作为 key进行分区
            partition-key-expression: headers.partitionkey # 分区键(分区规则根据key进行hash落到分区 有助于落到指定分区顺序消费)
      # kafka 专属配置
      kafka:
        bindings:
          ackconsumer-in-0: # 👈指定哪个消费者使用ack
            consumer:
              ack-mode: manual  # 👈 关键!手动 ack 模式
                                #record	每条消息处理完自动提交 offset(默认)	简单场景
                                #batch	批量提交(一批 poll 的消息处理完后提交)	默认行为(等价于 auto-commit=true)
                                #time	定时提交	较少用
                                #count	每 n 条提交一次	较少用
                                #manual	手动调用 acknowledge() 才提交	✅ 需要精确控制(推荐)
                                #manual_immediate	手动调用立即提交(不等批次)	高可靠性要求

6.2编写消费者

@bean
	public consumer<message<string>> ackconsumer(){
		system.out.println ("[初始化] ackconsumer bean 已创建");
		return message  -> {
			system.out.println ("[ackconsumer] ========== 开始处理消息 ==========");
			system.out.println ("[ackconsumer] 消息内容: " + message.getpayload());
			system.out.println ("[ackconsumer] 消息headers: " + message.getheaders());
			//获取acknowledgment
			acknowledgment ack = message.getheaders ()
					.get (kafkaheaders.acknowledgment, acknowledgment.class);
			if (ack != null) {
				//进行手动ack
				ack.acknowledge ();
				system.out.println ("[ackconsumer] ✅ 已手动确认消息");
			} else {
				system.out.println ("[ackconsumer] ⚠️ 警告: acknowledgment为null,无法手动确认");
			}
			system.out.println ("[ackconsumer] ========== 消息处理完成 ==========\n");
		};
	}

6.3编写生产者

@getmapping ("/sendackproducer/{msg}")
	public string send2 (@pathvariable ("msg") string msg)
	{
		//构建消息
		message<string> message = messagebuilder.withpayload (msg)
				.setheader ("partitionkey", msg) // 添加分区键partitionkey 作为分区键
				.build ();
		//参数1为发送的通道名称(在yml中配置),参数2为消息
		boolean ackproducer = streambridge.send ("ackproducer-out-0", message);
		system.out.println ("发送结果:" + ackproducer);
		return "发送结果:" + ackproducer;
	}

6.4测试结果

到此这篇关于springcloud stream 快速入门的文章就介绍到这了,更多相关springcloud stream 入门内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com