当前位置: 代码网 > it编程>数据库>Redis > Redis从基础到Stream消息队列实战指南

Redis从基础到Stream消息队列实战指南

2026年03月23日 Redis 我要评论
一、redis 简介redis(remote dictionary server)是一个基于内存的键值存储系统,支持多种数据结构,常被用作缓存、消息队列和会话存储。它提供高性能读写,并支持持久化、主从

一、redis 简介

redis(remote dictionary server)是一个基于内存的键值存储系统,支持多种数据结构,常被用作缓存、消息队列和会话存储。它提供高性能读写,并支持持久化、主从复制和集群。

二、redis 核心特点

特点说明
内存存储数据主要在内存中,读写延迟低(通常微秒级)
持久化支持 rdb 快照和 aof 日志,保证数据不丢失
多种数据结构string、list、hash、set、sorted set、stream、bitmap 等
单线程模型命令串行执行,避免锁竞争,保证原子性
主从复制支持读写分离和高可用
发布订阅支持 pub/sub 和 stream 消息队列
事务支持 multi/exec 事务
lua 脚本支持 lua 脚本,保证原子性
集群支持 redis cluster 水平扩展

三、redis 常用数据结构

3.1 string(字符串)

set key value          # 设置
get key                # 获取
incr key               # 自增
expire key seconds     # 设置过期时间

3.2 hash(哈希)

hset user:1 name "张三" age 25
hget user:1 name
hgetall user:1

3.3 list(列表)

lpush queue task1      # 左侧入队
rpop queue             # 右侧出队
lrange queue 0 -1      # 范围查询

3.4 set(集合)

sadd tags "redis" "cache"
smembers tags
sismember tags "redis"

3.5 sorted set(有序集合)

zadd rank 100 "user1" 95 "user2"
zrange rank 0 -1 withscores
zrevrank rank "user1"

四、redis stream:消息队列

redis 5.0 引入 stream,用于实现消息队列,支持:

  • 消息持久化
  • 消费者组
  • 消息确认(ack)
  • 历史消息回溯

4.1 基本命令

# 添加消息
xadd mystream * field1 value1 field2 value2
# 读取消息(从头)
xread count 10 streams mystream 0
# 读取最新消息(阻塞)
xread block 5000 streams mystream $

4.2 消费者组

# 创建消费者组
xgroup create mystream mygroup 0 mkstream
# 消费消息
xreadgroup group mygroup consumer1 count 1 streams mystream >
# 确认消息
xack mystream mygroup message-id
# 查看待确认消息
xpending mystream mygroup

4.3 stream 与 list 的对比

特性liststream
消息持久化
消费者组
消息确认
历史回溯有限支持
适用场景简单队列可靠消息队列

五、redis 在项目中的典型用法

以银行场景下的 sop 合规检测为例,redis stream 用于任务分发和结果回传。

5.1 架构示意

后端/前端 → xadd 推送任务 → redis stream
                              ↓
                    sop_engine 监听 xreadgroup
                              ↓
                    处理完成后 xadd 推送结果
                              ↓
                    后端消费 message:xxx:results

5.2 maf 营销分析任务流

# 1. 推送 maf 任务
xadd message:maf:tasks * task_id "maf_001" \
  conversation_id "maf_001" \
  conversation_json_path "bucket/path/conversation.json"
# 2. 引擎监听 message:maf:tasks,消费并分析
# 3. 引擎完成后写入结果流
xadd message:maf:results * task_id "maf_001" status "completed" \
  result_path "maf-results-bucket/maf_001/analysis_result.json"

5.3 消费者组带来的好处

  • 多个 worker 共同消费,实现负载均衡
  • 每条消息只被组内一个消费者处理
  • 支持 ack,失败可重试
  • 支持 pel(pending entries list)查看未确认消息

5.4 redis stream 请求分发规则

如果两个项目使用了同一套redis配置,那么无法保证redis stream 请求会由谁处理,取决于 redis 消费者组的分发。

