当前位置: 代码网 > it编程>游戏开发>ar > RabbitMQ订单超时笔记

RabbitMQ订单超时笔记

2024年08月06日 ar 我要评论
队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。延迟队列,最重要的特性就体现在它的延时属性上,跟普通队列不一样的是,普通队列中的元素总是等着希望被早点取出消费,而延迟队列中的元素则是希望在指定时间被取出消费,所以延迟队列中的元素是都是带时间属性的。利用懒加载的思想,当用户或商户查询订单时,再判断该订单是否超时,超时则进行业务处理。简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。

使用rabbitmq实现订单超时管理

方案分析

  1. jdk延迟队列
  2. 定时任务
  3. 被动取消
  4. redis sorted set
  5. redis事件通知
  6. 时间轮算法
  7. rabbitmq

jdk延迟队列

该方案是利用jdk自带的juc包中的delayqueue队列。

pubilc class delayqueue<e extend delay> extends abstractqueue<e> implements blockingqueue<e> 

这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入delayqueue中的对象,必须实现delayed接口
在这里插入图片描述

  • offer(): 添加元素
  • poll(): 获取并移出队列的超时元素,没有则返回空
  • take(): 获取并移出队列的超时元素,没有则wait当前线程,直到有元素满足超时条件时返回结果

定时任务

这种方式是最简单的,启动一个计划任务,每隔一定时间(假设1分钟)去扫描数据库一次,通过订单时间来判断是否超时,然后进行update 或delete操作。如quartz

被动取消

利用懒加载的思想,当用户或商户查询订单时,再判断该订单是否超时,超时则进行业务处理。
这种方式依赖于用户的查询操作触发,如果用户不进行查询订单的操作,该订单就永远不会被取消。所以,实际应用中,也是被动取消+定时任务的组合方式来实现。这种情况下定时任务的时间可以设置的稍微“长”一点。

redis sorted set

redis有序集合(sorted set)每个元素都会关联一个double类型的分数score。redis可以通过分数来为集合中的成员进行从小到大的排序。

添加元素: zadd key score member [[score member] [score member]...]
按顺序查询元素: zrange key start end [withscores]
查询元素score: zscore key member
移出元素: zrem key member [member ...]

该方案可以将订单超时时间戳与订单编号分别设置为score和member。系统扫描第一个元素判断是否超时,超时则进行业务处理。
然而,这一版存在一个致命的硬伤,在高并发条件下,多个消费者会取到同一个订单编号,又需要编写lua脚本保证原子性或使用分布式锁,用了分布式锁性能又下降了。

rabbitmq

rabbitmq is the most widely deployed open source message broker

延迟队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
延迟队列,最重要的特性就体现在它的延时属性上,跟普通队列不一样的是,普通队列中的元素总是等着希望被早点取出消费,而延迟队列中的元素则是希望在指定时间被取出消费,所以延迟队列中的元素是都是带时间属性的。
简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
本文使用rabbitmq也是通过延迟队列的机制来实现订单超时的处理。然而 rabbitmg自身并没有延迟队列这个功能,实现该功能一般有以下两种方式:

  • 利用ttl(time to live)dlx(dead letter exchanges)实现延迟队列
  • 利用延迟队列插件rabbitmq_delayed_message_exchange实现

rabbitmq延迟队列实现

实现方案一

在这里插入图片描述

  • rabbitmq配置类
@configuration
public class rabbitmqconfig {
    // 声明 4个路由key 4个队列 2个交换机 属性
    // 延迟交换机
    public static final string delay_exchange_name = "delay.exchange";
    // 延迟队列
    public static final string delay_queue_a_name = "delay.queue.a";
    public static final string delay_queue_b_name = "delay.queue.b";
    // 延迟队列路由key
    public static final string delay_queue_a_routing_key = "delay.queue.a.routingkey";
    public static final string delay_queue_b_routing_key = "delay.queue.b.routingkey";

    // 死信交换机
    public static final string dead_letter_exchange_name = "dead.letter.exchange";
    // 死信队列
    public static final string dead_letter_queue_a_name = "dead.letter.queue.a";
    public static final string dead_letter_queue_b_name = "dead.letter.queue.b";
    // 死信队列路由key
    public static final string dead_letter_queue_a_routing_key = "dead.letter.delay_10s.routingkey";
    public static final string dead_letter_queue_b_routing_key = "dead.letter.delay_60s.routingkey";


    // 声明延迟交换机
    @bean
    public directexchange delayexchange() {
        return new directexchange(delay_exchange_name);
    }

    // 声明死信交换机
    @bean
    public directexchange deadletterexchange() {
        return new directexchange(dead_letter_exchange_name);
    }

