当前位置: 代码网 > it编程>编程语言>Java > Springboot3+Redis实现消息队列的多种方法小结

Springboot3+Redis实现消息队列的多种方法小结

2025年03月20日 Java 我要评论
redis实现消息队列和rabbitmq的优缺点redis实现消息队列的优点:性能高:redis是内存数据库,读写速度快,适合高并发的消息推送。数据结构丰富:redis支持多种数据结构,如列表(lis

redis实现消息队列和rabbitmq的优缺点

redis实现消息队列的优点:

  • 性能高:redis是内存数据库,读写速度快,适合高并发的消息推送。
  • 数据结构丰富:redis支持多种数据结构,如列表(list)、集合(set)、有序集合(zset)等,可以实现多种消息队列模式。
  • 易用性:redis的命令简单,学习成本低,易于上手。
  • 持久化:redis支持rdb和aof持久化,能够保证数据的安全。
  • 分布式支持:redis支持主从复制、哨兵和集群,可以实现高可用和分布式架构。

redis实现消息队列的缺点:

  • 功能有限:相比于专业的消息队列中间件,redis的消息队列功能相对简单,不支持复杂的消息路由、事务、消息优先级等特性。
  • 消息可靠性:redis没有内置的消息确认机制,需要自行实现,可能会增加开发复杂度。
  • 数据量限制:受内存限制,redis不适合存储大量消息。
  • 消息持久化问题:虽然redis支持持久化,但在大数据量情况下,持久化可能会影响性能。

rabbitmq的优点:

  • 功能强大:rabbitmq是一个专业的消息队列中间件,支持消息持久化、事务、消息优先级、延迟消息、死信队列等高级特性。
  • 高可用性:rabbitmq支持镜像队列,可以实现高可用架构。
  • 灵活的路由:rabbitmq支持多种交换机类型(direct, topic, headers, fanout),可以实现复杂的消息路由。
  • 客户端支持广泛:rabbitmq有多种语言客户端,方便集成。
  • 社区活跃:rabbitmq有活跃的社区,问题解决速度快。

rabbitmq的缺点:

  • 性能相对较低:相比于redis,rabbitmq的性能略低,尤其是在高并发场景下。
  • 资源消耗:rabbitmq需要更多的系统资源,如cpu和内存。
  • 复杂性:rabbitmq的概念模型较为复杂,学习曲线较陡峭

redis适合于需要高速读写、轻量级消息队列的场景,如果业务对消息队列的功能要求不高,且已经使用了redis,可以考虑使用redis实现消息队列。其他情况下还是建议使用rabbitmq

1.spring data redis

这是spring框架提供的一个用于简化redis操作的模块。

初始准备

1.1首先配置pom依赖

<dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>

1.2 在yml中配置redis相关信息

由于spring.redis.host 这种配置已经被弃用,在新版的springboot中,需要在spring.data.redis.host 进行配置

spring:
  data:
    redis:
    # 改为自己的地址和密码
      host: 10.69.37.213
      port: 6379
      password: jolly
      # 连接超时时间,单位ms
      connect-timeout: 50000
      # 选择第几个数据库,默认为0,最大值15
      database: 0
      lettuce:
        pool:
          # 最大的活跃连接数,不会超过这个数,根据项目预期并发量调整
          max-active: 50
          # max-idle 指定了连接池中的最大空闲连接数。
          # 空闲连接是指那些没有被使用,但是已经创建并且保持在连接池中的连接
          # 这个值应该与max-active相匹配或者稍微低一些,
          # 以保持连接池中有足够的空闲连接来处理突发请求。
          # 设置得太高可能会导致资源浪费,因为空闲连接会占用内存和其他资源。
          max-idle: 30
          #这个配置指定了连接池中的最小空闲连接数。
          #这个设置可以确保连接池始终保持一定数量的空闲连接,以便在请求到来时可以立即使用,而不需要等待连接的创建。
          # 这个值应该根据你的应用程序的基线负载来设置
          min-idle: 10
          # 当连接池达到最大活跃连接数时,客户端等待可用连接的最大时间(以毫秒为单位)。-1 表示无限等待
          # 如果设置为一个正数,那么在等待时间超过这个值后,会抛出一个异常。
          max-wait: -1

1.3 设置redis的序列化

为了防止存入到redis的数据出现乱码的情况,进行序列化的设置

import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.data.redis.connection.redisconnectionfactory;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.data.redis.serializer.jackson2jsonredisserializer;
import org.springframework.data.redis.serializer.stringredisserializer;
import org.springframework.boot.autoconfigure.condition.conditionalonmissingbean;

@configuration
public class redisconfig {

	@conditionalonmissingbean(name = "redistemplate")
    @bean
    public redistemplate<object, object> redistemplate(redisconnectionfactory connectionfactory) {
        redistemplate<object, object> template = new redistemplate<>();
        template.setconnectionfactory(connectionfactory);
        // 默认为utf-8,可以进行修改
        template.setkeyserializer(new stringredisserializer());
        // 原版默认使用jdk的序列化方式jdkserializationredisserializer
        jackson2jsonredisserializer serializer = new jackson2jsonredisserializer(object.class);
        template.setvalueserializer(serializer);
        // 设置hash的序列化化方式
        template.sethashkeyserializer(new stringredisserializer());
        template.sethashvalueserializer(serializer);
        // 设置属性
        template.afterpropertiesset();
        return template;
    }
}

2.redis实现消息队列的方式

2.1 使用redis的list实现消息队列

首先构造一个简单的订单类,用于后面消息队列测试

import lombok.allargsconstructor;
import lombok.data;

@data
@allargsconstructor
public class order implements serializable {
    private int id;
    private string userid;
    private string goodname;
}

我们使用最简单的方式来实现消息队列,直接不断轮询list中是否有消息

import jakarta.annotation.resource;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
public class mq {

    @resource
    private redistemplate<string, object> redistemplate;

	// 队列名
    private final string queue = "order_queue";

    @getmapping("/order")
    public void order(){
       	// 为了模拟消息的获取,异步开启一个线程,进行消息处理
        thread thread = new thread(() -> {
            try {
                processdata();
            } catch (interruptedexception e) {
                throw new runtimeexception(e);
            }
        });
        thread.start();
         // 模拟产生10条消息,放入队列
        for (int i = 0; i < 10; i++) {
            order order = new order(i, i, "goods" + i);
            redistemplate.opsforlist().leftpush(queue, order);
            system.out.println("放入消息队列:"+i);
        }
    }

    // 处理消息,不断的轮询队列中的消息
    public void processdata() throws interruptedexception {
        while (true){
            object order = redistemplate.opsforlist().rightpop(queue);
            if(order == null){
                system.out.println("当前没有消息");
                thread.sleep(1000);
            }else{
                system.out.println("处理消息:"+order);
            }
        }
    }
}

这种方式是最简单的方式,但是不推荐,因为一直轮询是会浪费cpu资源的,拉低服务端的性能。

2.2 消息订阅模式

redis 支持消息队列的一种模式是通过其发布订阅(publish/subscribe)功能。这种模式允许客户端订阅一个或多个频道(channel),并接收发送到这些频道的消息。

2.2.1 发布消息

这一步是比较简单的,直接调用方法即可.

    @resource
    private redistemplate<string, object> redistemplate;

    private final string channel_name  = "order_queue";
    
	@getmapping("/order")
    public void order(){
        // 模拟产生10条消息,放入队列
        for (int i = 0; i < 10; i++) {
            order order = new order(i, i, "goods" + i);
            //发布消息
            redistemplate.convertandsend(channel_name, order);
            system.out.println("放入消息队列:"+i);
        }
    }

2.2.2 消息监听

首先我们需要取实现messagelistener接口的方法

import org.springframework.data.redis.connection.message;
import org.springframework.data.redis.connection.messagelistener;
import org.springframework.stereotype.service;

@service
public class sub implements messagelistener {
	
	// 当监听到有消息的时候,就会执行这个方法
    @override
    public void onmessage(message message, byte[] pattern) {
        string msg = new string(message.getbody());
        // 模拟延迟处理
        try {
            thread.sleep(2000);  // 假设处理需要2秒
        } catch (interruptedexception e) {
            thread.currentthread().interrupt();
        }
        system.out.println("处理消息:"+msg);
    }

}

然后可以在开始的redisconfig类里面加上下面的代码

	/**
     * 因为标记了@bean 注解,所以会在springboot启动的时候调用该方法创建,也可以放在其他地方进行创建
     * 当调用这个方法时,redisconnectionfactory 这个对象已经存在于springboot的容器内,然后调用这个
     * 方法的时候就会传入该参数,执行方法后会创建一个redismessagelistenercontainer,这样可以在其他类
     * 里面管理这些监听messagelistener
     * @param connectionfactory
     * @return
     */
    @bean
    public redismessagelistenercontainer rediscontainer(redisconnectionfactory connectionfactory) {
        // 首先创建一个监听的容器,这个容器可以传入多个messagelistener
        redismessagelistenercontainer container = new redismessagelistenercontainer();
        // 注入一个连接池工厂
        container.setconnectionfactory(connectionfactory);
        // 创建一个自己的监听类
        sub sub = new sub();
        // 然后和名为order_queue的通道进行绑定
        container.addmessagelistener(sub,new channeltopic("order_queue"));
        return container;
    }

2.2.3 结果

在这里插入图片描述

2.3 基于stream进行实现

redis stream 是 redis 5.0 版本引入的一种新的数据结构,用于存储和操作消息流。它类似于消息队列,但提供了更丰富的功能,允许你以有序、可持久化的方式存储多个字段-值对的消息。

2.3.1 优点

  • 持久化存储:stream可以持久化消息到磁盘,即使redis服务器重启,消息也不会丢失。
  • 有序性:stream保证了消息的有序性,每个消息都有一个唯一的id,按照进入队列的顺序排列。
  • 多消费者:stream支持多个消费者,可以有不同的组(consumer groups)消费同一个stream,而且每个消费者可以独立消费,不会互相影响。
  • 消息确认机制:消费者可以读取消息并进行确认(ack),确保消息不会因为消费者故障而丢失。
  • 消息回溯:stream允许新的消费者从任意位置开始消费消息,包括从stream的开始位置,这使得新的消费者可以回溯并处理之前的消息。
  • 灵活的消息长度:stream中的消息可以是任意长度的字符串,可以包含复杂的数据结构,如json。
  • 阻塞读取:消费者可以使用blpopbrpop等命令进行阻塞读取,直到有新的消息到来。
  • 事务支持:可以利用redis的事务特性,确保消息的写入和读取操作是原子性的。
  • 时间戳:每个消息可以包含时间戳字段,便于进行基于时间的消息管理。
  • 易于监控:stream的结构便于监控队列的长度、消费者状态等信息。

2.3.2 实现

我们模拟一个抢购订单场景,比如我们的服务器只能每秒处理50个请求,请求太多可能会导致我们的服务直接宕机,那么我们可以把请求放入消息队列,让消息队列来抗住大量的请求。
我们的策略可以是消息队列限量50个请求,当请求到来时,消息数量大于50n我们直接返回让用户重试,服务太忙的提示,这也是很常见的提示。

import com.xujialin.springboot3_study.entity.order;
import jakarta.annotation.postconstruct;
import jakarta.annotation.resource;
import org.springframework.data.redis.redissystemexception;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.restcontroller;

import java.time.duration;
import java.util.hashmap;
import java.util.list;

@restcontroller
public class streamcont {

    @resource
    private redistemplate<string,object> redistemplate;

    private string stream_key = "stream_key";



    @getmapping("/order")
    public void order() {
        //封装请求,假装这是高并发场景
        for (int j = 0; j < 100; j++) {
            new thread(() -> {
                for (int i = 0; i < 100; i++) {
                    order order = new order(i, i, "goods" + i);
                    hashmap<string,order> map = new hashmap<>();
                    map.put("order", order);
                    long size = redistemplate.opsforstream().size(stream_key);
                    if(size > 500){
                        system.out.println("活动太火爆了,请重试!");
                        continue;
                    }
                    redistemplate.opsforstream().add(stream_key,map);
                }
            }).start();
        }
        system.out.println("恭喜你抢到了");
    }

    @postconstruct
    public void init(){
        // 第一个是stream的key,第二个是组名
//        redistemplate.opsforstream().creategroup(stream_key, "g1");
        try {
            redistemplate.opsforstream().creategroup(stream_key, "g1");
        } catch (redissystemexception e) {
            // 如果 group 已存在,抛出异常,可忽略
            system.out.println("group已经存在");
        }
        for (int i = 0; i < 5; i++) {
            new thread(new consumer()).start();
        }
    }

    class consumer implements runnable {

        @override
        public void run() {
            while(true){
                // 读取消息
                list<maprecord<string, object, object>> list = redistemplate.opsforstream().read(
                        // 为g1消费者组创建一个消费则名字,可以为每个线程的名字,也可以为一个固定的名字,
                        // 一条消息最多只能被组里面的一个消费者消费,如果一条消息同时被两个消费者消费,
                        // 那么这两个消费者应该隶属于不同的消费者组,所以同一个名字或者不同的名字,对于同一个
                        // 消费组没有太大区别
                        consumer.from("g1", thread.currentthread().getname()),
                        // 创建一个读取选项,创建一个空的 streamreadoptions 实例。这是配置读取选项的起点
                        // .count(1): 设置读取操作返回的最大消息数量。意味着每次读取操作最多只会返回一条消息。
                        //.block(duration.ofseconds(2)): 配置读取操作为阻塞模式,并设置阻塞的超时时间为2s,
                        // 也可以设置单位
                        streamreadoptions.empty().count(1).block(duration.ofseconds(2)),
                        // 创建一个偏移量,readoffset.lastconsumed(): 这是指定读取消息的偏移量。
                        // 表示从消费者组中最后一次确认消费的消息之后开始读取新的消息。
                        streamoffset.create( "stream_key", readoffset.lastconsumed()));

                // 读取消息
                if (list != null && !list.isempty()) {
                    maprecord<string, object, object> entries = list.get(0);
                    // 模拟处理消息
                    system.out.println(entries);
                    // 确认消息
                    redistemplate.opsforstream().acknowledge("stream_key","g1",entries.getid());
                }
            }
        }
    }
}

还可以使用更优雅的实现,使用 streammessagelistenercontainer 可以创建一个更高级的消息监听机制,它允许你注册 streamlistener,这样你就可以实现基于事件的异步消息处理,而不是阻塞读取。这种方式更适合生产环境,因为它提供了更好的资源管理和错误处理机制。

到此这篇关于springboot3+redis实现消息队列的多种方法小结的文章就介绍到这了,更多相关springboot3 redis消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com