前言
当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。
循环场景
生产者发送100条消息到rabbitmq中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为false时,设置了死信队列则进入死信队列,否则移除消息)。
consumer.received += (model, ea) => { var message = ea.body; console.writeline("接收到信息为:" + encoding.utf8.getstring(message.toarray())); if (encoding.utf8.getstring(message.toarray()).contains("50")) { console.writeline("拒收"); ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: true); return; } ((eventingbasicconsumer)model).model.basicack(ea.deliverytag, multiple: false); };
当第50条消息拒收,则仍在队列中且处在队列头部,重新推送给消费者,再次拒收,再次推送,反反复复。
最终其他消息全部消费完毕,仅剩第50条消息往复间不断消费,拒收,消费,这将可能导致rabbitmq出现内存泄漏问题。
解决方案
rabbitmq及amqp协议本身没有提供这类重试功能,但可以利用一些已有的功能来间接实现重试限定(以下只考虑基于手动确认模式情况)。此处只想到或是只查到了如下几种方案解决消息循环消费问题。
- 一次消费
- 无论成功与否,消费者都对外返回ack,将拒收原因或是异常信息catch存入本地或是新队列中另作重试。
- 消费者拒绝消息或是出现异常,返回nack或reject,消息进入死信队列或丢弃(requeue设定为false)。
- 限定重试次数
- 在消息的头中添加重试次数,并将消息重新发送出去,再每次重新消费时从头中判断重试次数,递增或递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
- 可以在redis、memcache或其他存储中存储消息唯一键(例如guid、雪花id等,但必须在发布消息时手动设置它),甚至在mysql中连同重试次数一起存储,然后在每次重新消费时递增/递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
- 队列使用quorum类型,限制投递次数,超过次数消息被删除。
- 队列消息过期
- 设置过期时间,给队列或是消息设置ttl,重试一定次数消息达到过期时间后进入死信队列或丢弃(requeue设定为true)。
- 也许还有更多好的方案...
一次消费
对外总是ack
消息到达了消费端,可因某些原因消费失败了,对外可以发送ack,而在内部走额外的方式去执行补偿操作,比如将消息转发到内部的rabbitmq或是其他处理方式,终归是只消费一次。
var queuename = "alwaysack_queue"; channel.queuedeclare(queue: queuename, durable: false, exclusive: false, autodelete: false, arguments: null); channel.basicqos(0, 5, false); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { try { var message = ea.body; console.writeline("接收到信息为:" + encoding.utf8.getstring(message.toarray())); if (encoding.utf8.getstring(message.toarray()).contains("50")) { throw new exception("模拟异常"); } } catch (exception ex) { console.writeline(ex.message); } finally { ((eventingbasicconsumer)model).model.basicack(ea.deliverytag, multiple: false); } }; channel.basicconsume(queue: queuename, autoack: false, consumer: consumer);
当消费端收到消息,处理时出现异常,可以另想办法去处理,而对外保持着ack的返回,以避免消息的循环消费。
消息不重入队列
在消费者端,因异常或是拒收消息时,对requeue设置为false时,如果设置了死信队列,则符合“消息被拒绝且不重入队列”这一进入死信队列的情况,从而避免消息反复重试。如未设置死信队列,则消息被丢失。
此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false。
var dlxexchangename = "dlx_exchange"; channel.exchangedeclare(exchange: dlxexchangename, type: "fanout", durable: false, autodelete: false, arguments: null); var dlxqueuename = "dlx_queue"; channel.queuedeclare(queue: dlxqueuename, durable: false, exclusive: false, autodelete: false, arguments: null); channel.queuebind(queue: dlxqueuename, exchange: dlxexchangename, routingkey: ""); var queuename = "nackorreject_queue"; var arguments = new dictionary<string, object> { { "x-dead-letter-exchange", dlxexchangename } }; channel.queuedeclare(queue: queuename, durable: false, exclusive: false, autodelete: false, arguments: arguments); channel.basicqos(0, 5, false); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = ea.body; console.writeline("接收到信息为:" + encoding.utf8.getstring(message.toarray())); if (encoding.utf8.getstring(message.toarray()).contains("50")) { console.writeline("拒收"); ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: false);//关键在于requeue=false return; } ((eventingbasicconsumer)model).model.basicack(ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queuename, autoack: false, consumer: consumer);
如此一来,拒收消息不会重入队列,并且现有队列绑定了死信交换机,因此,消息进入到死信队列中,如不绑定,则消息丢失。
限定重试次数
设置重试次数,限定循环消费的次数,允许短暂的循环,但最终打破循环。
消息头设定次数
在消息头中设置次数记录作为标记,但是,消费端无法对接收到的消息修改消息头然后将原消息送回mq,因此,需要将原消息内容重新发送消息到mq,具体步骤如下
- 原消息设置不重入队列。
- 再发送新的消息其内容与原消息一致,可设置新消息的消息头来携带重试次数。
- 消费端再次消费时,便可从消息头中查看消息被消费的次数。
此处假定接收10条消息,在接收到第5条消息时设置拒收, 当消息头中重试次数未超过设定的3次时,消息可以重入队列,再次被消费。
var queuename = "messageheaderretrycount_queue"; channel.queuedeclare(queue: queuename, durable: false, exclusive: false, autodelete: false, arguments: null); channel.basicqos(0, 5, false); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = ea.body; console.writeline("接收到信息为:" + encoding.utf8.getstring(message.toarray())); if (encoding.utf8.getstring(message.toarray()).contains("5")) { var maxretrycount = 3; console.writeline($"拒收 {datetime.now}"); //初次消费 if (ea.basicproperties.headers == null) { //原消息设置为不重入队列 ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: false); //发送新消息到队列中 retrypublishmessage(channel, queuename, message.toarray(), 1); return; } //获取重试次数 var retrycount = parseretrycount(ea); if (retrycount < maxretrycount) { //原消息设置为不重入队列 ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: false); //发送新消息到队列中 retrypublishmessage(channel, queuename, message.toarray(), retrycount + 1); return; } //到达最大次数,不再重试消息 ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: false); return; } ((eventingbasicconsumer)model).model.basicack(ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queuename, autoack: false, consumer: consumer); static void retrypublishmessage(imodel channel, string queuename, byte[] body, int retrycount) { var basicproperties = channel.createbasicproperties(); basicproperties.headers = new dictionary<string, object>(); basicproperties.headers.add("retrycount", retrycount); channel.basicpublish(exchange: "", routingkey: queuename, basicproperties: basicproperties, body: body); } static int parseretrycount(basicdelivereventargs ea) { var existretryrecord = ea.basicproperties.headers.trygetvalue("retrycount", out object retrycount); if (!existretryrecord) { throw new exception("没有设置重试次数"); } return (int)retrycount; }
消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中,如此来控制消费次数,但是这种场景下,新消息排在了队列的尾部,而不是原消息排在队列头部。
存储重试次数
在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在。
与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了。需要注意的是,消息发送端需要设置消息的唯一标识(messageid属性)
//模拟外部存储服务 var messageretrycounts = new dictionary<ulong, int>(); var queuename = "storageretrycount_queue"; channel.queuedeclare(queue: queuename, durable: false, exclusive: false, autodelete: false, arguments: null); channel.basicqos(0, 5, false); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = ea.body; console.writeline("接收到信息为:" + encoding.utf8.getstring(message.toarray())); if (encoding.utf8.getstring(message.toarray()).contains("50")) { var maxretrycount = 3; console.writeline("拒收"); //重试次数判断 var existretryrecord = messageretrycounts.containskey(ea.basicproperties.messageid); if (!existretryrecord) { //重入队列,继续重试 messageretrycounts.add(ea.basicproperties.messageid, 1); ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: true); return; } if (messageretrycounts[ea.basicproperties.messageid] < maxretrycount) { //重入队列,继续重试 messageretrycounts[ea.basicproperties.messageid] = messageretrycounts[ea.basicproperties.messageid] + 1; ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: true); return; } //到达最大次数,不再重试消息 ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: false); return; } ((eventingbasicconsumer)model).model.basicack(ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queuename, autoack: false, consumer: consumer);
除第一次拒收外,允许三次重试机会,三次重试完毕后,设置requeue为false,消息丢失或进入死信队列(如有设置的话)。
队列使用quorum类型
第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用quorum,由mq来限定消息的投递次数,也就控制了重试次数。
设置队列类型为quorum,设置投递最大次数,当超过投递次数后,消息被丢弃。
var queuename = "quorumtype_queue"; var arguments = new dictionary<string, object>() { { "x-queue-type", "quorum"}, { "x-delivery-limit", 3 } }; channel.queuedeclare(queue: queuename, durable: true, exclusive: false, autodelete: false, arguments: arguments); channel.basicqos(0, 5, false); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = ea.body; console.writeline("接收到信息为:" + encoding.utf8.getstring(message.toarray())); if (encoding.utf8.getstring(message.toarray()).contains("50")) { console.writeline($"拒收 {datetime.now}"); ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: true); return; } ((eventingbasicconsumer)model).model.basicack(ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queuename, autoack: false, consumer: consumer);
第一次消费被拒收重入队列后,经最大三次投递后,消费端不再收到消息,如此一来也限制了消息的循环消费。
队列消息过期
当为消息设置了过期时间时,当消息没有受到ack,且还在队列中,受到过期时间的限制,反复消费但未能成功时,消息将走向过期,进入死信队列或是被丢弃。
聚焦于过期时间的限制,因此在消费者端,因异常或是拒收消息时,需要对requeue设置为true,将消息再次重入到原队列中。
设定消费者端第五十条消息会被拒收,且队列的ttl设置为5秒。
//死信交换机和死信队列 var dlxexchangename = "dlx_exchange"; channel.exchangedeclare(exchange: dlxexchangename, type: "fanout", durable: false, autodelete: false, arguments: null); var dlxqueuename = "dlx_queue"; channel.queuedeclare(queue: dlxqueuename, durable: false, exclusive: false, autodelete: false, arguments: null); channel.queuebind(queue: dlxqueuename, exchange: dlxexchangename, routingkey: ""); //常规队列 var queuename = "normalmessage_queue"; var arguments = new dictionary<string, object> { { "x-message-ttl", 5000}, { "x-dead-letter-exchange", dlxexchangename } }; channel.queuedeclare(queue: queuename, durable: false, exclusive: false, autodelete: false, arguments: arguments); channel.basicqos(0, 5, false); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = ea.body; console.writeline("接收到信息为:" + encoding.utf8.getstring(message.toarray())); if (encoding.utf8.getstring(message.toarray()).contains("50")) { console.writeline($"拒收 {datetime.now}"); ((eventingbasicconsumer)model).model.basicreject(ea.deliverytag, requeue: true); return; } ((eventingbasicconsumer)model).model.basicack(ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queuename, autoack: false, consumer: consumer);
当消费者端拒收消息后消息重入队列,再次消费,反复进行超过5秒后,消息在队列中达到了过期时间,则被挪入到死信队列中。
从web管理中死信队列中可查看该条过期的消息。
参考资料
https://www.jianshu.com/p/f77a0b10c140
https://www.jianshu.com/p/4904c609632f
https://stackoverflow.com/questions/23158310/how-do-i-set-a-number-of-retry-attempts-in-rabbitmq
到此这篇关于.net core和rabbitmq限制循环消费的文章就介绍到这了,更多相关.net core rabbitmq循环消费内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论