    // 声明延迟队列a 延迟10s 并且绑定到对应的死信交换机中
    @bean
    public queue delayqueuea() {
        map<string, object> args = new hashmap<>();
        args.put("x-dead-letter-exchange", dead_letter_exchange_name);
        args.put("x-dead-letter-routing-key", dead_letter_queue_a_routing_key);
        args.put("x-message-ttl", 10000);
        return queuebuilder.durable(delay_queue_a_name).witharguments(args).build();
    }

    // 声明延迟队列a 延迟60s 并且绑定到对应的死信交换机中
    @bean
    public queue delayqueueb() {
        map<string, object> args = new hashmap<>();
        args.put("x-dead-letter-exchange", dead_letter_exchange_name);
        args.put("x-dead-letter-routing-key", dead_letter_queue_b_routing_key);
        args.put("x-message-ttl", 60000);
        return queuebuilder.durable(delay_queue_b_name).witharguments(args).build();
    }

    // 声明延迟队列a的绑定关系
    @bean
    public binding delaybindinga(@qualifier("delayqueuea") queue queue,
                                 @qualifier("delayexchange") directexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(delay_queue_a_routing_key);
    }

    // 声明延迟队列a的绑定关系
    @bean
    public binding delaybindingb(@qualifier("delayqueueb") queue queue,
                                 @qualifier("delayexchange") directexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(delay_queue_b_routing_key);
    }

    // 声明死信队列a,用于接收延迟10s处理的消息
    @bean
    public queue deadletterqueuea() {
        return new queue(dead_letter_queue_a_name);
    }

    // 声明死信队列b,用于接收延迟60s处理的消息
    @bean
    public queue deadletterqueueb() {
        return new queue(dead_letter_queue_b_name);
    }

    // 声明死信队列a的绑定关系
    @bean
    public binding deadletterbindinga(@qualifier("deadletterqueuea") queue queue,
                                      @qualifier("deadletterexchange") directexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(dead_letter_queue_a_routing_key);
    }

    // 声明死信队列b的绑定关系
    @bean
    public binding deadletterbindingb(@qualifier("deadletterqueueb") queue queue,
                                      @qualifier("deadletterexchange") directexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(dead_letter_queue_b_routing_key);
    }
}
  • 枚举类
@getter
@allargsconstructor
public enum delaytypeenum {
    delay_10s(1),
    delay_60s(2);

    private integer type;

    public static delaytypeenum getdelaytypeenum(integer type) {
        if (objects.equals(type, delay_10s.type)) {
            return delay_10s;
        }
        if (objects.equals(type, delay_60s.type)) {
            return delay_60s;
        }
        return null;
    }
}
  • 生产者
@component
public class delaymsgproducer {
    @resource
    private rabbittemplate rabbittemplate;

    public void send(string msg, delaytypeenum type) {
        switch (type) {
            case delay_10s:
                rabbittemplate.convertandsend(delay_exchange_name, delay_queue_a_routing_key, msg);
                break;
            case delay_60s:
                rabbittemplate.convertandsend(delay_exchange_name, delay_queue_b_routing_key, msg);
                break;
            default:
        }
    }
}
  • 消费者
@component
@slf4j
public class deadletterqueueconsumer {
    // 监听死信队列a
    @rabbitlistener(queues = dead_letter_queue_a_name)
    public void receivea(message message) {
        // 获取消息
        string msg = new string(message.getbody());
        // 记录日志
        log.info("当前时间: {}, 死信队列a收到消息: {}", localdatetime.now(), msg);
    }

    // 监听死信队列b
    @rabbitlistener(queues = dead_letter_queue_b_name)
    public void receiveb(message message) {
        // 获取消息
        string msg = new string(message.getbody());
        // 记录日志
        log.info("当前时间: {}, 死信队列b收到消息: {}", localdatetime.now(), msg);
    }
}
  • 测试
    在这里插入图片描述
    在这里插入图片描述

实现方案二

方案一的问题可以通过将ttl设置在消息属性里来解决,然后添加一个延迟队列,用于接收设置为任意延迟时长的消息,再添加一个相应的死信队列和routingkey 即可,如下图:

在这里插入图片描述

  • rabbitmq配置文件
@configuration
public class rabbitmqconfig {
    // 声明 2个交换机  2个队列 2个路由key  属性
    // 延迟交换机
    public static final string delay_exchange_name = "delay.exchange";
    // 延迟队列
    public static final string delay_queue_name = "delay.queue";
    // 延迟队列路由key
    public static final string delay_queue_routing_key = "delay.queue.routingkey";

