当前位置: 代码网 > it编程>前端脚本>Python > python操作redis基础

python操作redis基础

2025年05月27日 Python 我要评论
这是一份详细的 python 操作 redis 教程,涵盖了从安装、连接到操作各种 redis 数据类型以及一些进阶用法。(本人量化交易的数据库主要是redis+本地文件parquet结合)1. re

这是一份详细的 python 操作 redis 教程,涵盖了从安装、连接到操作各种 redis 数据类型以及一些进阶用法。(本人量化交易的数据库主要是redis+本地文件parquet结合)

1. redis 简介

redis (remote dictionary server) 是一个开源的、基于内存的键值对(key-value)存储系统。它通常用作数据库、缓存和消息代理。由于数据存储在内存中,redis 的读写速度非常快,适用于需要高性能数据访问的场景。

支持的数据类型:

  • string (字符串): 最基本的数据类型,可以是任何数据,如文本、json、序列化对象等(最大 512mb)。
  • list (列表): 字符串列表,按插入顺序排序。可以从两端进行 push/pop 操作。
  • set (集合): 无序的字符串集合,不允许重复成员。
  • hash (哈希/散列): 包含键值对的集合,非常适合存储对象。
  • sorted set (有序集合): 类似 set,但每个成员都关联一个 double 类型的分数(score),并根据分数进行排序。成员必须唯一,但分数可以重复。
  • 以及其他更高级的类型如 streams, bitmaps, hyperloglogs 等。

2. 前提条件

  • 安装 python: 确保你的系统已安装 python (建议 3.6+)。
  • 安装 redis 服务器: 你需要在你的机器上或可访问的网络上运行一个 redis 服务器实例。
    • linux/macos: 可以通过包管理器安装(如 apt install redis-server, brew install redis)。
    • windows: 可以通过 wsl (windows subsystem for linux) 安装,或者下载官方不推荐但可用的 windows 版本,或者使用 docker。
    • docker: 推荐方式,跨平台且易于管理:docker run --name my-redis -p 6379:6379 -d redis
  • 启动 redis 服务器: 确保 redis 服务正在运行。默认监听端口是 6379

你可以使用 redis-cli ping 命令测试服务器是否响应(应返回 pong)。

3. 安装 python redis 客户端库

与 redis 服务器交互最常用的 python 库是 redis-py

pip install redis

4. 连接到 redis

方法一:直接连接

这是最简单的方式,适用于快速测试或简单脚本。

import redis
try:
    # 连接到本地默认端口的 redis
    # decode_responses=true: 让 get() 等方法返回字符串而不是 bytes
    r = redis.redis(host='localhost', port=6379, db=0, decode_responses=true)
    # 如果 redis 设置了密码
    # r = redis.redis(host='your_redis_host', port=6379, db=0, password='your_password', decode_responses=true)
    # 测试连接
    response = r.ping()
    print(f"redis ping response: {response}") # 应该打印 true
    print("成功连接到 redis!")
except redis.exceptions.connectionerror as e:
    print(f"无法连接到 redis: {e}")
# 注意:使用完后,理论上应该关闭连接 r.close(),但在简单脚本中不总是必需。
# 对于长时间运行的应用,强烈推荐使用连接池。

方法二:使用连接池 (connection pool) - 推荐

对于需要频繁与 redis 交互的应用程序(如 web 服务),每次操作都建立和断开连接会非常低效。连接池可以管理一组预先建立的连接,供应用程序复用。

import redis
try:
    # 创建连接池
    pool = redis.connectionpool(host='localhost', port=6379, db=0, decode_responses=true)
    # 如果有密码:
    # pool = redis.connectionpool(host='your_redis_host', port=6379, db=0, password='your_password', decode_responses=true)
    # 从连接池获取一个连接
    r = redis.redis(connection_pool=pool)
    # 测试连接
    response = r.ping()
    print(f"redis ping response (from pool): {response}")
    print("成功通过连接池连接到 redis!")
    # 使用连接池时,不需要手动关闭单个连接 r.close()
    # 连接会在使用完毕后自动返回池中
except redis.exceptions.connectionerror as e:
    print(f"无法创建 redis 连接池或连接失败: {e}")
# 当你的应用程序退出时,可以考虑关闭整个连接池(虽然不总是必需)
# pool.disconnect()

