前言
在电商秒杀场景中,瞬间爆发的海量请求往往成为系统的生死考验。当并发量达到数万甚至数十万qps时,传统数据库单表架构难以支撑,而redis与消息队列(mq)的组合凭借其高性能与可靠性,成为应对高并发秒杀的黄金方案。
方案总览
用户请求 → 前端生成token → redis执行lua脚本(预扣减+防重+流水)→ 发送rocketmq事务消息 →
[本地事务校验redis结果] → mq消息确认(commit/rollback)→ 消费者消费消息 → mysql扣减库存+记录订单
秒杀系统的核心诉求是抗并发、防超卖、保一致。redis+mq 方案通过 “前端拦截 - 中间缓冲 - 后端落地” 的三层架构实现这一目标:
- 前端拦截:
redis通过lua脚本原子性处理库存预扣减,过滤无效请求; - 中间缓冲:
mq(如rocketmq)通过事务消息削峰填谷,确保流量平稳进入数据库; - 后端落地:
mysql最终存储库存与订单数据,通过事务消息保障与redis的一致性。
流程拆解(示例代码)
redis 库存预扣减
预扣减流程
开始
│
├─ 生成token(前端)
│
├─ 前端携带token请求秒杀
│
├─ 执行lua脚本
│ │
│ ├─ 检查token是否存在(hash结构)
│ │ ├─ 存在 → 返回“重复提交”
│ │ └─ 不存在 → 继续
│ │
│ ├─ 获取redis库存(string结构)
│ │ ├─ 库存不足 → 返回“库存不足”
│ │ └─ 库存充足 → 继续
│ │
│ ├─ 扣减redis库存并更新
│ │
│ └─ 记录流水到hash结构
│
├─ 返回扣减结果(成功/失败)
│
结束
lua 脚本
-- 启用redis命令复制,确保脚本在集群环境中正确同步
redis.replicate_commands()
-- 1. 防重提交校验:通过用户id+token判断是否重复提交
-- keys[2]为用户id(uid),argv[2]为本次请求的token
if redis.call('hexists', keys[2], argv[2]) == 1 then
return redis.error_reply('repeat submit') -- 重复提交,返回错误
end
-- 2. 库存充足性校验
local product_id = keys[1] -- 商品id
local stock = redis.call('get', keys[1]) -- 获取当前库存
if not stock then -- 库存不存在(如商品未上架)
return redis.error_reply('product not found')
end
if tonumber(stock) < tonumber(argv[1]) then -- 库存不足
return redis.error_reply('stock is not enough')
end
-- 3. 执行库存扣减
local remaining_stock = tonumber(stock) - tonumber(argv[1])
redis.call('set', keys[1], tostring(remaining_stock)) -- 更新库存
-- 4. 记录交易流水(用于后续一致性校验)
local time = redis.call('time') -- 获取当前时间(秒+微秒)
local currenttimemillis = (time[1] * 1000) + math.floor(time[2] / 1000) -- 转换为毫秒时间戳
-- 存储流水到hash结构:用户id → token → 流水详情
redis.call('hset', keys[2], argv[2],
cjson.encode({
action = '扣减库存',
product = product_id,
from = stock, -- 扣减前库存
to = remaining_stock, -- 扣减后库存
change = argv[1], -- 扣减数量
token = argv[2],
timestamp = currenttimemillis
})
)
return remaining_stock -- 返回扣减后库存java 调用 lua
@service
public class seckillservice {
@autowired
private stringredistemplate redistemplate;
// 加载lua脚本
private defaultredisscript<long> stockscript;
@postconstruct
public void init() {
stockscript = new defaultredisscript<>();
stockscript.setscriptsource(new resourcescriptsource(new classpathresource("seckill.lua")));
stockscript.setresulttype(long.class);
}
/**
* 执行redis库存预扣减
* @param productid 商品id
* @param uid 用户id
* @param quantity 购买数量
* @param token 防重token
* @return 扣减后库存(-1表示失败)
*/
public long predeductstock(string productid, string uid, integer quantity, string token) {
try {
// 执行lua脚本:keys = [商品id, 用户id],argv = [数量, token]
return redistemplate.execute(
stockscript,
arrays.aslist(productid, uid),
quantity.tostring(),
token
);
} catch (exception e) {
log.error("redis预扣减失败", e);
return -1l;
}
}
}mysql 库存扣减
扣减流程图
开始
│
├─ 发送半消息到rocketmq
│
├─ 执行本地事务
│ │
│ ├─ 检查redis流水是否存在
│ │ ├─ 存在 → 提交消息(commit)
│ │ └─ 不存在 → 回滚消息(rollback)
│ │
│ └─ 未知状态 → 等待回查
│
├─ rocketmq回查机制
│ ├─ 有流水 → 提交消息
│ └─ 无流水 → 回滚消息
│
├─ 消息被消费
│ │
│ ├─ 查询数据库当前版本号(乐观锁)
│ │
│ ├─ 执行库存扣减(where version = 当前版本)
│ │ ├─ 扣减成功 → 记录数据库流水
│ │ └─ 扣减失败 → 抛出异常(触发重试)
│ │
├─ 结束
发送半消息
系统首先向rocketmq发送一条半消息(half message)。此时消息处于不可消费状态,需等待生产者确认本地事务执行结果后,才会被消费者处理。
// 发送半消息
public void sendhalfmessage(string productid, string uid, string token, integer quantity) {
// 构建消息
message message = new message(
"seckill_topic", // 主题
"stock_deduct", // 标签
json.tojsonstring(new seckillmessage(productid, uid, token, quantity)).getbytes()
);
// 发送事务消息
transactionsendresult result = rocketmqtemplate.sendmessageintransaction(
"seckill_producer_group", // 生产者组
message,
null // 本地事务参数(可传递上下文)
);
log.info("半消息发送结果:{}", result.getsendstatus());
}本地事务校验
本地事务的核心是判断redis预扣减是否成功:
- 若
redis的lua脚本执行成功(即库存预扣减完成且流水已记录),则向rocketmq返回 提交(commit)指令,消息变为可消费状态; - 若
redis预扣减失败(如库存不足或重复提交),则返回回滚(rollback)指令,消息被丢弃。 - 若
rocketmq长时间未收到本地事务结果(如生产者宕机),会触发消息回查。此时系统通过检查redis中是否存在对应交易流水,判断是否需要提交消息:若流水存在,则提交;否则回滚。
@component
public class seckilltransactionlistener implements transactionlistener {
@autowired
private stringredistemplate redistemplate;
// 执行本地事务
@override
public localtransactionstate executelocaltransaction(message msg, object arg) {
try {
seckillmessage message = json.parseobject(new string(msg.getbody()), seckillmessage.class);
// 检查redis中是否存在对应流水(验证预扣减成功)
boolean flag = redistemplate.opsforhash().haskey(
message.getuid(), // hash key:用户id
message.gettoken() // hash field:token
);
return flag ? rocketmqlocaltransactionstate.commit : rocketmqlocaltransactionstate.rollback;
} catch (exception e) {
return rocketmqlocaltransactionstate.unknown; // 未知状态,触发回查
}
}
// 消息回查(解决超时未确认问题)
@override
public localtransactionstate checklocaltransaction(messageext msg) {
seckillmessage message = json.parseobject(new string(msg.getbody()), seckillmessage.class);
// 回查逻辑:再次检查流水是否存在
boolean flag = redistemplate.opsforhash().haskey(message.getuid(), message.gettoken());
return flag ? rocketmqlocaltransactionstate.commit : rocketmqlocaltransactionstate.rollback;
}
}消费消息并扣减 mysql 库存
消费者监听消息,执行数据库扣减(需保证幂等性): 消费者接收到可消费的消息后,执行mysql库存扣减操作,并同步记录数据库中的交易流水。为确保消费成功,需利用mq的重试机制:若消费失败(如数据库暂时不可用),mq会自动重试,直至消费成功或达到最大重试次数(此时需人工介入处理)。
@component
@rocketmqmessagelistener(
topic = "seckill_topic",
consumergroup = "seckill_consumer_group",
messagemodel = messagemodel.clustering
)
public class seckillconsumer implements rocketmqlistener<messageext> {
@autowired
private jdbctemplate jdbctemplate;
@override
public void onmessage(messageext message) {
seckillmessage msg = json.parseobject(new string(message.getbody()), seckillmessage.class);
string productid = msg.getproductid();
int quantity = msg.getquantity();
// 数据库扣减(使用乐观锁防超卖)
string sql = "update product_stock " +
"set stock = stock - ?, version = version + 1 " +
"where product_id = ? and stock >= ? and version = ?";
// 1. 查询当前版本号
integer version = jdbctemplate.queryforobject(
"select version from product_stock where product_id = ?",
integer.class,
productid
);
// 2. 执行扣减(乐观锁保证原子性)
int rows = jdbctemplate.update(sql, quantity, productid, quantity, version);
if (rows > 0) {
// 扣减成功:记录数据库流水
jdbctemplate.update(
"insert into stock_flow (product_id, quantity, op_type, create_time) " +
"values (?, ?, 'seckill', now())",
productid, quantity
);
// 确认消费成功(返回ack)
} else {
// 扣减失败:触发重试(mq默认重试机制)
throw new runtimeexception("数据库扣减失败,触发重试");
}
}
}一致性保障
为防止redis与mysql数据不一致(如redis扣减成功但mysql扣减失败),需定期对账:
@scheduled(cron = "0 0 */1 * * ?") // 每小时执行一次
public void reconcilestock() {
// 1. 扫描redis中未同步到mysql的流水
set<string> uids = redistemplate.keys("uid:*"); // 假设用户id前缀为uid:
for (string uid : uids) {
map<object, object> tokenmap = redistemplate.opsforhash().entries(uid);
for (map.entry<object, object> entry : tokenmap.entryset()) {
string token = (string) entry.getkey();
string flowjson = (string) entry.getvalue();
seckillflow flow = json.parseobject(flowjson, seckillflow.class);
// 2. 检查mysql是否有对应订单
integer count = jdbctemplate.queryforobject(
"select count(1) from orders where product_id = ? and uid = ? and token = ?",
integer.class,
flow.getproduct(), flow.getuid(), token
);
if (count == 0) {
// 3. 未找到订单 → 人工介入或自动回滚redis库存
log.warn("发现不一致:redis有流水但mysql无订单,product={}, uid={}", flow.getproduct(), uid);
// redistemplate.opsforvalue().increment(flow.getproduct(), integer.parseint(flow.getchange()));
}
}
}
}系统可通过定时任务对比redis流水、mysql库存流水与订单表数据:若redis流水存在但订单表无对应记录,说明订单生成失败,需人工介入补单或回滚redis库存,避免少卖;若订单表有记录但mysql库存未扣减,则需触发库存补扣,避免多卖。
总结
redis + mq 方案通过预扣减 + 事务消息 + 对账三重机制,完美解决了高并发秒杀的核心痛点:
redis承担高并发读写,通过lua脚本确保原子性,防止超卖;mq事务消息保障redis与mysql的最终一致性,避免数据断层;- 流水对账作为最后一道防线,及时发现并修复异常。
到此这篇关于redis+mq高并发秒杀的技术方案与实现的文章就介绍到这了,更多相关redis+mq高并发秒杀内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论