    // 死信交换机
    public static final string dead_letter_exchange_name = "dead.letter.exchange";
    // 死信队列
    public static final string dead_letter_queue_name = "dead.letter.queue";
    // 死信队列路由key
    public static final string dead_letter_queue_routing_key = "dead.letter.routingkey";


    // 声明延迟交换机
    @bean
    public directexchange delayexchange() {
        return new directexchange(delay_exchange_name);
    }

    // 声明死信交换机
    @bean
    public directexchange deadletterexchange() {
        return new directexchange(dead_letter_exchange_name);
    }

    // 声明延迟队列 并且绑定到对应的死信交换机中
    @bean
    public queue delayqueue() {
        map<string, object> args = new hashmap<>();
        args.put("x-dead-letter-exchange", dead_letter_exchange_name);
        args.put("x-dead-letter-routing-key", dead_letter_queue_routing_key);
        return queuebuilder.durable(delay_queue_name).witharguments(args).build();
    }


    // 声明延迟队列的绑定关系
    @bean
    public binding delaybinding(@qualifier("delayqueue") queue queue,
                                 @qualifier("delayexchange") directexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(delay_queue_routing_key);
    }


    // 声明死信队列
    @bean
    public queue deadletterqueue() {
        return new queue(dead_letter_queue_name);
    }


    // 声明死信队列的绑定关系
    @bean
    public binding deadletterbindinga(@qualifier("deadletterqueue") queue queue,
                                      @qualifier("deadletterexchange") directexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(dead_letter_queue_routing_key);
    }
}
  • 生产者
@component
public class delaymsgproducer {
    @resource
    private rabbittemplate rabbittemplate;

    public void send(string message, string delaytime) {
        rabbittemplate.convertandsend(delay_exchange_name, delay_queue_routing_key, message, msg -> {
            // 设置消息到期时间
            msg.getmessageproperties().setexpiration(delaytime);
            return msg;
        });
    }
}
  • 消费者
@component
@slf4j
public class deadletterqueueconsumer {
    // 监听死信队列
    @rabbitlistener(queues = dead_letter_queue_name)
    public void receivea(message message) {
        // 获取消息
        string msg = new string(message.getbody());
        // 记录日志
        log.info("当前时间: {}, 死信队列收到消息: {}", localdatetime.now(), msg);
    }

}
  • controller
@restcontroller
@slf4j
@requestmapping("/rabbitmq")
public class rabbitmqcontroller {
    @resource
    private delaymsgproducer producer;

    @apioperation(value = "发送消息")
    @getmapping("/send/{msg}/{delaytime}")
    public void send(@pathvariable string msg,
                     @pathvariable string delaytime) {
        log.info("当前时间: {}, 消息: {}, 延迟时间: {}", localdatetime.now(), msg, delaytime);
        producer.send(msg, delaytime);
    }
}
  • 测试
    在这里插入图片描述
    问题:10秒的消息没有被提前释放,发送两条消息,如果后面的消息过期时间比前面的短,则要等着前面消息的过期时间到了再一起被释放

实现方案三(插件)

linux环境docker安装rabbitmq插件

  1. 下载插件
  2. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myrabbit -e rabbitmq_default_vhost=my_vhost -e rabbitmq_default_user=admin -e rabbitmq_default_pass=admin rabbitmq:3.8.5
  3. docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez b40e96969299:/plugins/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
  4. docker exec -it rabbitmq /bin/bash
  5. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

方案二的问题可以通过安装rabbitmq的社区插件rabbitmq_delayed_message_exchange来解决。
安装插件后会生成新的exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列,而是存储在mnesia (一个分布式数据库)中,随后监测消息延迟时间,如达到可投递时间时将其通过x-delayed-type类型标记的交换机投递至目标队列。
在这里插入图片描述
在这里插入图片描述

  • 配置类
@configuration
public class rabbitmqconfig {
    // 声明 1个交换机  1个队列 1个路由key  属性
    // 延迟交换机
    public static final string delay_exchange_name = "delay.exchange";
    // 延迟队列
    public static final string delay_queue_name = "delay.queue";
    // 延迟队列路由key
    public static final string delay_queue_routing_key = "delay.queue.routingkey";


    // 声明延迟交换机
    @bean
    public customexchange delayexchange() {
        map<string, object> args = new hashmap<>();
        args.put("x-delayed-type", "direct");
        return new customexchange(delay_exchange_name, "x-delayed-message", true, false, args);
    }


    // 声明延迟队列
    @bean
    public queue delayqueue() {
        return queuebuilder.durable(delay_queue_name).build();
    }