decode_responses=true 的重要性:

  • 如果不设置 decode_responses=true (默认为 false),get()lrange()smembers() 等方法返回的是字节串 (bytes),例如 b'value'
  • 设置 decode_responses=true 后,这些方法会尝试使用指定的编码(默认为 utf-8)将字节串解码为字符串 (str),例如 'value',这通常更方便处理。

5. 操作 redis 数据类型

假设我们已经通过连接池获取了 r 对象 (r = redis.redis(connection_pool=pool))。

5.1 string (字符串)

# --- string 操作 ---
print("\n--- string 操作 ---")
# 设置值 (set key value)
# 如果键已存在,会覆盖旧值
r.set('user:name', 'alice')
print(f"设置 user:name: {r.get('user:name')}")
# 获取值 (get key)
name = r.get('user:name')
print(f"获取 user:name: {name}")
# 设置带过期时间的值 (set key value ex seconds)
# 'counter' 将在 60 秒后自动删除
r.set('counter', 100, ex=60)
print(f"设置带过期时间的 counter: {r.get('counter')}")
# 设置值,仅当键不存在时 (set key value nx)
# 如果 'user:name' 已存在,setnx 返回 false,不设置
result_nx = r.setnx('user:name', 'bob')
print(f"尝试设置已存在的 user:name (setnx): {result_nx} (当前值: {r.get('user:name')})")
result_nx_new = r.setnx('user:city', 'new york')
print(f"尝试设置不存在的 user:city (setnx): {result_nx_new} (当前值: {r.get('user:city')})")
# 递增 (incr key) - 对字符串表示的数字进行加 1
r.set('visit_count', '0')
r.incr('visit_count')
r.incr('visit_count')
print(f"递增 visit_count: {r.get('visit_count')}") # 输出 '2'
# 按指定步长递增 (incrby key increment)
r.incrby('visit_count', 10)
print(f"递增 visit_count by 10: {r.get('visit_count')}") # 输出 '12'
# 递减 (decr key / decrby key decrement)
r.decr('visit_count')
print(f"递减 visit_count: {r.get('visit_count')}") # 输出 '11'
# 检查键是否存在 (exists key)
exists = r.exists('user:name')
print(f"user:name 是否存在: {exists}") # 输出 1 (表示存在)
exists_non = r.exists('nonexistent_key')
print(f"nonexistent_key 是否存在: {exists_non}") # 输出 0 (表示不存在)
# 设置键的过期时间 (expire key seconds)
r.set('temp_data', 'will disappear')
r.expire('temp_data', 5) # 5 秒后过期
print(f"设置 temp_data 过期时间为 5 秒")
# time.sleep(6) # 可以取消注释这行来验证过期
# print(f"5 秒后 temp_data 的值: {r.get('temp_data')}") # 如果 sleep 了,这里会输出 none
# 删除键 (del key [key ...])
deleted_count = r.delete('user:city', 'temp_data') # 可以同时删除多个
print(f"删除了 {deleted_count} 个键 ('user:city', 'temp_data')")
print(f"删除后 user:city 的值: {r.get('user:city')}") # 输出 none

5.2 list (列表)

redis 列表是有序的字符串序列,类似 python 列表,但操作主要在两端进行。

# --- list 操作 ---
print("\n--- list 操作 ---")
list_key = 'tasks'
# 清理旧数据(如果存在)
r.delete(list_key)
# 从左侧推入元素 (lpush key element [element ...])
r.lpush(list_key, 'task3', 'task2', 'task1') # 插入顺序: task1, task2, task3
print(f"从左侧推入 'task1', 'task2', 'task3'")
# 从右侧推入元素 (rpush key element [element ...])
r.rpush(list_key, 'task4', 'task5') # 插入顺序: task4, task5
print(f"从右侧推入 'task4', 'task5'")
# 获取列表范围 (lrange key start stop) - 获取所有元素
# 0 是第一个元素,-1 是最后一个元素
all_tasks = r.lrange(list_key, 0, -1)
print(f"当前列表 ({list_key}): {all_tasks}") # 输出: ['task1', 'task2', 'task3', 'task4', 'task5'] (注意 lpush/rpush 的顺序)
# 获取列表长度 (llen key)
length = r.llen(list_key)
print(f"列表长度: {length}") # 输出: 5
# 从左侧弹出元素 (lpop key) - 获取并移除第一个元素
first_task = r.lpop(list_key)
print(f"从左侧弹出: {first_task}") # 输出: 'task1'
print(f"弹出后列表: {r.lrange(list_key, 0, -1)}") # 输出: ['task2', 'task3', 'task4', 'task5']
# 从右侧弹出元素 (rpop key) - 获取并移除最后一个元素
last_task = r.rpop(list_key)
print(f"从右侧弹出: {last_task}") # 输出: 'task5'
print(f"弹出后列表: {r.lrange(list_key, 0, -1)}") # 输出: ['task2', 'task3', 'task4']
# 获取指定索引的元素 (lindex key index)
task_at_index_1 = r.lindex(list_key, 1)
print(f"索引 1 处的元素: {task_at_index_1}") # 输出: 'task3'

