之前写了 cachetools 的缓存工具,那个是纯内存的,性能上确实有优势,但重启后缓存数据会丢失。diskcache 则利用轻量级的 sqlite 数据库,该数据库不需要单独的服务器进程,并可以持久化数据结构,且可以突破内存的限制,针对大量数据的缓存时,不会因为内存溢出而丢失数据。
| 特性 | diskcache | cachetools |
|---|---|---|
| 存储位置 | 磁盘为主(内存为辅) | 纯内存 |
| 持久化 | ✅ 支持(重启后数据还在) | ❌ 不支持 |
| 数据大小 | 适合大数据(受磁盘限制) | 适合小数据(受内存限制) |
| 速度 | 磁盘i/o较慢 | 纯内存很快 |
| 使用场景 | 长期缓存、大数据 | 短期缓存、小数据 |
安装
pip install diskcache
淘汰策略
从源码的 eviction_policy 值可以看出,淘汰策略主要有以下几种。
'least-recently-stored'- 默认,按存储时间淘汰'least-recently-used'- 按访问时间淘汰(每次访问都写数据库)'least-frequently-used'- 按访问频率淘汰(每次访问都写数据库)'none'- 禁用自动淘汰
默认则是按照 lrs 按缓存存储的先后时间进行淘汰的淘汰策略。
简单存取
from diskcache import cache
# 1. 实例一个缓存对象
# 需要传入目录路径。如果目录路径不存在,将创建该路径,并且会在指定位置创建一个cache.db的文件。
# 如果未指定,则会自动创建一个临时目录,并且会在指定位置创建一个cache.db的文件。
cache = cache("cache")
# 2. 保存缓存
cache.set('name', '张三', expire=60, read=true, tag='姓名', retry=true)
# 3. 获取缓存 expire_time为真,返回有效期时间;tag为真,返回缓存设置的tag值;
name = cache.get('name', default=false, expire_time=true, tag=true)
print(name)
# ('张三', 1770617370.6258903, '姓名')
上面代码执行之后,我们可以在当前位置的下发现有个 cache 目录,其中有个 cache.db 文件。因为这个 cache.db 是个 sqlite 数据库文件,我们可以尝试使用 pandas 读取一下。
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('sqlite:///cache/cache.db')
pd.set_option('display.max_columns', none) # 不限制列数
pd.set_option('display.width', none) # 不限制列宽
if __name__ == '__main__':
res = pd.read_sql('select * from cache;', con=engine)
print(res)
rowid key raw store_time expire_time access_time access_count tag size mode filename value
0 1 name 1 1.770605e+09 1.770605e+09 1.770605e+09 0 姓名 0 1 none 张三
从查询结果中可以看出,数据库除了常规的 key 和 value,还有过期时间戳 expire_time,tag, access_count 等参数。
缓存获取
获取指定
正常使用 get() 方法获取缓存,但这种方法如果缓存不存或者缓存已过期的话,就只是返回 none 而不会报错。
from diskcache import cache
cache = cache("cache")
print(cache.get('key_not_exist'))
# none
还有就是使用 read() 方法获取缓存,key 不存在或已过期则直接报 keyerror 的错。具体使用哪种看自己的使用场景。
from diskcache import cache
cache = cache("cache")
print(cache.read('key_not_exist'))
# traceback (most recent call last):
# file "e:\lky_project\test_project\test4.py", line 4, in <module>
# print(cache.read('key_not_exist'))
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^
# file "e:\myenv\py11_base_env\lib\site-packages\diskcache\core.py", line 1252, in read
# raise keyerror(key)
# keyerror: 'key_not_exist'
获取最近
有时候,我们想知道最近(最远)存储的缓存是哪个,则可以使用 peekitem() 方法获取最近(最远)存储的缓存数据。
注意:该操作会自动清空数据库中已过期的缓存。
from diskcache import cache
cache = cache("cache")
print(cache.peekitem())
# ('key5', 'value5')
print(cache.peekitem(last=false))
# ('key1', 'value1')
缓存过期
设置缓存的时候(无论是使用 set() 还是 add() 方法),都有一个过期时长参数 expire 指定该缓存多久后过期。
from diskcache import cache
cache = cache("cache")
result = cache.add("key1", "value1", expire=60)
print(result)
# true
result = cache.add("key2", "value2", expire=30)
print(result)
# true
如果缓存已过期,则返回的对应的 value 为 none(当然,如果指定了默认值 default,则返回默认值,比如下面的 false)。
from diskcache import cache
cache = cache("cache")
name = cache.get('name', default=false, expire_time=true, tag=true)
print(name)
# (false, none, none)但其实这个时候,数据仍然在数据库里的,只不过因为当前时间超过了过期时间 expire_time,所以不会将这个数据取回来。
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('sqlite:///cache/cache.db')
pd.set_option('display.max_columns', none) # 不限制列数
pd.set_option('display.width', none) # 不限制列宽
if __name__ == '__main__':
res = pd.read_sql('select * from cache;', con=engine)
print(res)
# rowid key raw store_time expire_time access_time access_count tag size mode filename value
# 0 1 name 1 1.770617e+09 1.770617e+09 1.770617e+09 0 姓名 0 1 none 张三
缓存清理
清空过期
使用 expire() 方法可以清空过期缓存,并返回清空的缓存数。
import time
from diskcache import cache
cache = cache("cache")
cache.set('name', '张三', expire=60, read=true, tag='姓名', retry=true)
cache.set('age', 30, expire=30, read=true, tag='年龄', retry=true)
time.sleep(35)
count = cache.expire()
print(count)
# 1
清空所有
使用 clear() 方法可以清空所有缓存,并返回清空的缓存数。
from diskcache import cache
cache = cache("cache")
cache.set('name', '张三', expire=60, read=true, tag='姓名', retry=true)
cache.set('age', 30, expire=60, read=true, tag='年龄', retry=true)
count = cache.clear()
print(count)
# 2强制清理
使用 cull() 方法会先清理过期缓存,再按照给定的缓存淘汰策略,清理缓存直到磁盘缓存容量小于size_limit 的大小。
from diskcache import cache
cache = cache("cache")
cache.set('name', '张三', expire=60, read=true, tag='姓名', retry=true)
cache.set('age', 30, expire=60, read=true, tag='年龄', retry=true)
count = cache.cull()
print(count)
按 tag 清理
使用 evict() 方法可以手动按照 tag 的名称清理缓存,并返回清空的缓存数。
from diskcache import cache
cache = cache("cache")
cache.set('name', '张三', expire=60, read=true, tag='姓名', retry=true)
cache.set('age', 30, expire=60, read=true, tag='年龄', retry=true)
count = cache.evict("年龄")
print(count)
# 1
按 key 清理
delete() 删除并返回是否删除成功。
from diskcache import cache
cache = cache("cache")
cache.set('name', '张三', expire=60, read=true, tag='姓名', retry=true)
cache.set('age', 30, expire=60, read=true, tag='年龄', retry=true)
result = cache.delete("name")
print(result)
# true
pop() 删除并返回缓存的 value。
from diskcache import cache
cache = cache("cache")
cache.set('name', '张三', expire=60, read=true, tag='姓名', retry=true)
cache.set('age', 30, expire=60, read=true, tag='年龄', retry=true)
result = cache.pop("name")
print(result)
# 张三
缓存添加
可以使用 add() 方法仅添加缓存。
- 只有键不存在时才会存储
- 如果键已存在,不会覆盖,返回
false - 相当于
setdefault()的行为
from diskcache import cache
cache = cache("cache")
result = cache.add("key1", "value1")
print(result)
# true
当然,set() 方法也可以设置缓存。与 add() 的区别如下。
- 总是会存储值,无论键是否已存在
- 如果键已存在,会覆盖旧值
- 相当于字典的
dict[key] = value
缓存刷新
针对未过期的 key,重新刷新 key 的过期时间。如果 key 已过期,则会刷新失败。
from diskcache import cache
cache = cache("cache")
result = cache.touch("key1", 60)
print(result)
# true
缓存判断
cache 中定义了 __contains__ 魔法方法,可以用 in 的方式判断对应的 key 是否在缓存中。
from diskcache import cache
cache = cache("cache")
cache.set("key1", "value1")
print("key1" in cache)
# true
也可以使用 get() 方法,看返回的结果是不是 none 来进行判断(但这种判断并不精确,如果 value 值本身是 none 的话,则会造成误判)。
当然,还可以使用 read() 方法,key 不存在的话直接报 keyerror,这种需要自己处理这种异常。
缓存修改
针对整型的缓存数据类型,可以使用 incr() 和 decr() 对 value 进行加减指定 delta 的数值。
from diskcache import cache
cache = cache("cache")
cache.set('age', 30, expire=60, read=true, tag='年龄', retry=true)
cache.incr("age", delta=5)
print(cache.get("age")) # 35
cache.decr("age", delta=3)
print(cache.get("age")) # 32缓存检查
check() 方法会对数据库和文件系统的一致性进行检查,如果有告警,则返回告警信息。
from diskcache import cache
cache = cache("cache")
warnings = cache.check()
print(warnings)
# []
自动缓存
像之前的 cachetools 或标准库的 lru_cache,都是可以直接装饰在函数上,实现对函数调用自动缓存的,diskcache 也可以使用缓存实例的 memoize() 方法来装饰函数,实现调用结果自动缓存。
from diskcache import cache
cache = cache('cache')
# 使用cache.memoize()装饰器自动缓存函数结果
@cache.memoize(expire=60)
def compute_expensive_operation(x):
# 模拟耗时计算
print("call function")
return x * x * x
# 第一次调用,计算结果并缓存
result = compute_expensive_operation(3)
print(result) # 输出: call function 和 27
# 第二次调用,直接从缓存获取结果,不会再真正执行函数
result = compute_expensive_operation(3)
print(result) # 输出: 27
看源码可知,它会自动将输入参数等按照指定流程转换成元组(这样能确保相同输入转换后 的 key 始终一致)作为缓存的 key,每次函数调用时,如果已经有获取到缓存结果,则直接返回;如果还没有缓存结果,则调用函数,并将结果缓存并返回。
def wrapper(*args, **kwargs):
"""wrapper for callable to cache arguments and return values."""
key = wrapper.__cache_key__(*args, **kwargs)
result = self.get(key, default=enoval, retry=true)
if result is enoval:
result = func(*args, **kwargs)
if expire is none or expire > 0:
self.set(key, result, expire, tag=tag, retry=true)
return result队列操作
我们可以使用相同前缀的 key 来约定一个队列(简单理解就是相同前缀的 key 同属于一个队列),然后通过 push() 和 pull() 方法来实现入列和出列。
队列推送
可以使用 push() 方法往指定前缀的队列推送 value(key 则由方法根据前缀及缓存中已有的相同前缀的 key 来自动生成)。
from diskcache import cache
cache = cache("cache")
result = cache.push("first", prefix="test")
print(result) # test-500000000000000
result = cache.push("second", prefix="test")
print(result) # test-500000000000001
result = cache.push("third", prefix="test", side="front") # 从队列前面插入
print(result) # test-499999999999999
队列拉取
出队的顺序与入队顺序相同(遵从 fifo 先进先出的规则)。
from diskcache import cache
cache = cache("cache")
result = cache.pull("test")
print(result) # ('test-499999999999999', 'third')
result = cache.pull("test")
print(result) # ('test-500000000000000', 'first')
result = cache.pull("test")
print(result) # ('test-500000000000001', 'second')
当然,我们也可以通过参数 side 指定从头部出列还是尾部出列。
from diskcache import cache
cache = cache("cache")
result = cache.pull("test")
print(result) # ('test-499999999999999', 'third')
result = cache.pull("test", side="back")
print(result) # ('test-500000000000001', 'second')
result = cache.pull("test")
print(result) # ('test-500000000000000', 'first')
队列查看
我们可以使用 peek() 方法,查看指定前缀的队列的队首和队尾的数据。
注意:不同于 pull() 方法,peek() 只是查看而不会删除未过期的缓存(过期缓存仍然会自动清理),毕竟从 peek 这个单词的意思本身就是 窥视,偷看的意思。
from diskcache import cache
cache = cache("cache")
print(cache.peek(prefix="test"))
# ('test-499999999999999', 'third')
print(cache.peek(prefix="test", side="back"))
# ('test-500000000000001', 'second')
连接关闭
使用 close() 方法可以关闭与底层 sqlite 数据库的连接。
from diskcache import cache
cache = cache("cache")
cache.close()
配置重置
缓存的配置存储在 settings 表中。
select key, value from settings;
key value
0 count 0
1 size 0
2 hits 0
3 misses 0
4 statistics 0
5 tag_index 0
6 eviction_policy least-recently-stored
7 size_limit 1073741824
8 cull_limit 10
9 sqlite_auto_vacuum 1
10 sqlite_cache_size 8192
11 sqlite_journal_mode wal
12 sqlite_mmap_size 67108864
13 sqlite_synchronous 1
14 disk_min_file_size 32768
15 disk_pickle_protocol 5
我们可以使用 reset() 方法重置 settings 中对应 key 的 value。比如我们设置 cull_limit 值为 30。
from diskcache import cache
cache = cache("cache")
value = cache.reset("cull_limit", 30)
print(value)
# 30
再次查看 settings 表时,可以看到 cull_limit 的值已被修改。
key value
0 count 2
1 size 0
2 hits 0
3 misses 0
4 statistics 0
5 tag_index 0
6 eviction_policy least-recently-stored
7 size_limit 1073741824
8 cull_limit 30
...
配置查看
如果我么仅仅是想查看配置的值,而不需要修改的话,则在使用 reset() 方法时,不需要传入对应的值,且 update 置为 false 即可。
from diskcache import cache
cache = cache("cache")
value = cache.reset("cull_limit", update=false)
print(value)
# 10
事务操作
可以使用事务的方式批量进行操作。
from diskcache import cache
cache = cache("cache")
items = {"key1": "value1", "key2": "value2", "key3": "value3"}
with cache.transact(): # 性能提升2-5倍
for key, value in items.items():
cache[key] = value
命中统计
from diskcache import cache
cache = cache("cache")
# 启用统计
cache.stats(enable=true)
# 获取命中率
hits, misses = cache.stats()
print(f"hits: {hits}; misses: {misses}") # hits: 5; misses: 2
# 检查缓存一致性
warnings = cache.check()
print(warnings) # []
# 获取缓存体积(字节)
size = cache.volume()
print(size) # 32768
deque 队列缓存
估计是怕我们直接用 cache 原生的 push() 和 pull() 方法来操作队列太不方便,diskcache 还帮我们封装实现了一个 deque 类(其中大部分操作也是基于 cache 的 push() 和 pull() 方法来实现)。
from diskcache import deque cache = deque([0, 1, 2, 3, 4], "cache") cache.append(5) print(list(cache)) # [0, 1, 2, 3, 4, 5]
其 key 值则是无前缀的 key。其他操作则与标准库 collections 中的 deque 基本类似,就不赘述了。不同之处在于每个修改操作都会持久化到数据库,感兴趣的可以看 deque 中方法的源码实现。
rowid key raw store_time expire_time access_time access_count tag size mode filename value
0 1 500000000000000 1 1.770714e+09 none 1.770714e+09 0 none 0 1 none 0
1 2 500000000000001 1 1.770714e+09 none 1.770714e+09 0 none 0 1 none 1
2 3 500000000000002 1 1.770714e+09 none 1.770714e+09 0 none 0 1 none 2
3 4 500000000000003 1 1.770714e+09 none 1.770714e+09 0 none 0 1 none 3
4 5 500000000000004 1 1.770714e+09 none 1.770714e+09 0 none 0 1 none 4
5 6 500000000000005 1 1.770714e+09 none 1.770714e+09 0 none 0 1 none 5
index 索引缓存
index 组件提供了类似字典的持久化存储方案(从源码中也可以看出,它默认没有设置缓存淘汰策略,也即永不过期)。它支持事务操作,确保数据一致性,同时保持高效的读写性能。
self._cache = cache(directory, eviction_policy='none')
from diskcache import index
# 持久化字典,永不淘汰
index = index("cache", {"a": 1, "b": 2}, c=3)
index.update([('d', 4), ('e', 5)])
print(list(index))
# ['a', 'b', 'c', 'd', 'e']
我们也可以从通用的 cache() 实例来创建 index。
from diskcache import cache, index
cache = cache("cache")
index = index.fromcache(cache)
index 缓存的设置和获取和字典一样(index 中定义了 __getitem__,__setitem__,__delitem__ 等字典类型的魔法方法)。封装的其他方法也基本上字典一致。
from diskcache import index
index = index("cache")
index['f'] = 6
print(index.get('c')) # 3还可以像字典一样 setdefault,对应的 key 有值时,返回缓存的值;没有值时,使用给定的值设置缓存并返回。
from diskcache import index
index = index("cache")
result = index.setdefault('h', 9)
print(result) # 8
fanout 分片缓存
fanoutcache提供了横向扩展能力,它的一致性哈希算法确保了数据均匀分布,将数据自动分片到多个子缓存,同时支持动态扩容和多进程操作。
from diskcache import fanoutcache
# 初始化分布式缓存,8个分片,每个分片1gb大小限制
distributed_cache = fanoutcache(
directory='./cache',
shards=8,
size_limit=1024 ** 3 # 1gb per shard
)
# 在多进程环境中共享使用
from multiprocessing import pool
def fetch_page(url):
return f"content of {url}"
def spider_worker(url):
# 自动路由到对应分片
if distributed_cache.get(url, default=none) is none:
data = fetch_page(url)
distributed_cache.set(url, data, expire=86400) # 缓存24小时
return url
url_list = [
"https://example1.com",
"https://example2.com",
"https://example3.com",
"https://example4.com",
]
if __name__ == '__main__':
with pool(processes=8) as pool:
# 并行处理url列表
results = pool.map(spider_worker, url_list)
此时,diskcache 会按照分片数目,在指定目录下建立分片个数目的目录,并将缓存均匀分散存储在这些目录中的 cache.db 数据库中。源码中的 self._shards 也可以看出他新建分片缓存目录的逻辑。
self._shards = tuple(
cache(
directory=op.join(directory, '%03d' % num),
timeout=timeout,
disk=disk,
size_limit=size_limit,
**settings,
)
for num in range(shards)
)注意:上面多进程示例,进程池的 with 块需要放到 __main__ 块中执行,不然会因为安全因素导致执行失败。

下面是其中一个 001 分片的 cache.db 中存储的缓存数据。
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('sqlite:///cache/001/cache.db')
pd.set_option('display.max_columns', none) # 不限制列数
pd.set_option('display.width', none) # 不限制列宽
if __name__ == '__main__':
res = pd.read_sql('select * from cache;', con=engine)
# res = pd.read_sql('select key, value from settings;', con=engine)
print(res)# rowid key raw store_time expire_time access_time access_count tag size mode filename value
# 0 1 https://example4.com 1 1.770632e+09 1.770718e+09 1.770632e+09 0 none 0 1 none content of https://example4.com
其他的缓存操作方法与 cache 类似,只是针对每个 key,会通过相同的 hash 算法先找到其所属的正确的分片,然后再进行操作。
以上就是python结合diskcache实现磁盘缓存工具的详细内容,更多关于python磁盘缓存的资料请关注代码网其它相关文章!
发表评论