    // 声明延迟队列的绑定关系
    @bean
    public binding delaybinding(@qualifier("delayqueue") queue queue,
                                 @qualifier("delayexchange") customexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(delay_queue_routing_key).noargs();
    }
}
  • 生产者
@component
public class delaymsgproducer {
    @resource
    private rabbittemplate rabbittemplate;

    public void send(string message, integer delaytime) {
        rabbittemplate.convertandsend(delay_exchange_name, delay_queue_routing_key, message, msg -> {
            // 设置消息到期时间
            msg.getmessageproperties().setdelay(delaytime);
            return msg;
        });
    }
}
  • 消费者
@component
@slf4j
public class deadletterqueueconsumer {
    // 监听死信队列
    @rabbitlistener(queues = rabbitmqconfig.delay_queue_name)
    public void receivea(message message) {
        // 获取消息
        string msg = new string(message.getbody());
        // 记录日志
        log.info("当前时间: {}, 延迟队列收到消息: {}", localdatetime.now(), msg);
    }

}
  • 测试
    -在这里插入图片描述
    没有因为前面一条消息延迟时间长而影响下一条消息

订单超时实战

  • yml
spring:
  rabbitmq:
    host: 192.168.183.139
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost

  datasource:
    driver-class-name: com.mysql.jdbc.driver
    url: jdbc:mysql://127.0.0.1:3306/test?useunicode=true&characterencoding=utf-8&usessl=false
    username: root
    password: 123456

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.stdoutimpl
    map-underscore-to-camel-case: true
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.zdz.entity
  global-config:
    db-config:
      id-type: auto

# 自定义配置
order:
  delay:
    time: 60000 # 1分钟 单位毫秒
  • enum
@getter
@allargsconstructor
public enum orderstatus {
    //0待确定 1已确定 2已收获 3已取消 4已完成 5已作废
    no_confirm(0, "待确定"),
    has_confirm(1, "已确定"),
    has_receive(2, "已收获"),
    cancel(3, "已取消 "),
    complete(4, "已完成"),
    discard(5, "已作废");

    private integer status;
    private string message;
}
@getter
@allargsconstructor
public enum paystatus {
    //0 等待支付 1 已支付 2 部分支付
    no_pay(0, "等待支付"),
    has_pay(1, "已支付"),
    part_pay(2, "部分支付");

    private integer status; //状态
    private string message; //描述
}

  • 配置类
@configuration
public class rabbitmqconfig {
    // 声明 1个交换机  1个队列 1个路由key  属性
    // 延迟交换机
    public static final string delay_exchange_name = "delay.exchange";
    // 延迟队列
    public static final string delay_queue_name = "delay.queue";
    // 延迟队列路由key
    public static final string delay_queue_routing_key = "delay.queue.routingkey";


    // 声明延迟交换机
    @bean
    public customexchange delayexchange() {
        map<string, object> args = new hashmap<>();
        args.put("x-delayed-type", "direct");
        return new customexchange(delay_exchange_name, "x-delayed-message", true, false, args);
    }


    // 声明延迟队列
    @bean
    public queue delayqueue() {
        return queuebuilder.durable(delay_queue_name).build();
    }


    // 声明延迟队列的绑定关系
    @bean
    public binding delaybinding(@qualifier("delayqueue") queue queue,
                                 @qualifier("delayexchange") customexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange).with(delay_queue_routing_key).noargs();
    }
}
  • service
@service
public class orderserviceimpl extends serviceimpl<ordermapper, order>
    implements orderservice{
    @resource
    private ordermapper ordermapper;

    @resource
    private delaymsgproducer producer;

    @value("${order.delay.time}")
    private integer orderdelaytime;

    /**
     * 新增订单
     * @param order
     * @return
     */
    @transactional
    @override
    public map<string, object> saveorder(order order) {
        // 订单编号
        order.setordersn(idutil.getsnowflake(1, 1).nextidstr());
        // 订单状态 0 待支付
        order.setorderstatus(orderstatus.no_confirm.getstatus());
        // 支付状态 0 等待支付
        order.setpaystatus(paystatus.no_pay.getstatus());
        // 下单时间
        order.setordertime(new date());
        // 新增订单
        int insert = ordermapper.insert(order);
        map<string, object> map = new hashmap<>();
        if (insert > 0) {
            map.put("code", 200);
            map.put("msg", "订单已提交");
            // 发送消息到队列, 设置消息延迟时间
            producer.send(order.getordersn(), orderdelaytime);
        }else {
            map.put("code", 400);
            map.put("msg", "订单提交失败");
        }
        return map;
    }

    /**
     * 根据用户id查询订单列表
     * @param userid
     * @return
     */
    public list<order> getallbyuserid(integer userid) {
        querywrapper<order> querywrapper = new querywrapper<>();
        querywrapper.eq("user_id", userid);
        return ordermapper.selectlist(querywrapper);
    }
}
  • controller
	@postmapping("/save")
    @apioperation(value = "提交订单")
    public map<string, object> saveorder(@requestbody order order) {
        log.info("订单信息: {}", order);
        return orderservice.saveorder(order);
    }
  • producer