5.3 set (集合)

无序、唯一的字符串集合。

# --- set 操作 ---
print("\n--- set 操作 ---")
set_key = 'unique_users'
# 清理旧数据
r.delete(set_key)
# 添加成员 (sadd key member [member ...]) - 返回成功添加的新成员数量
added_count = r.sadd(set_key, 'user1', 'user2', 'user3')
print(f"向集合添加了 {added_count} 个成员")
added_again = r.sadd(set_key, 'user2', 'user4') # 'user2' 已存在,不会重复添加
print(f"再次尝试添加 'user2', 'user4',新增 {added_again} 个") # 输出 1
# 获取所有成员 (smembers key)
members = r.smembers(set_key)
print(f"集合所有成员: {members}") # 输出: {'user1', 'user2', 'user3', 'user4'} (注意是无序的)
# 检查成员是否存在 (sismember key member)
is_member = r.sismember(set_key, 'user3')
print(f"'user3' 是否是集合成员: {is_member}") # 输出: true
is_member_no = r.sismember(set_key, 'user5')
print(f"'user5' 是否是集合成员: {is_member_no}") # 输出: false
# 获取集合成员数量 (scard key)
count = r.scard(set_key)
print(f"集合成员数量: {count}") # 输出: 4
# 移除成员 (srem key member [member ...]) - 返回成功移除的数量
removed_count = r.srem(set_key, 'user1', 'user5') # 'user5' 不存在,只移除 'user1'
print(f"尝试移除 'user1', 'user5',成功移除 {removed_count} 个") # 输出: 1
print(f"移除后集合成员: {r.smembers(set_key)}") # 输出: {'user2', 'user3', 'user4'}

5.4 hash (哈希/散列)

存储键值对的集合,适合表示对象。

# --- hash 操作 ---
print("\n--- hash 操作 ---")
hash_key = 'user:1000' # 通常用 : 分隔表示层级
# 清理旧数据
r.delete(hash_key)
# 设置单个字段值 (hset key field value)
r.hset(hash_key, 'name', 'bob')
print(f"设置 hash 字段 'name'")
# 同时设置多个字段值 (hset key mapping)
user_data = {'email': 'bob@example.com', 'city': 'london'}
r.hset(hash_key, mapping=user_data)
print(f"同时设置 hash 字段 'email', 'city'")
# 获取单个字段值 (hget key field)
name = r.hget(hash_key, 'name')
email = r.hget(hash_key, 'email')
print(f"获取 hash 字段 'name': {name}") # 输出: 'bob'
print(f"获取 hash 字段 'email': {email}") # 输出: 'bob@example.com'
# 获取所有字段和值 (hgetall key) - 返回字典
all_fields = r.hgetall(hash_key)
print(f"获取 hash 所有字段和值: {all_fields}") # 输出: {'name': 'bob', 'email': 'bob@example.com', 'city': 'london'}
# 获取所有字段名 (hkeys key)
keys = r.hkeys(hash_key)
print(f"获取 hash 所有字段名: {keys}") # 输出: ['name', 'email', 'city']
# 获取所有值 (hvals key)
values = r.hvals(hash_key)
print(f"获取 hash 所有值: {values}") # 输出: ['bob', 'bob@example.com', 'london']
# 检查字段是否存在 (hexists key field)
exists = r.hexists(hash_key, 'city')
print(f"hash 字段 'city' 是否存在: {exists}") # 输出: true
# 删除字段 (hdel key field [field ...]) - 返回成功删除的字段数量
deleted_fields = r.hdel(hash_key, 'city', 'nonexistent_field')
print(f"尝试删除 hash 字段 'city', 'nonexistent_field',成功删除 {deleted_fields} 个") # 输出: 1
print(f"删除字段后 hash: {r.hgetall(hash_key)}") # 输出: {'name': 'bob', 'email': 'bob@example.com'}

