滑动窗口算法是一种基于时间窗口的限流算法,它将时间划分为若干个固定大小的窗口,每个窗口内记录了该时间段内的请求次数。通过动态地滑动窗口,可以动态调整限流的速率,以应对不同的流量变化。
整个限流可以概括为两个主要步骤:
- 统计窗口内的请求数量
- 应用限流规则
redis有序集合每个value有一个score(分数),基于score我们可以定义一个时间窗口,然后每次一个请求进来就设置一个value,这样就可以统计窗口内的请求数量。key可以是资源名,比如一个url,或者ip+url,用户标识+url等。value在这里不那么重要,因为我们只需要统计数量,因此value可以就设置成时间戳,但是如果value相同的话就会被覆盖,所以我们可以把请求的数据做一个hash,将这个hash值当value,或者如果每个请求有流水号的话,可以用请求流水号当value,总之就是要能唯一标识一次请求的。
所以,简化后的命令就变成了:
zadd 资源标识 时间戳 请求标识
public boolean isallow(string key) { zsetoperations<string, string> zsetoperations = stringredistemplate.opsforzset(); // 获取当前时间戳 long currenttime = system.currenttimemillis(); // 当前时间 - 窗口大小 = 窗口开始时间 long windowstart = currenttime - period; // 删除窗口开始时间之前的所有数据 zsetoperations.removerangebyscore(key, 0, windowstart); // 统计窗口中请求数量 long count = zsetoperations.zcard(key); // 如果窗口中已经请求的数量超过阈值,则直接拒绝 if (count >= threshold) { return false; } // 没有超过阈值,则加入集合 string value = "请求唯一标识(比如:请求流水号、哈希值、md5值等)"; zsetoperations.add(key, string.valueof(currenttime), currenttime); // 设置一个过期时间,及时清理冷数据 stringredistemplate.expire(key, period, timeunit.milliseconds); // 通过 return true; }
上面代码中涉及到三条redis命令,并发请求下可能存在问题,所以我们把它们写成lua脚本
local key = keys[1] local current_time = tonumber(argv[1]) local window_size = tonumber(argv[2]) local threshold = tonumber(argv[3]) redis.call('zremrangebyscore', key, 0, current_time - window_size) local count = redis.call('zcard', key) if count >= threshold then return tostring(0) else redis.call('zadd', key, tostring(current_time), current_time) return tostring(1) end
完整的代码如下:
package com.example.demo.controller; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.stringredistemplate; import org.springframework.data.redis.core.zsetoperations; import org.springframework.data.redis.core.script.defaultredisscript; import org.springframework.stereotype.service; import java.util.collections; import java.util.concurrent.timeunit; /** * 基于redis有序集合实现滑动窗口限流 * @author: chengjiansheng * @date: 2024/12/26 */ @service public class slidingwindowratelimiter { private long period = 60*1000; // 1分钟 private int threshold = 3; // 3次 @autowired private stringredistemplate stringredistemplate; /** * redistemplate */ public boolean isallow(string key) { zsetoperations<string, string> zsetoperations = stringredistemplate.opsforzset(); // 获取当前时间戳 long currenttime = system.currenttimemillis(); // 当前时间 - 窗口大小 = 窗口开始时间 long windowstart = currenttime - period; // 删除窗口开始时间之前的所有数据 zsetoperations.removerangebyscore(key, 0, windowstart); // 统计窗口中请求数量 long count = zsetoperations.zcard(key); // 如果窗口中已经请求的数量超过阈值,则直接拒绝 if (count >= threshold) { return false; } // 没有超过阈值,则加入集合 string value = "请求唯一标识(比如:请求流水号、哈希值、md5值等)"; zsetoperations.add(key, string.valueof(currenttime), currenttime); // 设置一个过期时间,及时清理冷数据 stringredistemplate.expire(key, period, timeunit.milliseconds); // 通过 return true; } /** * lua脚本 */ public boolean isallow2(string key) { string luascript = "local key = keys[1]\n" + "local current_time = tonumber(argv[1])\n" + "local window_size = tonumber(argv[2])\n" + "local threshold = tonumber(argv[3])\n" + "redis.call('zremrangebyscore', key, 0, current_time - window_size)\n" + "local count = redis.call('zcard', key)\n" + "if count >= threshold then\n" + " return tostring(0)\n" + "else\n" + " redis.call('zadd', key, tostring(current_time), current_time)\n" + " return tostring(1)\n" + "end"; long currenttime = system.currenttimemillis(); defaultredisscript<string> redisscript = new defaultredisscript<>(luascript, string.class); string result = stringredistemplate.execute(redisscript, collections.singletonlist(key), string.valueof(currenttime), string.valueof(period), string.valueof(threshold)); // 返回1表示通过,返回0表示拒绝 return "1".equals(result); } }
这里用stringredistemplate执行lua脚本,先把lua脚本封装成defaultredisscript对象。注意,千万注意,lua脚本的返回值必须是字符串,参数也最好都是字符串,用整型的话可能类型转换错误。
string requestid = uuid.randomuuid().tostring(); defaultredisscript<string> redisscript = new defaultredisscript<>(luascript, string.class); string result = stringredistemplate.execute(redisscript, collections.singletonlist(key), requestid, string.valueof(period), string.valueof(threshold));
好了,上面就是基于redis有序集合实现的滑动窗口限流。顺带提一句,redis list类型也可以用来实现滑动窗口。
接下来,我们来完善一下上面的代码,通过aop来拦截请求达到限流的目的
为此,我们必须自定义注解,然后根据注解参数,来个性化的控制限流。那么,问题来了,如果获取注解参数呢?
举例说明:
@retention(retentionpolicy.runtime) @target(elementtype.method) public @interface myannotation { string value(); } @aspect @component public class myaspect { @before("@annotation(myannotation)") public void beforemethod(joinpoint joinpoint, myannotation myannotation) { // 获取注解参数 string value = myannotation.value(); system.out.println("annotation value: " + value); // 其他业务逻辑... } }
注意看,切点是怎么写的 @before("@annotation(myannotation)")
是@before("@annotation(myannotation)"),而不是@before("@annotation(myannotation)")
myannotation,是参数,而myannotation则是注解类
此处参考资料
https://www.cnblogs.com/javaxubo/p/16556924.html
言归正传,我们首先定义一个注解
package com.example.demo.controller; import java.lang.annotation.*; /** * 请求速率限制 * @author: chengjiansheng * @date: 2024/12/26 */ @documented @target(elementtype.method) @retention(retentionpolicy.runtime) public @interface ratelimit { /** * 窗口大小(默认:60秒) */ long period() default 60; /** * 阈值(默认:3次) */ long threshold() default 3; }
定义切面
package com.example.demo.controller; import jakarta.servlet.http.httpservletrequest; import lombok.extern.slf4j.slf4j; import org.aspectj.lang.joinpoint; import org.aspectj.lang.annotation.aspect; import org.aspectj.lang.annotation.before; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.stringredistemplate; import org.springframework.data.redis.core.zsetoperations; import org.springframework.stereotype.component; import org.springframework.web.context.request.requestcontextholder; import org.springframework.web.context.request.servletrequestattributes; import org.springframework.web.servlet.support.requestcontextutils; import java.util.concurrent.timeunit; /** * @author: chengjiansheng * @date: 2024/12/26 */ @slf4j @aspect @component public class ratelimitaspect { @autowired private stringredistemplate stringredistemplate; // @autowired // private slidingwindowratelimiter slidingwindowratelimiter; @before("@annotation(ratelimit)") public void dobefore(joinpoint joinpoint, ratelimit ratelimit) { // 获取注解参数 long period = ratelimit.period(); long threshold = ratelimit.threshold(); // 获取请求信息 servletrequestattributes servletrequestattributes = (servletrequestattributes) requestcontextholder.getrequestattributes(); httpservletrequest httpservletrequest = servletrequestattributes.getrequest(); string uri = httpservletrequest.getrequesturi(); long userid = 123l; // 模拟获取用户id string key = "limit:" + userid + ":" + uri; /* if (!slidingwindowratelimiter.isallow2(key)) { log.warn("请求超过速率限制!userid={}, uri={}", userid, uri); throw new runtimeexception("请求过于频繁!"); }*/ zsetoperations<string, string> zsetoperations = stringredistemplate.opsforzset(); // 获取当前时间戳 long currenttime = system.currenttimemillis(); // 当前时间 - 窗口大小 = 窗口开始时间 long windowstart = currenttime - period * 1000; // 删除窗口开始时间之前的所有数据 zsetoperations.removerangebyscore(key, 0, windowstart); // 统计窗口中请求数量 long count = zsetoperations.zcard(key); // 如果窗口中已经请求的数量超过阈值,则直接拒绝 if (count < threshold) { // 没有超过阈值,则加入集合 zsetoperations.add(key, string.valueof(currenttime), currenttime); // 设置一个过期时间,及时清理冷数据 stringredistemplate.expire(key, period, timeunit.seconds); } else { throw new runtimeexception("请求过于频繁!"); } } }
加注解
@restcontroller @requestmapping("/hello") public class hellocontroller { @ratelimit(period = 30, threshold = 2) @getmapping("/sayhi") public void sayhi() { } }
最后,看redis中的数据结构
最后的最后,流量控制建议看看阿里巴巴 sentinel
https://sentinelguard.io/zh-cn/
到此这篇关于基于redis有序集合实现滑动窗口限流的文章就介绍到这了,更多相关基于redis有序集合实现滑动窗口限流内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论