当前位置: 代码网 > it编程>编程语言>Java > Spring Cloud Stream整合RocketMQ的搭建方法

Spring Cloud Stream整合RocketMQ的搭建方法

2024年11月26日 Java 我要评论
spring cloud stream整合rocketmq这里书接上回,默认你已经搭建好了rocketmq主从异步集群,前面文章已经介绍过搭建方法。1、spring cloud stream介绍spr

spring cloud stream整合rocketmq

这里书接上回,默认你已经搭建好了rocketmq主从异步集群,前面文章已经介绍过搭建方法。

1、spring cloud stream介绍

spring cloud stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。

官网:https://spring.io/projects/spring-cloud-stream

该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的spring习惯用法和最佳实践,包括对持久pub/sub语义、消费者组和有状态分区的支持。

spring cloud stream的核心构建块是:

  • destination binders:负责提供与外部消息传递系统集成的组件。
  • destination bindings:外部消息系统和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
  • message:生产者和消费者用来与目标绑定器(以及通过外部消息系统的其他应用程序)进行通信的规范数据结构。

2、生产者

2.1 引入依赖

<dependencies>
        <dependency>
            <groupid>com.alibaba.cloud</groupid>
            <artifactid>spring-cloud-alibaba-dependencies</artifactid>
            <version>${spring-cloud-alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>com.alibaba.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-rocketmq</artifactid>
            <version>2.2.2.release</version>
            <exclusions>
                <exclusion>
                    <groupid>org.apache.rocketmq</groupid>
                    <artifactid>rocketmq-client</artifactid>
                </exclusion>
                <exclusion>
                    <groupid>org.apache.rocketmq</groupid>
                    <artifactid>rocketmq-acl</artifactid>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupid>org.apache.rocketmq</groupid>
            <artifactid>rocketmq-client</artifactid>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupid>org.apache.rocketmq</groupid>
            <artifactid>rocketmq-acl</artifactid>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>

注意,rocketmq官方维护的spring-cloud-stream依赖中的rocketmq版本为4.4,需要排出后加入4.7.1的依。

2.2 编写配置文件

spring:
  application:
    name: my-spring-cloud-rocketmq-producer
  cloud:
    stream:
      bindings:
        output:
          destination: topictest
      rocketmq:
        binder:
          name-server: 192.168.159.34:9876
server:
  port: 8080

2.3 启动类打上注解

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.cloud.stream.annotation.enablebinding;
import org.springframework.cloud.stream.messaging.source;
@enablebinding(source.class)
@springbootapplication
public class myspringcloudrocketmqproducerapplication {
    public static void main(string[] args) {
        springapplication.run(myspringcloudrocketmqproducerapplication.class, args);
    }
}

其中,@enablebinding(source.class)指向配置文件的output参数。

2.4 编写生产者程序

import org.apache.rocketmq.common.message.messageconst;
import org.springframework.cloud.stream.messaging.source;
import org.springframework.messaging.message;
import org.springframework.messaging.messageheaders;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.stereotype.component;
import javax.annotation.resource;
import java.util.hashmap;
import java.util.map;
@component
public class myproducer {
    @resource
    private source source;
    public void sendmessage(string msg){
        //封装消息头
        map<string,object> headers=new hashmap<>();
        headers.put(messageconst.property_tags,"taga");
        messageheaders messageheaders=new messageheaders(headers);
        //创建消息对象
        message<string> message = messagebuilder.createmessage(msg, messageheaders);
        //发送消息
        source.output().send(message);
    }
}

2.5 编写单元测试发送消息

@springboottest
class myspringcloudrocketmqproducerapplicationtests {
    @autowired
    private myproducer producer;
    @test
    void contextloads() {
        producer.sendmessage("hello,spring cloud stream message");
    }
}

3、消费者

3.1 引入依赖

与生产者相同。

3.2 编写配置文件

spring:
  application:
    name: my-spring-cloud-rocketmq-consumer
  cloud:
    stream:
      bindings:
      	# input消费者
        input:
          destination: topictest
          group: spring-cloud-stream-consumer-group
      # 配置rocketmq
      rocketmq:
        binder:
          name-server: 192.168.159.34:9876
server:
  port: 8081

3.3 启动类打上注解

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.cloud.stream.annotation.enablebinding;
import org.springframework.cloud.stream.messaging.sink;
@enablebinding(sink.class)
@springbootapplication
public class myspringcloudrocketmqconsumerapplication {
    public static void main(string[] args) {
        springapplication.run(myspringcloudrocketmqconsumerapplication.class, args);
    }
}

其中@enablebinding(sink.class)指向配置文件的input参数。

3.4 编写消费者程序

import org.springframework.cloud.stream.annotation.streamlistener;
import org.springframework.cloud.stream.messaging.sink;
import org.springframework.stereotype.component;
@component
public class myconsumer {
    @streamlistener(sink.input)
    public void processmessage(string message){
        system.out.println("收到的消息:"+message);
    }
}

先启动消费者,使用单元测试发送消息。

到此这篇关于spring cloud stream整合rocketmq的文章就介绍到这了,更多相关spring cloud stream整合rocketmq内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com