5.5 sorted set (有序集合)

与 set 类似,但每个成员都有一个分数(score),redis 会根据分数排序。

# --- sorted set 操作 ---
print("\n--- sorted set 操作 ---")
zset_key = 'leaderboard'
# 清理旧数据
r.delete(zset_key)
# 添加成员和分数 (zadd key {member1: score1, member2: score2 ...})
# 如果成员已存在,会更新其分数
r.zadd(zset_key, {'player1': 1500, 'player2': 2100, 'player3': 1800})
print(f"添加成员到有序集合")
r.zadd(zset_key, {'player1': 1650}) # 更新 player1 的分数
print(f"更新 player1 的分数")
# 按分数范围获取成员 (zrangebyscore key min max [withscores=true])
# (1000, 2000] 表示分数 > 1000 且 <= 2000
players_in_range = r.zrangebyscore(zset_key, '(1000', 2000) # 默认不包含分数
print(f"分数在 (1000, 2000] 之间的玩家: {players_in_range}") # 输出: ['player1', 'player3']
players_with_scores = r.zrangebyscore(zset_key, 1000, 2000, withscores=true)
print(f"分数在 [1000, 2000] 之间的玩家 (带分数): {players_with_scores}") # 输出: [('player1', 1650.0), ('player3', 1800.0)]
# 按排名范围获取成员 (zrange key start stop [withscores=true])
# 0 是排名第一(分数最低),-1 是排名最后(分数最高)
top_players = r.zrange(zset_key, 0, -1, desc=true) # desc=true 按分数从高到低排
print(f"所有玩家按分数降序排列: {top_players}") # 输出: ['player2', 'player3', 'player1']
top_2_with_scores = r.zrange(zset_key, 0, 1, desc=true, withscores=true)
print(f"排名前 2 的玩家 (带分数): {top_2_with_scores}") # 输出: [('player2', 2100.0), ('player3', 1800.0)]
# 获取成员的分数 (zscore key member)
score_player2 = r.zscore(zset_key, 'player2')
print(f"'player2' 的分数: {score_player2}") # 输出: 2100.0
# 获取成员数量 (zcard key)
num_players = r.zcard(zset_key)
print(f"有序集合成员数量: {num_players}") # 输出: 3
# 移除成员 (zrem key member [member ...]) - 返回成功移除的数量
removed_count = r.zrem(zset_key, 'player1', 'nonexistent')
print(f"尝试移除 'player1', 'nonexistent',成功移除 {removed_count} 个") # 输出: 1
print(f"移除后有序集合 (降序): {r.zrange(zset_key, 0, -1, desc=true, withscores=true)}")

6. 进阶用法

6.1 pipeline (管道)

当你需要连续执行多个 redis 命令时,每次命令都需要一次网络往返 (client <-> server)。使用 pipeline 可以将多个命令打包一次性发送给服务器,服务器执行完所有命令后再将结果一次性返回,从而显著减少网络延迟,提高性能。

# --- pipeline 操作 ---
print("\n--- pipeline 操作 ---")
# 使用连接池获取连接
r_pipe = redis.redis(connection_pool=pool)
# 创建 pipeline 对象
pipe = r_pipe.pipeline()
# 在 pipeline 中缓存命令 (不会立即执行)
pipe.set('pipe_test:name', 'pipeline user')
pipe.incr('pipe_test:counter', 1)
pipe.expire('pipe_test:name', 30)
pipe.get('pipe_test:name')
pipe.get('pipe_test:counter')
# 一次性执行所有缓存的命令
# results 是一个列表,包含每个命令的执行结果,顺序与添加顺序一致
results = pipe.execute()
print(f"pipeline 执行结果: {results}")
# 可能输出: [true, 1, true, 'pipeline user', '1'] (具体值取决于 counter 之前的值)
# 清理
r_pipe.delete('pipe_test:name', 'pipe_test:counter')

