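Background:
Limit a given SQL statement to at most 3 executions within any 30-second window.
Requirements analysis
The microservices are deployed across multiple instances, so the limit has to be enforced globally rather than per process. For distributed rate limiting, a natural choice is the Redis ZSET (sorted set) data structure.
The ZSET operations break down into a few steps. First, check whether the number of elements whose score falls inside the time window has reached the threshold. If it has not, add a new element and return true. If it has, return false immediately.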
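To make the steps above concrete, here is a minimal single-process sketch of the same check in plain Java. It is only an analogue of the ZSET logic, not the distributed limiter itself: the class name LocalSlidingWindow is hypothetical, and it assumes timestamps are passed in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Single-process analogue of the ZSET check (illustrative only, hypothetical class). */
final class LocalSlidingWindow {
    // Timestamps of admitted calls; plays the role of the ZSET scores.
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private final long windowMillis;
    private final int limit;

    LocalSlidingWindow(long windowMillis, int limit) {
        this.windowMillis = windowMillis;
        this.limit = limit;
    }

    /** Returns true and records the call if fewer than `limit` calls are inside the window. */
    synchronized boolean tryAcquire(long nowMillis) {
        // Drop entries at or before (now - window), like ZREMRANGEBYSCORE key 0 (now - window).
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMillis - windowMillis) {
            timestamps.pollFirst();
        }
        // Like ZCARD key < limit, followed by ZADD on success.
        if (timestamps.size() < limit) {
            timestamps.addLast(nowMillis);
            return true;
        }
        return false;
    }
}
```

With a 30-second window and a limit of 3, calls at 0 s, 5 s and 10 s are admitted, calls at 15 s and 29.999 s are rejected, and a call at 30 s is admitted again because the entry from 0 s has left the window.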
Implementation
First, write a Lua script that implements this logic:
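    -- Remove entries that have fallen out of the window (score <= ARGV[3])
    redis.call('zremrangebyscore', KEYS[1], 0, tonumber(ARGV[3]))
    local res = 0
    -- Admit the call only if fewer than ARGV[5] entries remain
    if (redis.call('zcard', KEYS[1]) < tonumber(ARGV[5])) then
        redis.call('zadd', KEYS[1], tonumber(ARGV[2]), ARGV[1])
        res = 1
    end
    -- Refresh the TTL so an idle key is cleaned up automatically
    redis.call('expire', KEYS[1], tonumber(ARGV[4]))
    return res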
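ARGV[1]: the ZSET element (member)
ARGV[2]: the ZSET score (the current timestamp)
ARGV[3]: the timestamp 30 seconds ago (start of the window)
ARGV[4]: the expiration time of the ZSET key, 30 seconds
ARGV[5]: the rate-limit threshold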
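Because the script depends on the order of these five arguments, it can help to build them in one place. The following is a small sketch of such a helper. The class LimiterArgs and its method are hypothetical names, not part of the original code. Note that ZADD with an existing member only updates that member's score, so the element passed as ARGV[1] should be unique per call.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that assembles ARGV[1..5] in the order the script expects. */
final class LimiterArgs {
    private LimiterArgs() {}

    static List<Object> build(String element, long nowMillis, long windowSeconds, int limit) {
        // Scores at or below this value are outside the window.
        long windowStart = nowMillis - windowSeconds * 1000L;
        return Arrays.<Object>asList(
                element,        // ARGV[1]: unique ZSET member
                nowMillis,      // ARGV[2]: score (current timestamp in ms)
                windowStart,    // ARGV[3]: start of the window
                windowSeconds,  // ARGV[4]: key TTL in seconds
                limit);         // ARGV[5]: threshold
    }
}
```

For the requirement in this article, a call would look like LimiterArgs.build("ele0", System.currentTimeMillis(), 30, 3).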
    private final RedisTemplate<String, Object> redisTemplate;

    public Boolean execLuaScript(String luaStr, List<String> keys, List<Object> args) {
        // Wrap the script text and declare the expected return type
        RedisScript<Boolean> redisScript = RedisScript.of(luaStr, Boolean.class);
        return redisTemplate.execute(redisScript, keys, args.toArray());
    }
Let's test it:
    @SpringBootTest
    public class ApiApplicationTest {

        @Test
        public void test2() throws InterruptedException {
            String luaStr = "redis.call('zremrangebyscore', KEYS[1], 0, tonumber(ARGV[3]))\n" +
                    "local res = 0\n" +
                    "if(redis.call('zcard', KEYS[1]) < tonumber(ARGV[5])) then\n" +
                    "    redis.call('zadd', KEYS[1], tonumber(ARGV[2]), ARGV[1])\n" +
                    "    res = 1\n" +
                    "end\n" +
                    "redis.call('expire', KEYS[1], tonumber(ARGV[4]))\n" +
                    "return res";
            // One call every 5 seconds; execLuaScript is the method shown above
            for (int i = 0; i < 10; i++) {
                Boolean res = execLuaScript(luaStr, Arrays.asList("aaaa"),
                        Arrays.asList("ele" + i, System.currentTimeMillis(),
                                System.currentTimeMillis() - 30 * 1000, 30, 3));
                System.out.println(res);
                Thread.sleep(5000);
            }
        }
    }
The test results match expectations!
Further reading
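Sending the full script text on every call adds network traffic and latency. Redis caches compiled scripts internally, so the server does not recompile an unchanged script each time, but the bandwidth cost remains. In production it is therefore better to load the script once and invoke it by its hash (SCRIPT LOAD followed by EVALSHA) than to send the source with every call.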
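The hash in question is the SHA-1 digest of the script source, written as a 40-character hex string, which is the value SCRIPT LOAD returns. As a rough illustration, the same digest can be computed locally in plain Java. The class name ScriptSha1 is hypothetical, and the sketch assumes the script is sent to Redis as UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: hex-encoded SHA-1 of a script's source text. */
final class ScriptSha1 {
    private ScriptSha1() {}

    static String hex(String script) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            byte[] digest = md.digest(script.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Two lowercase hex digits per byte
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a standard JDK algorithm, so this should not happen in practice
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }
}
```

Computing the digest locally can be handy for logging or for checking that the hash held by the application still corresponds to the script text it was built from.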
For convenience, let's first wrap the calls in a helper class:
    import lombok.RequiredArgsConstructor;
    import org.springframework.data.redis.connection.RedisScriptingCommands;
    import org.springframework.data.redis.connection.ReturnType;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.stereotype.Component;

    import java.util.List;

    /**
     * @author 敖癸
     * @formatter:on
     * @since 2024/3/25
     */
    @Component
    @RequiredArgsConstructor
    public class RedisService {

        private final RedisTemplate<String, Object> redisTemplate;
        private static RedisScriptingCommands commands;
        private static RedisSerializer keySerializer;
        private static RedisSerializer valSerializer;

        /** Loads the script into Redis and returns its hash. */
        public String loadScript(String luaStr) {
            byte[] bytes = RedisSerializer.string().serialize(luaStr);
            return this.getCommands().scriptLoad(bytes);
        }

        /** Runs a previously loaded script by its hash. */
        public <T> T execLuaHashScript(String hash, Class<T> returnType, List<String> keys, Object[] args) {
            byte[][] keysAndArgs = toByteArray(this.getKeySerializer(), this.getValSerializer(), keys, args);
            return this.getCommands().evalSha(hash, ReturnType.fromJavaType(returnType), keys.size(), keysAndArgs);
        }

        /** Serializes keys first, then args, into one array as evalSha expects. */
        private static byte[][] toByteArray(RedisSerializer keySerializer, RedisSerializer argsSerializer, List<String> keys, Object[] args) {
            final int keySize = keys != null ? keys.size() : 0;
            byte[][] keysAndArgs = new byte[args.length + keySize][];
            int i = 0;
            if (keys != null) {
                for (String key : keys) {
                    keysAndArgs[i++] = keySerializer.serialize(key);
                }
            }
            for (Object arg : args) {
                if (arg instanceof byte[]) {
                    keysAndArgs[i++] = (byte[]) arg;
                } else {
                    keysAndArgs[i++] = argsSerializer.serialize(arg);
                }
            }
            return keysAndArgs;
        }

        // Lazily obtains a connection and keeps it for the lifetime of the application
        private RedisScriptingCommands getCommands() {
            if (commands == null) {
                commands = redisTemplate.getRequiredConnectionFactory().getConnection().scriptingCommands();
            }
            return commands;
        }

        private RedisSerializer getKeySerializer() {
            if (keySerializer == null) {
                keySerializer = redisTemplate.getKeySerializer();
            }
            return keySerializer;
        }

        private RedisSerializer getValSerializer() {
            if (valSerializer == null) {
                valSerializer = redisTemplate.getValueSerializer();
            }
            return valSerializer;
        }
    }
Let's test it:
    @SpringBootTest
    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    public class ApiApplicationTest implements ApplicationContextAware {

        private static ApplicationContext context;
        private static RedisService redisService;
        public static String luaHash;

        private final static String LUA_STR = "redis.call('zremrangebyscore', KEYS[1], 0, tonumber(ARGV[3]))\n" +
                "local res = 0\n" +
                "if(redis.call('zcard', KEYS[1]) < tonumber(ARGV[5])) then\n" +
                "    redis.call('zadd', KEYS[1], tonumber(ARGV[2]), ARGV[1])\n" +
                "    res = 1\n" +
                "end\n" +
                "redis.call('expire', KEYS[1], tonumber(ARGV[4]))\n" +
                "return res";

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            context = applicationContext;
        }

        // Load the script once and keep its hash for all test calls
        @BeforeAll
        public static void before() {
            redisService = context.getBean(RedisService.class);
            luaHash = redisService.loadScript(LUA_STR);
            System.out.println("Lua script hash: " + luaHash);
        }

        @Test
        public void testLuaHash() throws InterruptedException {
            // One call every 3 seconds against a 30-second window with a limit of 3
            for (int i = 0; i < 50; i++) {
                List<String> keys = Collections.singletonList("aaaa");
                Object[] args = new Object[]{"ele" + i, System.currentTimeMillis(),
                        System.currentTimeMillis() - 30 * 1000, 30, 3};
                Boolean b = redisService.execLuaHashScript(luaHash, Boolean.class, keys, args);
                System.out.println(b);
                Thread.sleep(3000);
            }
        }
    }
In real use, load the script once when the application starts, then use the returned hash for every subsequent call.
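One caveat with loading only at startup: the script cache on the Redis side is not permanent (it is emptied by a server restart or SCRIPT FLUSH), and EVALSHA then fails with a NOSCRIPT error. A possible way to handle this is to reload and retry once. The sketch below shows the idea in plain Java with the Redis calls abstracted as functions. The class ReloadingScriptRunner is hypothetical, and detecting the error by looking for "NOSCRIPT" in the exception message is an assumption that may need adjusting for the exception types a given client actually throws.

```java
import java.util.function.Function;

/** Hypothetical wrapper: run a script by hash, reloading it once if Redis reports NOSCRIPT. */
final class ReloadingScriptRunner<R> {
    private final String script;
    private final Function<String, String> loader; // script text -> hash (e.g. SCRIPT LOAD)
    private final Function<String, R> runner;      // hash -> result (e.g. EVALSHA)
    private volatile String hash;

    ReloadingScriptRunner(String script, Function<String, String> loader, Function<String, R> runner) {
        this.script = script;
        this.loader = loader;
        this.runner = runner;
    }

    R run() {
        if (hash == null) {
            hash = loader.apply(script); // first use: load the script
        }
        try {
            return runner.apply(hash);
        } catch (RuntimeException e) {
            // Assumption: a missing script surfaces as an exception whose message contains "NOSCRIPT"
            if (e.getMessage() == null || !e.getMessage().contains("NOSCRIPT")) {
                throw e;
            }
            hash = loader.apply(script); // reload, then retry exactly once
            return runner.apply(hash);
        }
    }
}
```

With the RedisService above, the loader could be redisService::loadScript and the runner a lambda that calls execLuaHashScript with the keys and arguments for the current request.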
Done!