引言
在实时数据处理系统中,我们经常需要统计某个事件在特定时间窗口内的发生次数,例如:
- 统计用户每小时访问次数
- 限制设备每分钟请求频率
- 广告曝光按小时去重计数
这类需求通常面临两个核心挑战:
- 高并发计数:多台服务器同时读写同一个计数器
- 精确时间窗口:数据到点自动过期,避免累积
本文将详细介绍如何基于 redis 实现高性能、高可用的计数方案,并提供完整的java代码实现。
一、redis计数方案选型
1.1 为什么选择redis
| 方案 | qps | 数据一致性 | 实现复杂度 |
|---|---|---|---|
| 数据库+事务 | ~1k | 强一致 | 高 |
| 本地缓存 | ~100k | 最终一致 | 中 |
| redis原子操作 | 50k+ | 强一致 | 低 |
redis的单线程模型天然适合计数场景,提供incr/incrby等原子命令。
1.2 key设计原则
// 格式:业务前缀:appid:deviceid:ip:时间窗口 string key = "flow:count:app123:device456:127.0.0.1:2023080117";
- 包含所有维度信息
- 时间窗口按小时切分(可调整)
- 添加业务前缀避免冲突
二、基础实现方案
2.1 简单incrby实现
public void incrementcount(string key, int delta) {
redistemplate.opsforvalue().increment(key, delta);
}
问题:没有过期时间,会导致数据无限堆积
2.2 增加过期时间
public void incrementwithexpire(string key, int delta, long ttlseconds) {
redistemplate.opsforvalue().increment(key, delta);
redistemplate.expire(key, ttlseconds, timeunit.seconds);
}
新问题:每次操作都设置ttl,造成冗余redis调用
三、优化方案:精准ttl控制
3.1 判断key是否首次写入
我们需要确保ttl只在key创建时设置一次,两种实现方式:
方案a:lua脚本(推荐)
private static final string lua_script =
"local current = redis.call('incrby', keys[1], argv[1])\n" +
"if current == tonumber(argv[1]) then\n" +
" redis.call('expire', keys[1], argv[2])\n" +
"end\n" +
"return current";
public long incrementatomically(string key, int delta, long ttl) {
return redistemplate.execute(
new defaultredisscript<>(lua_script, long.class),
collections.singletonlist(key),
string.valueof(delta), string.valueof(ttl)
);
}
优势:
- 完全原子性执行
- 单次网络往返
- 精准判断首次写入
方案b:setnx+incrby
public void incrementwithnx(string key, int delta, long ttl) {
redistemplate.executepipelined((rediscallback<object>) connection -> {
stringredisconnection conn = (stringredisconnection) connection;
conn.setnx(key, "0"); // 尝试初始化
conn.incrby(key, delta);
if (conn.setnx(key + ":lock", "1")) { // 简易锁判断首次
conn.expire(key, ttl);
conn.expire(key + ":lock", 10);
}
return null;
});
}
适用场景:redis版本<2.6(不支持lua)
四、完整生产级实现
4.1 时间窗口计算
public long calculatettltonexthour() {
localdatetime now = localdatetime.now();
localdatetime nexthour = now.plushours(1).truncatedto(chronounit.hours);
return chronounit.seconds.between(now, nexthour);
}
4.2 kafka消费者集成
@component
@requiredargsconstructor
public class flowcounter {
private final redistemplate<string, string> redistemplate;
private static final string key_prefix = "flow:count:";
@kafkalistener(topics = "${kafka.topic}")
public void handlemessages(list<message> messages) {
map<string, integer> countmap = messages.stream()
.collect(collectors.tomap(
this::buildkey,
msg -> 1,
integer::sum
));
countmap.foreach((k, v) ->
incrementatomically(k, v, calculatettltonexthour())
);
}
private string buildkey(message msg) {
return string.format("%s%s:%s:%s:%s",
key_prefix,
msg.getappid(),
msg.getdeviceid(),
msg.getip(),
localdatetime.now().format(datetimeformatter.ofpattern("yyyymmddhh"))
);
}
}4.3 查询接口
public long getcurrentcount(string appid, string deviceid, string ip) {
string key = buildkey(appid, deviceid, ip);
string val = redistemplate.opsforvalue().get(key);
return val != null ? long.parselong(val) : 0l;
}
五、性能优化技巧
5.1 pipeline批量处理
redistemplate.executepipelined((rediscallback<object>) connection -> {
stringredisconnection conn = (stringredisconnection) connection;
countmap.foreach((k, v) -> {
conn.incrby(k, v);
// 可结合lua脚本进一步优化
});
return null;
});
5.2 本地预聚合
// 在内存中先合并相同key的计数
map<string, integer> localcount = messages.stream()
.collect(collectors.tomap(
this::buildkey,
m -> 1,
integer::sum
));
5.3 集群部署注意事项
使用{}强制哈希标签,保证相同key路由到同一节点
"{flow}:count:app123:..."
考虑分片策略避免热点
六、异常处理与监控
6.1 redis重试机制
@retryable(maxattempts = 3, backoff = @backoff(delay = 100))
public void safeincrement(string key, int delta) {
// 业务逻辑
}
6.2 监控指标
# type redis_operations_total counter
redis_operations_total{operation="incr"} 12345
redis_operations_total{operation="expire"} 678
6.3 数据补偿
@scheduled(fixedrate = 3600000)
public void checkdataconsistency() {
// 对比db与redis计数差异
}
七、方案对比总结
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| lua脚本 | 原子性强,性能最佳 | 需要redis 2.6+ | 新项目首选 |
| setnx+incr | 兼容旧版 | 有竞态风险 | 遗留系统 |
| 纯incr+ttl | 实现简单 | ttl冗余 | 不推荐生产 |
结语
通过本文的方案,我们实现了:
- 单机50k+ qps的计数能力
- 精确到小时的时间窗口控制
- 分布式环境下的强一致性
最佳实践建议:
- 生产环境优先选择lua脚本方案
- 对于超高并发场景(如双11),可增加本地缓存层
- 定期检查redis内存使用情况
以上就是高并发下redis精确计数与时间窗口过期的方法详解的详细内容,更多关于redis高并发精确计数的资料请关注代码网其它相关文章!
发表评论