rabbitmq 如何避免消息重复消费?
幂等性
当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。
消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。
如何避免消息重复消费?
消费者端实现幂等性,意味着消息永远不会消费多次,即使收到了多条一样的消息。通常有两种方式来避免消费重复消费:
- 方式1: 消息全局 id 或者写个唯一标识(如时间戳、uuid 等) :每次消费消息之前根据消息 id 去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局 id 作为数据库表的主键,防止重复)
- 方式2: 利用 redis 的 setnx 命令:给消息分配一个全局 id,消费该消息时,先去 redis 中查询有没消费记录,无则以键值对形式写入 redis ,有则不消费该消息。
基于本地消息表实现消息幂等性
导入 pom.xml 依赖(公共部分)
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>2.7.0</version> <relativepath/> <!-- lookup parent from repository --> </parent> <groupid>com.mq</groupid> <artifactid>rabbitmq</artifactid> <version>0.0.1-snapshot</version> <name>rabbitmq</name> <description>demo project for spring boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --> <!--springboot rabbit 启动器--> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> <version>2.7.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> <version>1.18.24</version> <scope>provided</scope> </dependency> <dependency> <groupid>mysql</groupid> <artifactid>mysql-connector-java</artifactid> </dependency> <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all --> <dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-all</artifactid> <version>5.8.3</version> </dependency> <dependency> <groupid>com.baomidou</groupid> <artifactid>mybatis-plus-boot-starter</artifactid> <version>3.4.0</version> </dependency> <dependency> <groupid>com.baomidou</groupid> <artifactid>mybatis-plus</artifactid> <version>3.4.0</version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> <version>2.6.7</version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-maven-plugin</artifactid> </plugin> </plugins> </build> </project>
yml 配置文件(公共部分)
spring: application: name: rabbitmq rabbitmq: host: 10.0.0.4 port: 5672 username: admin password: admin virtual-host: / publisher-confirm-type: correlated # 开启消息发布确认机制 publisher-returns: true # 发布消息返回监听回调 # 指定消息确认模式 listener: simple: acknowledge-mode: manual # 未正确路由的消息发送到备份队列 # 使用备份交换机模式,mandatory 将无效,即就算 mandatory设 置为 false,路由失败的消息同样会被投递到绑定的备份交换机 template: mandatory: true redis: host: 10.0.0.4 database: 0 port: 6379 timeout: 300ms datasource: url: jdbc:mysql://10.0.0.4:3306/test?useunicode=true&characterencoding=utf-8&servertimezone=asia/shanghai username: root password: root server: port: 8080 # mybatis 插件打印日志,打印查询结果 mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.stdoutimpl
创建本地消息表
create table `message_idempotent` ( `message_id` varchar(50) not null comment '消息id', `message_content` varchar(2000) default null comment '消息内容', primary key (`message_id`) ) engine=innodb default charset=utf8;
创建实体类
package com.mq.rabbitmq.vo; import com.baomidou.mybatisplus.extension.activerecord.model; import lombok.allargsconstructor; import lombok.data; import lombok.noargsconstructor; /** * @classname messageidempotent * @description todo * @author 听秋 * @date 2022/6/23 16:25 * @version 1.0 **/ @data @noargsconstructor @allargsconstructor public class messageidempotent extends model<messageidempotent> { private string messageid; private string messagecontent; }
创建 mapper 接口
package com.mq.rabbitmq.mapper; import com.baomidou.mybatisplus.core.mapper.basemapper; import com.mq.rabbitmq.vo.messageidempotent; import org.apache.ibatis.annotations.mapper; /** * @classname messageidempotentmapper * @description todo * @author 听秋 * @date 2022/6/23 19:03 * @version 1.0 **/ @mapper public interface messageidempotentmapper extends basemapper<messageidempotent> { }
创建 rabbitmq 配置类(公共部分)
package com.mq.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import java.util.hashmap; import java.util.map; /** * @classname directconfig * @description todo direct 交换机配置类 * @author 听秋 * @date 2022/6/17 16:30 * @version 1.0 **/ @configuration public class exchangeconfig { /** * 创建 direct 队列 * */ @bean queue directqueue01() { return new queue("directqueue-01",true); } /** * 创建 direct 交换机 * */ @bean directexchange directexchange01() { return new directexchange("directexchange-01"); } /** * 绑定 direct 队列和交换机 * */ @bean binding bindingdirect01() { return bindingbuilder.bind(directqueue01()).to(directexchange01()).with("directrouting01"); } }
自定义消息发送确认的回调(公共部分)
package com.mq.rabbitmq.callback; import org.springframework.amqp.core.returnedmessage; import org.springframework.amqp.rabbit.connection.correlationdata; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import javax.annotation.postconstruct; /** * @classname myconfirmcallback * @description todo 自定义消息发送确认的回调 * @author 听秋 * @date 2022/6/21 19:12 * @version 1.0 **/ @component public class myconfirmcallback implements rabbittemplate.confirmcallback, rabbittemplate.returnscallback { @autowired private rabbittemplate rabbittemplate; /** * postconstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化 */ @postconstruct public void init() { // 指定 confirmcallback rabbittemplate.setconfirmcallback(this); // 指定 returncallback rabbittemplate.setreturnscallback(this); } /** * confirmation callback. * * 确认消息是否成功到达交换机中,不管是否到达交换机,该回调都会执行 * 生产者 → 交换机 */ @override public void confirm(correlationdata correlationdata, boolean ack, string cause) { system.out.println("发布消息的uuid为:" + correlationdata.getid()); if (ack) { system.out.println("消息发布成功!"); } else { system.out.println("消息发布失败!失败原因是:" + cause); } } /** * returned message callback. * * 确认消息是否从交换机成功到达队列中,失败将会执行,成功则不执行 * 交换机 → 队列 */ @override public void returnedmessage(returnedmessage returned) { system.out.println("returnscallback 回调内容:" + returned); } }
创建生产者(公共部分)
package com.mq.rabbitmq.controller; import lombok.extern.slf4j.slf4j; import org.springframework.amqp.core.message; import org.springframework.amqp.core.messagedeliverymode; import org.springframework.amqp.core.messagepostprocessor; import org.springframework.amqp.core.messageproperties; import org.springframework.amqp.rabbit.connection.correlationdata; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.restcontroller; import java.util.uuid; /** * @classname directcontroller * @description todo * @author 听秋 * @date 2022/6/17 16:48 * @version 1.0 **/ @restcontroller @slf4j public class queuecontroller { @autowired private rabbittemplate rabbittemplate; /** * 消息幂等性 * */ @getmapping("/sendmessage") public void sendmessage(string msg, string routingkey, string id) { messageproperties messageproperties = new messageproperties(); messageproperties.setmessageid(id); messageproperties.setcontenttype("text/plain"); messageproperties.setcontentencoding("utf-8"); message message = new message(msg.getbytes(), messageproperties); log.info("生产消息:" + message.tostring()); // 消息发送确认回调 correlationdata correlationdata = new correlationdata(uuid.randomuuid().tostring()); rabbittemplate.convertandsend("directexchange-01", routingkey, message, correlationdata); } }
发送消息
控制台
rabbitmq 队列
创建消费者
package com.mq.rabbitmq.controller; import cn.hutool.core.util.objectutil; import com.baomidou.mybatisplus.core.conditions.query.querywrapper; import com.mq.rabbitmq.vo.messageidempotent; import com.rabbitmq.client.channel; import lombok.extern.slf4j.slf4j; import org.springframework.amqp.core.message; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.stringredistemplate; import org.springframework.stereotype.component; import java.io.ioexception; import java.nio.charset.standardcharsets; /** * @classname directreceiver * @description todo * @author 听秋 * @date 2022/6/21 10:40 * @version 1.0 **/ @component @slf4j public class consumer { @autowired private stringredistemplate stringredistemplate; /** * 基于本地消息表实现消息幂等性 * @param message */ @rabbitlistener(queues = "directqueue-01") public void receivemessage02(message message, channel channel) throws ioexception { string messageid = message.getmessageproperties().getmessageid(); string messagecontent = new string(message.getbody(), standardcharsets.utf_8); messageidempotent messageidempotent = new messageidempotent(); querywrapper<messageidempotent> querywrapper = new querywrapper<>(); querywrapper.eq("message_id", messageid); messageidempotent msg = messageidempotent.selectone(querywrapper); if (objectutil.isnull(msg)) { messageidempotent.setmessageid(messageid); messageidempotent.setmessagecontent(messagecontent); messageidempotent.insert(); log.info("directqueue-01-消费者收到消息,消息id:" + messageid + " 消息内容:" + messagecontent); // 消息确认 channel.basicack(message.getmessageproperties().getdeliverytag(), false); } else { log.info("消息 " + messageid + " 已经消费过!"); } }
消费消息
控制台
rabbitmq 队列
数据库
基于 redis 实现消息幂等性
发送消息
控制台
rabbitmq 队列
创建消费者
package com.mq.rabbitmq.controller; import cn.hutool.core.util.objectutil; import com.baomidou.mybatisplus.core.conditions.query.querywrapper; import com.mq.rabbitmq.vo.messageidempotent; import com.rabbitmq.client.channel; import lombok.extern.slf4j.slf4j; import org.springframework.amqp.core.message; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.stringredistemplate; import org.springframework.stereotype.component; import java.io.ioexception; import java.nio.charset.standardcharsets; /** * @classname directreceiver * @description todo * @author 听秋 * @date 2022/6/21 10:40 * @version 1.0 **/ @component @slf4j public class consumer { @autowired private stringredistemplate stringredistemplate; /** * 基于 redis 实现消息幂等性 * @param message */ //@rabbithandler //@rabbitlistener(queues = "directqueue-01") public void receivemessage01(message message, channel channel) throws ioexception { string messageid = message.getmessageproperties().getmessageid(); string messagecontent = new string(message.getbody(), standardcharsets.utf_8); // 消息不存在则创建,返回 true boolean exist = stringredistemplate.opsforvalue().setifabsent(messageid, messagecontent); if (!exist) { log.info("消息 " + messageid + " 已经消费过"); } else { // 消息确认 channel.basicack(message.getmessageproperties().getdeliverytag(), false); log.info("directqueue-01-消费者收到消息,消息id:" + messageid + " 消息内容:" + messagecontent); } } }
消费消息
控制台
rabbitmq 队列
redis
到此这篇关于rabbitmq 如何避免消息重复消费的文章就介绍到这了,更多相关rabbitmq 消息重复消费内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论