@component
@slf4j
public class delaymsgproducer {
    @resource
    private rabbittemplate rabbittemplate;

    public void send(string message, integer delaytime) {
        log.info("发送订单编号到队列,当前时间: {}, 订单编号: {}", localdatetime.now(), message);
        rabbittemplate.convertandsend(delay_exchange_name, delay_queue_routing_key, message, msg -> {
            // 设置消息到期时间
            msg.getmessageproperties().setdelay(delaytime);
            return msg;
        });
    }
}
  • consumer
@component
@slf4j
public class deadletterqueueconsumer {
    @resource
    private ordermapper ordermapper;
    @resource
    private orderactionmapper orderactionmapper;

    // 监听延迟队列
    @rabbitlistener(queues = rabbitmqconfig.delay_queue_name)
    public void receivea(message message) {
        // 获取消息
        string ordersn = new string(message.getbody());
        // 记录日志
        log.info("当前时间: {}, 订单编号: {}", localdatetime.now(), ordersn);
        // 根据订单编号查询订单
        querywrapper<order> wrapper = new querywrapper<>();
        wrapper.eq("order_sn", ordersn);
        order order = ordermapper.selectone(wrapper);
        log.info("订单信息: {}", order);
        // 如果订单不为空并且支付状态是未支付并且订单状态是待确认
        if (order != null && orderstatus.no_confirm.getstatus().equals(order.getorderstatus())
                && paystatus.no_pay.getstatus().equals(order.getpaystatus())) {
            // 设置订单状态为3 已取消
            order.setorderstatus(orderstatus.cancel.getstatus());
            // 根据订单编写修改订单
            int result = ordermapper.updatebyid(order);
            if (result > 0) {
                orderaction orderaction = new orderaction();
                orderaction.setordersn(ordersn);
                orderaction.setorderstatus(orderstatus.cancel.getstatus());
                orderaction.setpaystatus(paystatus.no_pay.getstatus());
                orderaction.setactionnote("支付超时,订单已取消");
                orderaction.setactiontime(new date());
                orderaction.setstatusdesc("支付超时,订单已取消");
                // 新增订单操作
                orderactionmapper.insert(orderaction);
            }
        }
    }
}
  • 测试
    在这里插入图片描述
(0)

相关文章:

  • Nacos认识和安装

    NacosRule负载均衡策略:优先选择同集群服务实例列表,本地集群找不到提供者,才去其它集群寻找,并且会报警告,确定了可用实例列表后,再采用随机负载均衡挑选实例。Nacos中服务…

    2024年08月06日 游戏开发
  • 数据质量管理的未来趋势:人工智能与大数据

    数据质量管理的未来趋势:人工智能与大数据

    1.背景介绍数据质量管理(Data Quality Management, DQM)是指一系列方法和技术,用于确保数据的准确性、完整性、一致性、时效性和可用性。... [阅读全文]
  • kafka

    数据分片(每个topic会把数据切分为多个partition,每个partition有自己对应的副本保证可靠性,但是副本在leader partition down掉前不会提供服务…

    2024年08月06日 游戏开发
  • AI写作不懂提示词 大象Prompt 保姆级系列教程三

    AI写作不懂提示词 大象Prompt 保姆级系列教程三

    短期内,随着大模型性能增强,提示词的上限也会提高,比如要是 GPT5 出来了,我们之前不提倡的“许愿式”的 Prompt 写法也许就能行得通了,比如你跟 GPT... [阅读全文]
  • 在k8s中部署Kafka高可用集群超详细讲解

    本文详细讲解了如何在Kubernetes(简称K8s)中部署一个高可用的Kafka集群。Kafka作为流处理平台的佼佼者,其在大数据和实时数据流处理中发挥着重要作用。然而,在高负载…

    2024年08月06日 游戏开发
  • 句子如何降重 ai写作

    句子如何降重 ai写作

    通过使用小发猫伪原创工具和快码论文等软件辅助我们进行句子的降重处理可以大大提高我们的工作效率同时我们也要注意保持原意不变符合语法规则避免抄袭等情况的发生未来随着... [阅读全文]

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com