6.2 发布/订阅 (pub/sub)

redis 提供简单的发布/订阅消息模式。

  • 发布者 (publisher): 向指定频道 (channel) 发送消息。
  • 订阅者 (subscriber): 监听一个或多个频道,接收发送到这些频道的消息。

订阅者代码 (subscriber.py):

import redis
import time
pool = redis.connectionpool(host='localhost', port=6379, db=0, decode_responses=true)
r = redis.redis(connection_pool=pool)
# 创建 pubsub 对象
p = r.pubsub()
# 订阅频道 'news_channel' 和 'alerts_channel'
p.subscribe('news_channel', 'alerts_channel')
print("开始监听频道 'news_channel' 和 'alerts_channel'...")
# 持续监听消息 (这是一个阻塞操作)
# p.listen() 返回一个生成器
try:
    for message in p.listen():
        print(f"收到消息: {message}")
        # 消息格式通常是:
        # {'type': 'subscribe', 'pattern': none, 'channel': 'news_channel', 'data': 1}
        # {'type': 'message', 'pattern': none, 'channel': 'news_channel', 'data': 'hello world!'}
        # 这里可以根据 message['type'] 和 message['channel'] 处理不同消息
        if message['type'] == 'message':
            print(f"  频道 [{message['channel']}] 收到数据: {message['data']}")
            if message['data'] == 'stop':
                print("收到停止信号,退出监听。")
                break # 可以根据特定消息退出循环
except keyboardinterrupt:
    print("手动中断监听。")
finally:
    # 取消订阅并关闭连接
    p.unsubscribe()
    p.close()
    pool.disconnect() # 关闭整个池
    print("监听已停止,连接已关闭。")

发布者代码 (publisher.py):

import redis
import time
pool = redis.connectionpool(host='localhost', port=6379, db=0, decode_responses=true)
r = redis.redis(connection_pool=pool)
# 向 'news_channel' 发布消息
count1 = r.publish('news_channel', 'breaking news: redis is awesome!')
print(f"向 'news_channel' 发布消息,接收者数量: {count1}")
time.sleep(1)
# 向 'alerts_channel' 发布消息
count2 = r.publish('alerts_channel', 'system alert: high cpu usage detected!')
print(f"向 'alerts_channel' 发布消息,接收者数量: {count2}")
time.sleep(1)
# 发送停止信号
count3 = r.publish('news_channel', 'stop')
print(f"向 'news_channel' 发布停止信号,接收者数量: {count3}")
pool.disconnect()

运行 pub/sub: 先运行 subscriber.py,它会阻塞并等待消息。然后运行 publisher.py,你将在 subscriber.py 的控制台中看到接收到的消息。

7. 最佳实践与提示

  • 使用连接池: 对于生产环境或需要频繁交互的应用,务必使用连接池。decode_responses=true: 大多数情况下设置此选项能简化代码,避免手动解码 bytes
  • 键名设计 (key naming): 使用有意义且结构化的键名,常用 object-type:id:field 格式(如 user:1000:profile)。保持一致性。
  • 错误处理: 使用 try...except 捕获可能的 redis.exceptions.connectionerrorredis.exceptions.timeouterror 等异常。
  • 大 value 处理: 避免在 redis 中存储过大的单个 value(几 mb 或更大),这可能影响性能和内存使用。考虑拆分或使用其他存储。
  • 原子性: redis 的单个命令是原子的。对于需要多个命令组合的原子操作,考虑使用 lua 脚本 (通过 r.eval()r.register_script()) 或 redis transactions (通过 pipeline 的 multi()exec())。
  • 资源管理: 确保在适当的时候(如应用退出时)断开连接或关闭连接池 (pool.disconnect()),尤其是在非长时间运行的脚本中。

这个教程涵盖了 python 操作 redis 的大部分常用功能。根据你的具体需求,可以进一步探索 redis 的事务、lua 脚本、streams 等更高级的特性。记得查阅 redis-py 的官方文档和 redis 官方文档以获取最全面和最新的信息。

到此这篇关于python操作redis基础的文章就介绍到这了,更多相关python操作redis内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com