为什么使用redis+lua实现分布式限流
- 原子性:通过lua脚本执行限流逻辑,所有操作在一个原子上下文中完成,避免了多步操作导致的并发问题。
- 灵活性:lua脚本可以编写复杂的逻辑,比如滑动窗口限流,易于扩展和定制化。
- 性能:由于所有逻辑在redis服务器端执行,减少了网络往返,提高了执行效率。
使用zset也可以实现限流,为什么选择lua的方式
使用zset需要额度解决这些问题
- 并发控制:需要额外的逻辑来保证操作的原子性和准确性,可能需要配合lua脚本或lua脚本+watch/multi/exec模式来实现。
- 资源消耗:长期存储请求记录可能导致redis占用更多的内存资源。
为什么redis+zset不能保证原子性和准确性
- 多步骤操作:滑动窗口限流通常需要执行多个步骤,比如检查当前窗口的请求次数、添加新的请求记录、可能还需要删除过期的请求记录等。这些操作如果分开执行,就有可能在多线程或多进程环境下出现不一致的情况。
- 非原子性复合操作:虽然单个redis命令是原子的,但当你需要执行一系列操作来维持限流状态时(例如,先检查计数、再增加计数、最后可能还要删除旧记录),没有一个单一的redis命令能完成这些复合操作。如果在这系列操作之间有其他客户端修改了数据,就会导致限流不准确。
- 竞争条件:在高并发环境下,多个客户端可能几乎同时执行限流检查和增加请求的操作,如果没有适当的同步机制,可能会导致请求计数错误。
实现
依赖
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>2.2.6.release</version> <relativepath/> <!-- lookup parent from repository --> </parent> <groupid>com.kang</groupid> <artifactid>rate-limiter-project</artifactid> <version>0.0.1-snapshot</version> <name>rate-limiter-project</name> <description>rate-limiter-project</description> <properties> <java.version>8</java.version> </properties> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency> <dependency> <groupid>org.apache.commons</groupid> <artifactid>commons-pool2</artifactid> <version>2.6.2</version> </dependency> <dependency> <groupid>com.google.guava</groupid> <artifactid>guava</artifactid> <version>31.0.1-jre</version> <!-- 请检查最新版本 --> </dependency> <dependency> <groupid>org.apache.commons</groupid> <artifactid>commons-lang3</artifactid> <version>3.12.0</version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> <optional>true</optional> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-maven-plugin</artifactid> <configuration> <excludes> <exclude> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
lua脚本
-- keys[1] 是redis中存储计数的key,,, local key = keys[1] -- argv[1]是当前时间戳-[当前时间戳] local now = tonumber(argv[1]) -- argv[2]是最大请求次数-[最大请求次数] local maxrequests = tonumber(argv[2]) -- argv[3]是时间窗口长度-[时间窗口长度] local windowsize = tonumber(argv[3]) -- 获取当前时间窗口的起始时间 local windowstart = math.floor(now / windowsize) * windowsize -- 构建时间窗口内的key,用于区分不同窗口的计数 local windowkey = key .. ':' .. tostring(windowstart) -- 获取当前窗口的计数 local currentcount = tonumber(redis.call('get', windowkey) or '0') -- 如果当前时间不在窗口内,重置计数 if now > windowstart + windowsize then redis.call('del', windowkey) currentcount = 0 end -- 检查是否超过限制 if currentcount + 1 <= maxrequests then -- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求 redis.call('set', windowkey, currentcount + 1, 'ex', windowsize - (now - windowstart)) return 1 -- 成功 else return 0 -- 失败 end
yaml
server: port: 10086 spring: redis: host: 127.0.0.1 port: 6379 database: 0 lettuce: pool: max-active: 20 max-idle: 10 min-idle: 5
代码实现
启动类
package com.kang.limter; import lombok.extern.slf4j.slf4j; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; @slf4j @springbootapplication public class ratelimiterprojectapplication { public static void main(string[] args) { springapplication.run(ratelimiterprojectapplication.class, args); log.info("ratelimiterprojectapplication start success"); } }
cacheconfig
package com.kang.limter.cache; import com.google.common.cache.cachebuilder; import com.google.common.cache.cacheloader; import com.google.common.cache.loadingcache; import com.kang.limter.utils.luascriptutils; import lombok.extern.slf4j.slf4j; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import java.util.collections; import java.util.list; import java.util.concurrent.timeunit; import static com.kang.limter.constant.systemconstant.redis_rate_limiter_lua_script_path; /** * @author emperor kang * @classname cacheconfig * @description 缓存配置 * @date 2024/6/13 10:07 * @version 1.0 * @motto 让营地比你来时更干净 */ @slf4j @configuration public class cacheconfig { /** * 缓存配置,加载lua脚本 * @return */ @bean(name = "ratelimiterluacache") public loadingcache<string, string> ratelimiterluacache() { loadingcache<string, string> cache = cachebuilder.newbuilder() // 设置缓存的最大容量,最多100个键值对 .maximumsize(100) // 设置缓存项过期策略:写入后2小时过期 .expireafterwrite(2, timeunit.hours) // 缓存统计信息记录 .recordstats() // 构建缓存加载器,用于加载缓存项的值 .build(new cacheloader<string, string>() { @override public string load(string scriptpath) throws exception { try { return luascriptutils.loadluascript(scriptpath); } catch (exception e) { log.error("加载lua脚本失败:{}", e.getmessage()); return null; } } }); // 预热缓存 warmupcache(cache); return cache; } /** * 预热缓存 */ private void warmupcache(loadingcache<string, string> cache) { try { // 假设我们有一个已知的脚本列表需要预热 list<string> knownscripts = collections.singletonlist(redis_rate_limiter_lua_script_path); for (string script : knownscripts) { string luascript = luascriptutils.loadluascript(script); // 手动初始化缓存 cache.put(script, luascript); log.info("预加载lua脚本成功: {}, length: {}", script, luascript.length()); } } catch (exception e) { log.error("预加载lua脚本失败: {}", e.getmessage(), e); } } }
- 这里使用缓存预热加快lua脚本的加载速度,基于jvm内存操作,所以很快
systemconstant
package com.kang.limter.constant; /** * @author emperor kang * @classname systemconstant * @description 系统常量 * @date 2024/6/12 19:25 * @version 1.0 * @motto 让营地比你来时更干净 */ public class systemconstant { /** * 限流配置缓存key前缀 */ public static final string redis_rate_limiter_key_prefix = "outreach:config:limiter:%s"; /** * 限流lua脚本路径 */ public static final string redis_rate_limiter_lua_script_path = "classpath:lua/rate_limiter.lua"; }
ratelimitercontroller
package com.kang.limter.controller; import com.kang.limter.dto.ratelimiterrequestdto; import com.kang.limter.utils.ratelimiterutil; import lombok.extern.slf4j.slf4j; import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.postmapping; import org.springframework.web.bind.annotation.requestbody; import org.springframework.web.bind.annotation.requestmapping; import org.springframework.web.bind.annotation.restcontroller; import static java.lang.thread.sleep; /** * @author emperor kang * @classname ratelimitercontroller * @description todo * @date 2024/6/12 19:33 * @version 1.0 * @motto 让营地比你来时更干净 */ @slf4j @restcontroller @requestmapping("/rate/limiter") public class ratelimitercontroller { @autowired private ratelimiterutil ratelimiterutil; @postmapping("/test") public string test(@requestbody ratelimiterrequestdto ratelimiterrequestdto) { // 是否限流 if (!ratelimiterutil.tryacquire(ratelimiterrequestdto.getinterfacecode(), 5, 1000)) { log.info("触发限流策略,interfacecode:{}", ratelimiterrequestdto.getinterfacecode()); return "我被限流了interfacecode:" + ratelimiterrequestdto.getinterfacecode(); } log.info("请求参数:{}", ratelimiterrequestdto); try { log.info("开始加工逻辑"); sleep(1000); } catch (interruptedexception e) { log.error("休眠异常"); thread.currentthread().interrupt(); return "加工异常"; } return "加工成功,成功返回"; } }
ratelimiterrequestdto
package com.kang.limter.dto; import lombok.data; /** * @author emperor kang * @classname ratelimiterrequestdto * @description todo * @date 2024/6/12 19:39 * @version 1.0 * @motto 让营地比你来时更干净 */ @data public class ratelimiterrequestdto { /** * 接口编码 */ private string interfacecode; }
resourceloaderexception
package com.kang.limter.exception; /** * @author emperor kang * @classname resourceloaderexception * @description 自定义资源加载异常 * @date 2024/6/12 18:10 * @version 1.0 * @motto 让营地比你来时更干净 */ public class resourceloaderexception extends exception{ public resourceloaderexception() { super(); } public resourceloaderexception(string message) { super(message); } public resourceloaderexception(string message, throwable cause) { super(message, cause); } public resourceloaderexception(throwable cause) { super(cause); } protected resourceloaderexception(string message, throwable cause, boolean enablesuppression, boolean writablestacktrace) { super(message, cause, enablesuppression, writablestacktrace); } }
luascriptutils
package com.kang.limter.utils; import com.kang.limter.exception.resourceloaderexception; import lombok.extern.slf4j.slf4j; import org.springframework.core.io.defaultresourceloader; import org.springframework.core.io.resource; import org.springframework.core.io.resourceloader; import org.springframework.util.assert; import java.io.bufferedreader; import java.io.inputstreamreader; import java.nio.charset.standardcharsets; @slf4j public class luascriptutils { /** * 从类路径下读取lua脚本内容。 * @param scriptpath 类路径下的lua脚本文件路径 * @return lua脚本的文本内容 */ public static string loadluascript(string scriptpath) throws resourceloaderexception { assert.notnull(scriptpath, "script path must not be null"); try { // 读取lua脚本 resourceloader resourceloader = new defaultresourceloader(); resource resource = resourceloader.getresource(scriptpath); try (bufferedreader reader = new bufferedreader(new inputstreamreader(resource.getinputstream(), standardcharsets.utf_8))) { stringbuilder scriptbuilder = new stringbuilder(); string line; while ((line = reader.readline()) != null) { scriptbuilder.append(line).append("\n"); } string lua = scriptbuilder.tostring(); log.debug("读取的lua脚本为: {}", lua); return lua; } } catch (exception e) { log.error("failed to load lua script from path: {}", scriptpath, e); throw new resourceloaderexception("failed to load lua script from path: " + scriptpath, e); } } }
ratelimiterutil
package com.kang.limter.utils; import com.google.common.cache.loadingcache; import com.kang.limter.exception.resourceloaderexception; import lombok.extern.slf4j.slf4j; import org.apache.commons.lang3.stringutils; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.data.redis.connection.returntype; import org.springframework.data.redis.core.rediscallback; import org.springframework.data.redis.core.stringredistemplate; import org.springframework.stereotype.component; import org.springframework.util.assert; import java.nio.charset.standardcharsets; import static com.kang.limter.constant.systemconstant.redis_rate_limiter_key_prefix; import static com.kang.limter.constant.systemconstant.redis_rate_limiter_lua_script_path; /** * @author emperor kang * @classname ratelimiterutil * @description 限流工具类 * @date 2024/6/12 17:56 * @version 1.0 * @motto 让营地比你来时更干净 */ @slf4j @component public class ratelimiterutil { @autowired private stringredistemplate redistemplate; @autowired @qualifier("ratelimiterluacache") private loadingcache<string, string> ratelimiterluacache; /** * @param interfacecode 接口标识 * @param maxrequests 最大请求数 * @param windowsizems 窗口大小 * @return boolean * @description 尝试获取令牌 * @author emperor kang * @date 2024/6/12 17:57 * @version 1.0 */ public boolean tryacquire(string interfacecode, int maxrequests, long windowsizems) { try { long currenttimemillis = system.currenttimemillis(); string luascript = ratelimiterluacache.get(redis_rate_limiter_lua_script_path); log.info("缓存查询lua,length={}", luascript.length()); if(stringutils.isblank(luascript)){ log.info("从缓存中未获取到lua脚本,尝试手动读取"); luascript = luascriptutils.loadluascript(redis_rate_limiter_lua_script_path); } // 二次确认 if(stringutils.isblank(luascript)){ log.info("lua脚本加载失败,暂时放弃获取许可,不再限流"); return true; } // 限流核心逻辑 string finalluascript = luascript; long result = redistemplate.execute((rediscallback<long>) connection -> { // 用于存储的key byte[] key = string.format(redis_rate_limiter_key_prefix, interfacecode).getbytes(standardcharsets.utf_8); // 当前时间(毫秒) byte[] now = string.valueof(currenttimemillis).getbytes(standardcharsets.utf_8); // 最大请求数 byte[] maxrequestsbytes = string.valueof(maxrequests).getbytes(standardcharsets.utf_8); // 窗口大小 byte[] windowsizebytes = string.valueof(windowsizems).getbytes(standardcharsets.utf_8); // 执行lua脚本 return connection.eval(finalluascript.getbytes(standardcharsets.utf_8), returntype.integer, 1, key, now, maxrequestsbytes, windowsizebytes); }); assert.notnull(result, "执行lua脚本响应结果为null"); // 获取结果 return result == 1l; } catch (resourceloaderexception e) { log.error("加载lua脚本失败", e); } catch (exception e){ log.error("执行限流逻辑异常", e); } return true; } }
lua脚本
-- keys[1] 是redis中存储计数的key,,, local key = keys[1] -- argv[1]是当前时间戳-[当前时间戳] local now = tonumber(argv[1]) -- argv[2]是最大请求次数-[最大请求次数] local maxrequests = tonumber(argv[2]) -- argv[3]是时间窗口长度-[时间窗口长度] local windowsize = tonumber(argv[3]) -- 获取当前时间窗口的起始时间 local windowstart = math.floor(now / windowsize) * windowsize -- 构建时间窗口内的key,用于区分不同窗口的计数 local windowkey = key .. ':' .. tostring(windowstart) -- 获取当前窗口的计数 local currentcount = tonumber(redis.call('get', windowkey) or '0') -- 如果当前时间不在窗口内,重置计数 if now > windowstart + windowsize then redis.call('del', windowkey) currentcount = 0 end -- 检查是否超过限制 if currentcount + 1 <= maxrequests then -- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求 redis.call('set', windowkey, currentcount + 1, 'ex', windowsize - (now - windowstart)) return 1 -- 成功 else return 0 -- 失败 end
jmeter压测
200次请求/s,限流了195,而我们设置的最大令牌数就是5
到此这篇关于redis+lua实现分布式限流的示例的文章就介绍到这了,更多相关redis+lua分布式限流内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论