两个容器都连同一个 redis,监听同一个 stream(如 message:maf:tasks),且通常在同一消费者组(如 maf_analyze_group)里:

  • redis 会把消息分发给组内的消费者
  • 每条消息只会被组内一个消费者处理
  • 具体是容器 1 还是容器 2,由 redis 的分发策略决定,无法指定

因此:

  • 有时是a容器处理
  • 有时是b容器处理
  • 测试时无法稳定地“只让某个容器”处理请求

可能带来的问题:

场景说明
代码版本混用同一批任务,部分由旧代码处理,部分由新代码处理
测试不可控无法保证测试请求一定打到新容器
任务重复若消费者组配置不当,可能出现同一条消息被多个消费者处理

如何保证只使用新代码:

方案 1:停掉旧容器

# 停掉旧容器后再部署新容器
docker stop <old_container_id>
docker run ...  # 启动新容器

方案 2:用不同的 stream 做测试

  • 新容器监听不同的 stream,例如 message:maf:tasks_test
  • 测试时往 message:maf:tasks_test 发消息
  • 需要改配置或环境变量,让新容器使用 message:maf:tasks_test

方案 3:用不同的消费者组

  • 新容器使用不同的消费者组名,例如 maf_analyze_group_v2
  • 两个组都会收到同一条消息,各自处理一次
  • 适合做 a/b 或灰度,但会产生重复处理,需要业务上能接受

方案 4:只保留一个容器

  • 部署新容器前先停掉旧容器
  • 或使用滚动更新,保证同一时间只有一个版本在跑

六、redis 常用配置

# 绑定地址
bind 0.0.0.0
# 端口
port 6379
# 密码
requirepass your_password
# 最大内存
maxmemory 2gb
maxmemory-policy allkeys-lru
# 持久化
save 900 1
save 300 10
save 60 10000
appendonly yes

七、python 操作 redis

7.1 linux安装

# ubuntu/debian
sudo apt update
sudo apt install redis-server
# centos/rhel
sudo yum install redis
# 启动服务
sudo systemctl start redis
sudo systemctl enable redis
# 验证
redis-cli ping
# 返回 pong 表示成功

7.2 docker安装

docker run -d --name redis -p 6379:6379 redis:latest
# 带密码
docker run -d --name redis -p 6379:6379 redis redis-server --requirepass yourpassword

7.3 连接redis

# 本地连接
redis-cli
# 带密码连接
redis-cli -a yourpassword
# 远程连接
redis-cli -h host -p 6379 -a password

7.4 基本使用

import redis
r = redis.redis(host='localhost', port=6379, db=0, password='')
# string
r.set('name', 'redis')
print(r.get('name'))
# stream 添加消息
r.xadd('mystream', {'task_id': '001', 'data': 'hello'})
# stream 消费
messages = r.xreadgroup('mygroup', 'consumer1', {'mystream': '>'}, count=1)
for stream, msgs in messages:
    for msg_id, data in msgs:
        print(msg_id, data)
        r.xack('mystream', 'mygroup', msg_id)

八、redis 使用建议

  1. 合理设置 maxmemory 和淘汰策略,避免 oom。
  2. 生产环境开启 requirepass 和访问控制。
  3. 使用连接池,减少连接开销。
  4. 对热点 key 做拆分或本地缓存,减轻压力。
  5. 使用 pipeline 批量执行命令,降低网络往返。
  6. stream 场景下注意消费者组和 ack,避免消息堆积和重复消费。

总结

redis 适合做缓存、会话存储和消息队列。stream 在需要可靠消费、负载均衡和消息确认的场景中,比简单 list 更合适。结合具体业务(如 maf 任务流),可以设计出清晰、可扩展的异步处理架构。

到此这篇关于redis从基础到stream消息队列实战指南的文章就介绍到这了,更多相关redis stream 消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com