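I. Rate Limiting
Question: why do we need rate limiting?
In business scenarios with very heavy traffic, skipping rate limiting can bring the system down. When a flood of requests reaches the backend services, it exhausts resources (CPU, memory, threads, network bandwidth, database connections and so on are all finite), which in turn drags the whole system down.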
1. Common rate-limiting algorithms
- Leaky bucket
- Token bucket
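1.1. Leaky bucket (not recommended)
1.1.1. Principle
Requests are buffered in a queue and then processed at a fixed rate, which is how the rate gets limited.
1.1.2. Implementation
Requests are put into a bucket of fixed capacity. Once the bucket is full, further requests are dropped. The bucket has a hole at the bottom through which requests flow out at a fixed rate.
1.1.3. Example
Take a bucket capacity of 10,000 and 100,000 concurrent requests. At most 10,000 requests fit into the bucket and all the others are dropped. The buffered requests are then processed at a fixed rate.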
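The mechanism above can be sketched in plain Java as a bounded queue that drains at a fixed rate. This is a minimal single-process illustration, not the gateway's implementation. The class and method names are made up for this sketch. Time is passed in explicitly so each run is deterministic, and the numbers in `main` are a scaled-down version of the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Minimal leaky bucket: a bounded queue drained at a fixed rate (illustrative sketch). */
final class LeakyBucket {
    private final int capacity;          // maximum number of queued requests
    private final long leakIntervalMs;   // one request leaves the bucket every leakIntervalMs
    private final Deque<String> queue = new ArrayDeque<>();
    private long lastLeakMs;

    LeakyBucket(int capacity, long leakIntervalMs, long startMs) {
        this.capacity = capacity;
        this.leakIntervalMs = leakIntervalMs;
        this.lastLeakMs = startMs;
    }

    /** Offers a request; returns false (request dropped) when the bucket is full. */
    boolean offer(String request, long nowMs) {
        leak(nowMs);
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(request);
        return true;
    }

    /** Releases the requests that have leaked out by nowMs, at most one per interval. */
    int leak(long nowMs) {
        int released = 0;
        while (!queue.isEmpty() && nowMs - lastLeakMs >= leakIntervalMs) {
            queue.pollFirst();             // in a real system this request would now be processed
            lastLeakMs += leakIntervalMs;
            released++;
        }
        if (queue.isEmpty()) {
            lastLeakMs = Math.max(lastLeakMs, nowMs);  // an empty bucket does not bank leak time
        }
        return released;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        // Capacity 10, 100 simultaneous requests, one request leaks out every 100 ms
        LeakyBucket bucket = new LeakyBucket(10, 100, 0);
        int accepted = 0;
        for (int i = 0; i < 100; i++) {
            if (bucket.offer("req-" + i, 0)) {
                accepted++;
            }
        }
        System.out.println("accepted=" + accepted + ", dropped=" + (100 - accepted)); // accepted=10, dropped=90
        System.out.println("released after 1s=" + bucket.leak(1000));                  // released after 1s=10
    }
}
```

The output rate stays at one request per interval however large the burst is. That fixed rate is the drawback described next.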
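1.1.4. Drawback
Bursts are handled inefficiently. The processing rate never changes, so a burst cannot be drained any faster, even when the backend has spare capacity.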
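1.2. Token bucket (recommended)
1.2.1. Principle
Tokens are generated at a constant rate. Requests wait in a buffer queue and are processed only after they obtain a token.
1.2.2. Implementation
The bucket that holds the tokens has a fixed size, and once it is full no more tokens can be added. Every request must take a token from the bucket before it is let through. When no token is left, the request is either dropped or kept waiting in the buffer queue.
1.2.3. Example
Suppose the bucket holds 100,000 tokens, 10,000 tokens are generated per second, and 100,000 concurrent requests arrive. Starting from a full bucket, requests are processed at up to 100,000 per second. The stored tokens are soon used up, so the rate drops and settles at the token generation rate of 10,000 per second.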
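Here is a minimal single-process sketch of the token bucket. It refills lazily from the elapsed time instead of running a background token producer. That is a common simplification, and it is also the approach taken by the Redis script analyzed later. The names are illustrative, and `main` replays a scaled-down version of the example above.

```java
/** Minimal in-memory token bucket with lazy refill (illustrative sketch). */
final class TokenBucket {
    private final long capacity;        // maximum tokens the bucket can hold (burst size)
    private final double ratePerSecond; // tokens generated per second
    private double tokens;              // tokens currently available
    private long lastRefillNanos;

    TokenBucket(long capacity, double ratePerSecond, long nowNanos) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.tokens = capacity;         // start with a full bucket
        this.lastRefillNanos = nowNanos;
    }

    /** Takes `requested` tokens if available; false means reject (or queue) the request. */
    synchronized boolean tryAcquire(int requested, long nowNanos) {
        double elapsedSeconds = Math.max(0, nowNanos - lastRefillNanos) / 1_000_000_000.0;
        // Refill from the elapsed time, never beyond the capacity
        tokens = Math.min(capacity, tokens + elapsedSeconds * ratePerSecond);
        lastRefillNanos = nowNanos;
        if (tokens >= requested) {
            tokens -= requested;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Capacity 100, 10 tokens/s, 150 requests arriving in each second
        TokenBucket bucket = new TokenBucket(100, 10, 0);
        long second = 1_000_000_000L;
        for (int s = 0; s < 3; s++) {
            int allowed = 0;
            for (int i = 0; i < 150; i++) {
                if (bucket.tryAcquire(1, s * second)) {
                    allowed++;
                }
            }
            // second 0: allowed 100, then 10 per second (the refill rate)
            System.out.println("second " + s + ": allowed " + allowed);
        }
    }
}
```

The burst in the first second is served from the stored tokens. After that, throughput falls back to the refill rate, which is exactly the pattern in the example.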
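1.2.4. Drawback
It improves throughput for bursts but also puts extra pressure on the system, and a badly sized bucket can even bring the system down. For example, with 100 million concurrent requests and a bucket capacity also set to 100 million, the whole burst is let through at once and the system collapses.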
2. Gateway rate limiting (Spring Cloud Gateway + Redis in practice)
2.1. pom.xml configuration
```xml
<!-- Reactive Redis client, used by RedisRateLimiter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<!-- The gateway runs on WebFlux, so keep the servlet stack out -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Apache HttpClient 4.x, used by the concurrency test in section 3 -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
</dependency>
```
2.2. YAML configuration
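```yaml
spring:
  application:
    name: laokou-gateway
  cloud:
    gateway:
      # Spring Cloud Gateway 4.3+ moves this to spring.cloud.gateway.server.webflux.routes
      routes:
        - id: laokou-sso-demo
          uri: lb://laokou-sso-demo
          predicates:
            - Path=/sso/**
          filters:
            - StripPrefix=1
            - name: RequestRateLimiter # request-count rate limiting; the name must be spelled exactly like this
              args:
                key-resolver: "#{@ipKeyResolver}" # SpEL reference to the KeyResolver bean
                redis-rate-limiter.replenishRate: 1 # token generation rate (tokens per second), 1 for easy testing
                redis-rate-limiter.burstCapacity: 1 # token bucket capacity, 1 for easy testing
  # Spring Boot 2.x key; on Spring Boot 3.x this block goes under spring.data.redis
  redis:
    database: 0
    cluster:
      nodes: x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005,x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005
    password: laokou # password
    timeout: 6000ms # connection timeout
    # jedis.* only applies to the Jedis client; the reactive starter runs on Lettuce
    jedis:
      pool:
        max-active: -1 # max connections in the pool (negative means unlimited)
        max-wait: -1ms # max time to block waiting for a connection (negative means no limit)
        max-idle: 10 # max idle connections
        min-idle: 5 # min idle connections
```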
2.3. Creating the beans
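```java
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.support.ipresolver.RemoteAddressResolver;
import org.springframework.cloud.gateway.support.ipresolver.XForwardedRemoteAddressResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;

@Configuration
public class RequestRateLimiterConfig {

    /**
     * Rate-limit key: the client IP address. The bean name must match
     * the SpEL expression "#{@ipKeyResolver}" in the route configuration.
     */
    @Bean(value = "ipKeyResolver")
    public KeyResolver ipKeyResolver(RemoteAddressResolver remoteAddressResolver) {
        return exchange -> Mono.just(remoteAddressResolver.resolve(exchange).getAddress().getHostAddress());
    }

    @Bean
    public RemoteAddressResolver remoteAddressResolver() {
        // Remote address resolver. trustAll() takes the first X-Forwarded-For entry, which a client can forge;
        // behind a known number of proxies, XForwardedRemoteAddressResolver.maxTrustedIndex(n) is safer.
        return XForwardedRemoteAddressResolver.trustAll();
    }
}
```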
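The limit key is the client IP, so it matters which `X-Forwarded-For` entry gets trusted. The pure-JDK sketch below mimics the selection rules that the Spring Cloud Gateway documentation gives for `XForwardedRemoteAddressResolver`: `trustAll()` versus `maxTrustedIndex(n)`, with a fallback to the socket address when the header is missing. `ForwardedForPicker` is a made-up name, and this is not the library code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative re-implementation of the documented X-Forwarded-For selection rules. */
final class ForwardedForPicker {

    /** Like trustAll(): the left-most entry, i.e. whatever the original client claims. */
    static String trustAll(String xForwardedFor, String remoteAddress) {
        List<String> hops = parse(xForwardedFor);
        return hops.isEmpty() ? remoteAddress : hops.get(0);
    }

    /**
     * Like maxTrustedIndex(n): count n entries from the right (1 = the hop added by the proxy
     * closest to the gateway); with fewer entries, fall back to the left-most one.
     */
    static String maxTrustedIndex(String xForwardedFor, String remoteAddress, int maxTrustedIndex) {
        if (maxTrustedIndex <= 0) {
            throw new IllegalArgumentException("maxTrustedIndex must be greater than 0");
        }
        List<String> hops = parse(xForwardedFor);
        if (hops.isEmpty()) {
            return remoteAddress;
        }
        return hops.get(Math.max(0, hops.size() - maxTrustedIndex));
    }

    private static List<String> parse(String header) {
        if (header == null || header.isBlank()) {
            return List.of();
        }
        return Arrays.stream(header.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String header = "0.0.0.1, 0.0.0.2, 0.0.0.3";
        System.out.println(trustAll(header, "10.0.0.9"));            // 0.0.0.1
        System.out.println(maxTrustedIndex(header, "10.0.0.9", 1));  // 0.0.0.3
        System.out.println(maxTrustedIndex(header, "10.0.0.9", 2));  // 0.0.0.2
        System.out.println(maxTrustedIndex(header, "10.0.0.9", 5));  // 0.0.0.1
    }
}
```

With `trustAll()`, a client can send its own `X-Forwarded-For` value and pick its own rate-limit bucket. Counting from the right only trusts entries that your own proxies appended.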
3. Testing the rate limit (writing a Java concurrency test)
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class HttpUtil {

    public static void apiConcurrent(String url, Map<String, String> params) {
        int count = 200;
        // Thread pool: up to 200 threads and no queue, so all requests run concurrently
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        // Latch used to wait until every request has finished
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            pool.execute(() -> {
                try {
                    // Call the API through the gateway
                    doGet(url, params);
                } catch (IOException e) {
                    log.error("request failed", e);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>(1);
        params.put("authorize", "xxxxxxx");
        apiConcurrent("http://localhost:1234/sso/laokou-demo/user", params);
    }

    public static String doGet(String url, Map<String, String> params) throws IOException {
        String resultString = "";
        // try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            // Build the URI with the query parameters
            URIBuilder builder = new URIBuilder(url);
            for (Map.Entry<String, String> entry : params.entrySet()) {
                builder.addParameter(entry.getKey(), entry.getValue());
            }
            URI uri = builder.build();
            // Create the HTTP GET request
            HttpGet httpGet = new HttpGet(uri);
            httpGet.setHeader(new BasicHeader("Content-Type", "application/json;charset=utf-8"));
            httpGet.setHeader(new BasicHeader("Accept", "*/*;charset=utf-8"));
            // Execute the request
            try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
                int status = response.getStatusLine().getStatusCode();
                // Only a 200 response carries the payload; 429 means the gateway rejected the request
                if (status == 200) {
                    resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
                }
                log.info("status: {}, body: {}", status, resultString);
            }
        } catch (URISyntaxException e) {
            throw new IOException("invalid url: " + url, e);
        }
        return resultString;
    }
}
```
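This shows that the gateway rate-limiting configuration works. With replenishRate and burstCapacity both set to 1, requests beyond the available token are rejected instead of reaching the service.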
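The HttpUtil test above only logs response bodies, so a rejected request just shows up as an empty result. An alternative is the JDK-only sketch below. It uses Java 11+ `java.net.http.HttpClient`, so Apache HttpClient is not needed. It fires concurrent requests and counts the status codes, which makes the share of 429 responses visible directly. `RateLimitProbe` is a hypothetical helper. The default URL is the one from the test above, and any parameters your route requires (such as `authorize`) would need to be added to it.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Fires n concurrent GET requests and tallies the HTTP status codes (illustrative helper). */
final class RateLimitProbe {

    static Map<Integer, Integer> fire(URI uri, int n) {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // sendAsync keeps all requests in flight at the same time
            futures.add(client.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                    .thenApply(HttpResponse::statusCode)
                    .exceptionally(e -> -1));   // -1 marks a transport error
        }
        Map<Integer, Integer> counts = new TreeMap<>();
        for (CompletableFuture<Integer> f : futures) {
            counts.merge(f.join(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String target = args.length > 0 ? args[0] : "http://localhost:1234/sso/laokou-demo/user";
        // Prints a map such as {status=count}; 429 entries are the rate-limited requests
        System.out.println(fire(URI.create(target), 200));
    }
}
```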
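4. Looking at the source code
The Spring Cloud Gateway documentation describes the RequestRateLimiter GatewayFilter factory this way: it uses a RateLimiter implementation to determine whether the current request is allowed to proceed. If it is not, a status of HTTP 429 - Too Many Requests is returned by default.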
4.1. RequestRateLimiterGatewayFilterFactory
```java
@Override
public GatewayFilter apply(Config config) {
    KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
    RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
    boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
    HttpStatusHolder emptyKeyStatus = HttpStatusHolder
            .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

    return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
        // The key resolver produced no key: reject (403 by default) or let the request through
        if (EMPTY_KEY.equals(key)) {
            if (denyEmpty) {
                setResponseStatus(exchange, emptyKeyStatus);
                return exchange.getResponse().setComplete();
            }
            return chain.filter(exchange);
        }
        String routeId = config.getRouteId();
        if (routeId == null) {
            Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
            routeId = route.getId();
        }
        // Apply the rate limit
        return limiter.isAllowed(routeId, key).flatMap(response -> {
            // Copy the limiter's headers (remaining tokens, etc.) to the response
            for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
                exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
            }
            if (response.isAllowed()) {
                return chain.filter(exchange);
            }
            // Rejected: 429 Too Many Requests by default, with an empty body
            setResponseStatus(exchange, config.getStatusCode());
            return exchange.getResponse().setComplete();
        });
    });
}
```
4.2. RedisRateLimiter
```java
@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
        throw new IllegalStateException("RedisRateLimiter is not initialized");
    }
    // How is this configuration loaded? Think about it (answered below)
    Config routeConfig = loadConfiguration(routeId);
    // Tokens generated per second
    int replenishRate = routeConfig.getReplenishRate();
    // Bucket capacity
    int burstCapacity = routeConfig.getBurstCapacity();
    // Tokens consumed by one request
    int requestedTokens = routeConfig.getRequestedTokens();
    try {
        // Redis keys: remaining tokens and last refresh timestamp
        List<String> keys = getKeys(id);
        // Script arguments: rate, capacity, now (empty, so the script uses Redis TIME), requested tokens
        List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + "");
        // Run the Lua script
        Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
        return flux.onErrorResume(throwable -> {
            if (log.isDebugEnabled()) {
                log.debug("Error calling rate limiter lua", throwable);
            }
            // On Redis errors, fail open: allow the request and report -1 remaining tokens
            return Flux.just(Arrays.asList(1L, -1L));
        }).reduce(new ArrayList<Long>(), (longs, l) -> {
            longs.addAll(l);
            return longs;
        }).map(results -> {
            // 1 means allowed, 0 means rejected
            boolean allowed = results.get(0) == 1L;
            Long tokensLeft = results.get(1);
            Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
            if (log.isDebugEnabled()) {
                log.debug("response: " + response);
            }
            return response;
        });
    }
    catch (Exception e) {
        log.error("Error determining if user allowed from redis", e);
    }
    return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}

static List<String> getKeys(String id) {
    // The {id} hash tag keeps both keys in the same Redis Cluster slot, as the Lua script requires
    String prefix = "request_rate_limiter.{" + id;
    String tokenKey = prefix + "}.tokens";
    String timestampKey = prefix + "}.timestamp";
    return Arrays.asList(tokenKey, timestampKey);
}
```
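The braces in `getKeys()` are a Redis Cluster hash tag. In a cluster, a Lua script may only touch keys that live in one slot. The YAML above points at a Redis Cluster, so both keys must hash to the same slot. The sketch below re-implements the key-slot rule from the Redis Cluster specification in plain Java to show that the two rate-limiter keys always land together: CRC16 (XMODEM variant) modulo 16384, hashing only the `{...}` part when it is present and non-empty. `KeySlot` is an illustrative name.

```java
import java.nio.charset.StandardCharsets;

/** Redis Cluster key-slot computation (CRC16/XMODEM of the key or its hash tag, mod 16384). */
final class KeySlot {

    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    static int slot(String key) {
        // Only the part between the first '{' and the next '}' is hashed, if it is non-empty
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    public static void main(String[] args) {
        String id = "127.0.0.1";
        String tokens = "request_rate_limiter.{" + id + "}.tokens";
        String timestamp = "request_rate_limiter.{" + id + "}.timestamp";
        System.out.println(slot(tokens) == slot(timestamp));   // true
    }
}
```

Without the braces, `...tokens` and `...timestamp` would usually hash to different slots, and the script call would fail with a cross-slot error.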
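Question: how is the Redis rate-limit configuration loaded?
In short, it listens for route events and stores the configuration. AbstractRateLimiter, the parent class of RedisRateLimiter, implements ApplicationListener<FilterArgsEvent>. When routes are loaded, each route's filter arguments are published in such an event. The limiter binds the relevant ones (redis-rate-limiter.*) into a Config object and keeps it in a map keyed by route ID, and loadConfiguration(routeId) reads that map on every request.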
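To make that flow concrete, here is a deliberately simplified, framework-free model. Route arguments arrive as an event, are bound into a per-route config, and are looked up by route ID at request time. The class, the method names and the default values (1 for a missing burst capacity or requested-token count) are hypothetical and only mirror the general shape of RedisRateLimiter's configuration handling. The property names are the ones used in the YAML above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of "listen for route events and store the configuration" (hypothetical names). */
final class RouteLimitConfigRegistry {

    record LimitConfig(int replenishRate, int burstCapacity, int requestedTokens) { }

    private final Map<String, LimitConfig> configByRoute = new ConcurrentHashMap<>();
    private final LimitConfig defaultConfig;   // may be null: no global default

    RouteLimitConfigRegistry(LimitConfig defaultConfig) {
        this.defaultConfig = defaultConfig;
    }

    /** Called when a route's filter arguments are published (e.g. when routes are loaded or refreshed). */
    void onFilterArgs(String routeId, Map<String, String> args) {
        if (!args.containsKey("redis-rate-limiter.replenishRate")) {
            return;   // not rate-limiter arguments, ignore
        }
        int rate = Integer.parseInt(args.get("redis-rate-limiter.replenishRate"));
        // Defaults below are assumptions of this sketch
        int burst = Integer.parseInt(args.getOrDefault("redis-rate-limiter.burstCapacity", "1"));
        int requested = Integer.parseInt(args.getOrDefault("redis-rate-limiter.requestedTokens", "1"));
        configByRoute.put(routeId, new LimitConfig(rate, burst, requested));
    }

    /** Looked up on every request, in the spirit of loadConfiguration(routeId). */
    LimitConfig load(String routeId) {
        LimitConfig config = configByRoute.getOrDefault(routeId, defaultConfig);
        if (config == null) {
            throw new IllegalArgumentException("No configuration found for route " + routeId);
        }
        return config;
    }

    public static void main(String[] args) {
        RouteLimitConfigRegistry registry = new RouteLimitConfigRegistry(null);
        registry.onFilterArgs("laokou-sso-demo", Map.of(
                "key-resolver", "#{@ipKeyResolver}",
                "redis-rate-limiter.replenishRate", "1",
                "redis-rate-limiter.burstCapacity", "1"));
        System.out.println(registry.load("laokou-sso-demo"));
    }
}
```

Because the map is refilled whenever the events fire, changing a route's arguments (for example through dynamic routes) changes the limits without restarting the gateway.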
4.3. The key part: dissecting the token-bucket script /META-INF/scripts/request_rate_limiter.lua
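```lua
-- User request rate limiter filter
-- See https://stripe.com/blog/rate-limiters
-- See https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34

-- How the token bucket algorithm works
-- 1. Tokens are put into the bucket at a constant rate
-- 2. A request must take a token from the bucket before it is processed
-- 3. If no token is available, the request can wait or be rejected immediately

-- Token bucket workflow
-- 1. Compute the time needed to fill the bucket (fill time = capacity / rate)
-- 2. Set the TTL of the stored data to twice the fill time (TTL = fill time * 2)
-- 3. Read the remaining token count and the timestamp of the last call from Redis
-- 4. Compute the time since the last call (delta = now - last refresh time)
-- 5. Compute the tokens to add (added tokens = delta * rate); the capacity is fixed, so the bucket never overfills
-- 6. Check whether there are enough tokens: min(capacity, remaining + added) >= requested
-- 7. If the request is allowed, take the requested number of tokens out of the bucket
-- 8. If the TTL is positive, update the token count and timestamp keys in Redis
-- 9. Return two values: allowed_num (1 = allowed, 0 = rejected) and new_tokens (tokens left after refill and consumption)

-- Replicate the script's effects rather than the script itself (needed because it calls the non-deterministic TIME)
redis.replicate_commands()

-- Token key -> number of tokens currently available (remaining tokens)
local tokens_key = KEYS[1]
-- Timestamp key -> time of the last token refresh
local timestamp_key = KEYS[2]

-- Refill rate (tokens per second)
local rate = tonumber(ARGV[1])
-- Bucket capacity
local capacity = tonumber(ARGV[2])
-- Current time in seconds
local now = tonumber(ARGV[3])
-- Tokens requested
local requested = tonumber(ARGV[4])

-- Time needed to fill the bucket
local fill_time = capacity / rate
-- Key TTL: twice the fill time
local ttl = math.floor(fill_time * 2)

-- If no time was passed in, use the Redis server time
if now == nil then
  now = redis.call('TIME')[1]
end

-- Remaining tokens (a missing key means a full bucket)
local last_tokens = tonumber(redis.call('get', tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end

-- Timestamp of the last refresh
local last_refreshed = tonumber(redis.call('get', timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

-- Time elapsed since the last refresh
local delta = math.max(0, now - last_refreshed)
-- Current tokens: remaining + refilled, capped at the capacity
local now_tokens = math.min(capacity, last_tokens + (rate * delta))
-- Are there enough tokens for this request?
local allowed = now_tokens >= requested
-- Remaining tokens after this request
local new_tokens = now_tokens
-- Allowed flag
local allowed_num = 0
if allowed then
  new_tokens = now_tokens - requested
  -- Request allowed
  allowed_num = 1
end

-- If ttl > 0, store the remaining tokens and the current timestamp
if ttl > 0 then
  redis.call('setex', tokens_key, ttl, new_tokens)
  redis.call('setex', timestamp_key, ttl, now)
end

-- Return values
return { allowed_num, new_tokens }
```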
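To check the arithmetic, here is a line-by-line Java port of the script. A HashMap stands in for Redis. `LuaTokenBucketPort` is a made-up name, and TTL expiry and atomic execution are not modelled. The refill step uses `last_tokens` (the remaining token count), as the upstream Spring Cloud Gateway script does. Using `last_refreshed` there instead would start the refill from a timestamp and let almost every request through.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Java port of request_rate_limiter.lua against an in-memory map (illustrative only). */
final class LuaTokenBucketPort {

    private final Map<String, Long> store = new HashMap<>();   // stand-in for Redis GET/SETEX, no TTL

    /** Returns {allowedNum, newTokens}, like the script; `now` is in whole seconds. */
    long[] call(String id, long rate, long capacity, long now, long requested) {
        String tokensKey = "request_rate_limiter.{" + id + "}.tokens";
        String timestampKey = "request_rate_limiter.{" + id + "}.timestamp";

        double fillTime = (double) capacity / rate;
        long ttl = (long) Math.floor(fillTime * 2);

        long lastTokens = store.getOrDefault(tokensKey, capacity);   // missing key = full bucket
        long lastRefreshed = store.getOrDefault(timestampKey, 0L);

        long delta = Math.max(0, now - lastRefreshed);
        long filledTokens = Math.min(capacity, lastTokens + delta * rate); // remaining + refilled, capped
        boolean allowed = filledTokens >= requested;
        long newTokens = allowed ? filledTokens - requested : filledTokens;

        if (ttl > 0) {
            store.put(tokensKey, newTokens);
            store.put(timestampKey, now);
        }
        return new long[] { allowed ? 1 : 0, newTokens };
    }

    public static void main(String[] args) {
        LuaTokenBucketPort limiter = new LuaTokenBucketPort();
        // replenishRate = 1, burstCapacity = 1, as in the YAML configuration
        long[] first = limiter.call("127.0.0.1", 1, 1, 100, 1);
        long[] sameSecond = limiter.call("127.0.0.1", 1, 1, 100, 1);
        long[] nextSecond = limiter.call("127.0.0.1", 1, 1, 101, 1);
        // [1, 0] [0, 0] [1, 0]
        System.out.println(Arrays.toString(first) + " " + Arrays.toString(sameSecond) + " " + Arrays.toString(nextSecond));
    }
}
```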
4.4. GatewayRedisAutoConfiguration: initializing the script
```java
@Bean
@SuppressWarnings("unchecked")
public RedisScript redisRequestRateLimiterScript() {
    DefaultRedisScript redisScript = new DefaultRedisScript<>();
    redisScript.setScriptSource(
            // Load the Lua script from the given classpath location
            new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
    redisScript.setResultType(List.class);
    return redisScript;
}

@Bean
@ConditionalOnMissingBean
public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
        @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
        ConfigurationService configurationService) {
    return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
}
```
Question: how is the request rate-limiting filter enabled?
1. Through the YAML configuration
```yaml
spring:
  cloud:
    gateway:
      server:
        webflux:
          filter:
            request-rate-limiter:
              enabled: true
```
2. GatewayAutoConfiguration registers the bean automatically
```java
@Bean
// Registered only if a RateLimiter bean and a KeyResolver bean exist and the filter is enabled
@ConditionalOnBean({ RateLimiter.class, KeyResolver.class })
@ConditionalOnEnabledFilter
public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter,
        KeyResolver resolver) {
    return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver);
}
```
The key point: whether this bean is actually registered is decided by the @ConditionalOnEnabledFilter annotation.
```java
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@Documented
@Conditional(OnEnabledFilter.class)
public @interface ConditionalOnEnabledFilter {

    // The filter factory class whose "...enabled" property is checked. The default (DefaultValue)
    // means the class is taken from the return type of the annotated @Bean method.
    Class<? extends GatewayFilterFactory<?>> value() default OnEnabledFilter.DefaultValue.class;

}
```
Let's keep following the code and look at @Conditional(OnEnabledFilter.class).
As is well known, @Conditional registers a bean only when its condition is met, so let's analyze OnEnabledFilter.
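```java
public class OnEnabledFilter extends OnEnabledComponent<GatewayFilterFactory<?>> {
    // Overridden methods omitted. For filters, the component name takes the form
    // "filter.<kebab-case-name>", e.g. filter.request-rate-limiter
}
```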
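The answer is in its parent class. The bean is skipped only when the matching enabled property is explicitly set to false, so the filter is on by default.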
```java
public abstract class OnEnabledComponent<T> extends SpringBootCondition implements ConfigurationCondition {

    private static final String PREFIX = "spring.cloud.gateway.server.webflux.";

    private static final String SUFFIX = ".enabled";

    private ConditionOutcome determineOutcome(Class<? extends T> componentClass, PropertyResolver resolver) {
        // Build the full property name,
        // e.g. spring.cloud.gateway.server.webflux.filter.request-rate-limiter.enabled
        String key = PREFIX + normalizeComponentName(componentClass) + SUFFIX;
        ConditionMessage.Builder messageBuilder = forCondition(annotationClass().getName(), componentClass.getName());
        if ("false".equalsIgnoreCase(resolver.getProperty(key))) {
            // Explicitly disabled: the condition does not match and the bean is not registered
            return ConditionOutcome.noMatch(messageBuilder.because("bean is not available"));
        }
        // true or not set: the condition matches and the bean is registered
        return ConditionOutcome.match();
    }

    // other members omitted
}
```
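The naming rule can be reproduced in a few lines. The hypothetical `FilterToggle` helper below is not part of Spring. It derives the property key from the filter factory class name. It assumes the usual convention: drop the `GatewayFilterFactory` suffix, convert the rest to kebab-case, and put it under `filter.`. It then applies the same rule as `determineOutcome`, where only an explicit `false` disables the filter. A plain Map stands in for Spring's PropertyResolver.

```java
import java.util.Locale;
import java.util.Map;

/** Sketch of the OnEnabledComponent rule for filters (hypothetical helper, not Spring code). */
final class FilterToggle {

    private static final String PREFIX = "spring.cloud.gateway.server.webflux.";
    private static final String SUFFIX = ".enabled";
    private static final String FACTORY_SUFFIX = "GatewayFilterFactory";

    /** RequestRateLimiterGatewayFilterFactory -> filter.request-rate-limiter */
    static String componentName(String simpleClassName) {
        String name = simpleClassName.endsWith(FACTORY_SUFFIX)
                ? simpleClassName.substring(0, simpleClassName.length() - FACTORY_SUFFIX.length())
                : simpleClassName;
        // CamelCase -> kebab-case
        String kebab = name.replaceAll("([a-z0-9])([A-Z])", "$1-$2").toLowerCase(Locale.ROOT);
        return "filter." + kebab;
    }

    static String propertyKey(String simpleClassName) {
        return PREFIX + componentName(simpleClassName) + SUFFIX;
    }

    /** Missing or "true" means enabled; only an explicit "false" (any case) disables the filter. */
    static boolean isEnabled(String simpleClassName, Map<String, String> properties) {
        return !"false".equalsIgnoreCase(properties.get(propertyKey(simpleClassName)));
    }

    public static void main(String[] args) {
        String cls = "RequestRateLimiterGatewayFilterFactory";
        System.out.println(propertyKey(cls));  // spring.cloud.gateway.server.webflux.filter.request-rate-limiter.enabled
        System.out.println(isEnabled(cls, Map.of()));                            // true
        System.out.println(isEnabled(cls, Map.of(propertyKey(cls), "false")));   // false
    }
}
```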
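5. Improving the rate-limit response [overriding the class through its fully qualified name]
The default rejection is not very friendly: the client only gets a bare HTTP 429 with an empty body. So let's customize it by overriding the class directly, that is, by declaring a class with the same fully qualified name (same package and class name) in our own project. The modified code is as follows: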
```java
// Same package and class name as the library class, so this copy shadows it on the classpath
package org.springframework.cloud.gateway.filter.factory;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.HasRouteId;
import org.springframework.cloud.gateway.support.HttpStatusHolder;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.setResponseStatus;

// Result and JacksonUtils are the author's own project classes; import them from your project

@Getter
@ConfigurationProperties("spring.cloud.gateway.server.webflux.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory
        extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {

    private static final String EMPTY_KEY = "____EMPTY_KEY__";

    private final RateLimiter<?> defaultRateLimiter;

    private final KeyResolver defaultKeyResolver;

    /** Switch to deny requests if the Key Resolver returns an empty key, defaults to true. */
    @Setter
    private boolean denyEmptyKey = true;

    /** HttpStatus to return when denyEmptyKey is true, defaults to FORBIDDEN. */
    @Setter
    private String emptyKeyStatusCode = HttpStatus.FORBIDDEN.name();

    public RequestRateLimiterGatewayFilterFactory(RateLimiter<?> defaultRateLimiter, KeyResolver defaultKeyResolver) {
        super(Config.class);
        this.defaultRateLimiter = defaultRateLimiter;
        this.defaultKeyResolver = defaultKeyResolver;
    }

    @Override
    public GatewayFilter apply(Config config) {
        KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
        RateLimiter<?> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
        boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
        HttpStatusHolder emptyKeyStatus = HttpStatusHolder
                .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

        return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
            if (EMPTY_KEY.equals(key)) {
                if (denyEmpty) {
                    setResponseStatus(exchange, emptyKeyStatus);
                    return exchange.getResponse().setComplete();
                }
                return chain.filter(exchange);
            }
            String routeId = config.getRouteId();
            if (routeId == null) {
                Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                Assert.notNull(route, "route is null");
                routeId = route.getId();
            }
            return limiter.isAllowed(routeId, key).flatMap(response -> {
                for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
                    exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
                }
                if (response.isAllowed()) {
                    return chain.filter(exchange);
                }
                // The main change: write a JSON error body instead of an empty 429 response
                return responseOk(exchange, Result.fail("TOO_MANY_REQUESTS", "Too many requests"));
            });
        });
    }

    private Mono<Void> responseOk(ServerWebExchange exchange, Object data) {
        return responseOk(exchange, JacksonUtils.toJsonStr(data), MediaType.APPLICATION_JSON);
    }

    private Mono<Void> responseOk(ServerWebExchange exchange, String str, MediaType contentType) {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        ServerHttpResponse response = exchange.getResponse();
        DataBuffer buffer = response.bufferFactory().wrap(bytes);
        // The rejection is reported as HTTP 200 with an error code in the JSON body
        response.setStatusCode(HttpStatus.OK);
        response.getHeaders().setContentType(contentType);
        response.getHeaders().setContentLength(bytes.length);
        return response.writeWith(Flux.just(buffer));
    }

    private <T> T getOrDefault(T configValue, T defaultValue) {
        return (configValue != null) ? configValue : defaultValue;
    }

    public static class Config implements HasRouteId {

        @Getter private KeyResolver keyResolver;
        @Getter private RateLimiter<?> rateLimiter;
        @Getter private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS;
        // Boolean, not boolean: null means "use the factory-wide default"
        @Getter private Boolean denyEmptyKey;
        @Getter private String emptyKeyStatus;
        private String routeId;

        public Config setKeyResolver(KeyResolver keyResolver) {
            this.keyResolver = keyResolver;
            return this;
        }

        public Config setRateLimiter(RateLimiter<?> rateLimiter) {
            this.rateLimiter = rateLimiter;
            return this;
        }

        public Config setStatusCode(HttpStatus statusCode) {
            this.statusCode = statusCode;
            return this;
        }

        public Config setDenyEmptyKey(Boolean denyEmptyKey) {
            this.denyEmptyKey = denyEmptyKey;
            return this;
        }

        public Config setEmptyKeyStatus(String emptyKeyStatus) {
            this.emptyKeyStatus = emptyKeyStatus;
            return this;
        }

        @Override
        public void setRouteId(String routeId) {
            this.routeId = routeId;
        }

        @Override
        public String getRouteId() {
            return this.routeId;
        }
    }
}
```
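II. Circuit Breaking and Fallback
Question: why do we need circuit breaking and fallback?
When a service fails (timeouts, slow responses, crashes), its upstream callers cannot get a timely response and start failing as well. The result is a service avalanche: the failure snowballs up the call chain until the whole system is paralyzed.
The goal of circuit breaking and fallback is to isolate the faulty service quickly when a failure occurs. Calls fail fast instead of waiting, so system resources are not exhausted. This stops the failure from spreading and keeps core business functions available.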
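Fail-fast behaviour is usually implemented as a small state machine with three states: closed, open and half-open. This is the same basic model as Resilience4j's CircuitBreaker. The sketch below is a deliberately minimal, single-process version with a count-based window. The class, the thresholds and the simplifications are this sketch's own, not any framework's API. Time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.Arrays;

/** Minimal count-based circuit breaker (illustrative sketch). */
final class SimpleCircuitBreaker {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;              // number of recent calls considered
    private final double failureRateThreshold; // e.g. 0.5 = 50 %
    private final long openMillis;             // how long to stay OPEN before a trial call
    private final boolean[] failures;          // ring buffer of recent outcomes (true = failure)
    private int recorded;
    private int next;
    private State state = State.CLOSED;
    private long openedAt;

    SimpleCircuitBreaker(int windowSize, double failureRateThreshold, long openMillis) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.openMillis = openMillis;
        this.failures = new boolean[windowSize];
    }

    synchronized State state() {
        return state;
    }

    /** False means: do not call the downstream service, fail fast / use the fallback. */
    synchronized boolean tryAcquire(long nowMs) {
        if (state == State.OPEN && nowMs - openedAt >= openMillis) {
            state = State.HALF_OPEN;   // simplified: calls are let through until the first result decides
        }
        return state != State.OPEN;
    }

    synchronized void onResult(boolean success, long nowMs) {
        if (state == State.HALF_OPEN) {
            if (success) {
                reset();
            } else {
                trip(nowMs);
            }
            return;
        }
        failures[next] = !success;
        next = (next + 1) % windowSize;
        recorded = Math.min(recorded + 1, windowSize);
        if (recorded == windowSize && failureRate() >= failureRateThreshold) {
            trip(nowMs);
        }
    }

    private double failureRate() {
        int count = 0;
        for (boolean f : failures) {
            if (f) count++;
        }
        return (double) count / windowSize;
    }

    private void trip(long nowMs) {
        state = State.OPEN;
        openedAt = nowMs;
    }

    private void reset() {
        state = State.CLOSED;
        Arrays.fill(failures, false);
        recorded = 0;
        next = 0;
    }

    public static void main(String[] args) {
        SimpleCircuitBreaker cb = new SimpleCircuitBreaker(4, 0.5, 1000);
        boolean[] outcomes = { true, false, true, false };   // 50 % failures
        for (int i = 0; i < outcomes.length; i++) {
            cb.onResult(outcomes[i], i);
        }
        System.out.println(cb.state());           // OPEN
        System.out.println(cb.tryAcquire(500));   // false
        System.out.println(cb.tryAcquire(1003));  // true
        cb.onResult(true, 1004);
        System.out.println(cb.state());           // CLOSED
    }
}
```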
1. Technology selection
1.1. Comparison of circuit-breaker frameworks
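| Dimension | Hystrix (Netflix) | Sentinel (Alibaba) | Resilience4j |
|---|---|---|---|
| Current status | ❌ No longer updated (maintenance mode) | ✅ Actively developed | ✅ Actively developed |
| Circuit-breaking mechanism | Sliding-window counting | Response time / exception ratio / QPS | Error-rate / response-time thresholds |
| Flow control | ❌ Basic isolation only | ✅ QPS / concurrency / hot parameters / cluster flow control | ✅ RateLimiter |
| Isolation strategy | Thread pool (high overhead) / semaphore | Concurrent thread count (no thread-pool overhead) | Semaphore / Bulkhead |
| Fallback | fallback methods | fallback + adaptive system rules | fallback + custom composed strategies |
| Real-time monitoring | ✅ Hystrix Dashboard | ✅ Built-in console (visual, dynamic rules) | ❌ Requires Prometheus/Grafana integration |
| Dynamic configuration | ❌ Relies on Archaius | ✅ Pushed in real time from the console | ✅ Must be coded (e.g. with Spring Cloud Config) |
| Ecosystem integration | ✅ Spring Cloud Netflix | ✅ Spring Cloud Alibaba / multi-language gateways | ✅ Spring Boot / reactive programming |
| Performance overhead | High (thread-pool isolation) | Low (no extra threads) | Very low (purely functional) |
| Typical use | Maintaining legacy systems | High-concurrency control / flash sales / hotspot protection | Cloud-native / lightweight microservices |
| Recommendation | ⭐⭐ (not recommended for new projects) | ⭐⭐⭐⭐⭐ (first choice for high-concurrency Java) | ⭐⭐⭐⭐⭐ (first choice for cloud-native/reactive) |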
1.2. Selection guide
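| Scenario | Recommendation | Reason |
|---|---|---|
| E-commerce flash sales / high-frequency API control | ✅ Sentinel | Fine-grained flow control + hotspot protection + real-time dashboard |
| Kubernetes cloud-native microservices | ✅ Resilience4j | Lightweight + seamless Prometheus integration + reactive support |
| Legacy Spring Cloud Netflix systems | ⚠️ Hystrix | Compatible with existing code (short-term transition) |
| Multi-language architectures (e.g. Go + Java) | ✅ Sentinel | Supports non-Java services through a sidecar proxy |
| Reactive programming (WebFlux) | ✅ Resilience4j | Native reactive API support |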
2. Using Resilience4j
Resilience4j can be seen as a replacement for Hystrix. It supports circuit breakers and single-instance (local) rate limiting.
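Resilience4j is a lightweight fault-tolerance library designed for functional programming. It provides higher-order functions (decorators) that enhance any functional interface, lambda expression or method reference with a Circuit Breaker, Rate Limiter, Retry or Bulkhead. You can stack more than one decorator on any functional interface, lambda expression or method reference. The advantage is that you choose only the decorators you need and nothing else.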
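The decorator idea can be shown with plain higher-order functions. Each decorator takes a `Supplier` and returns a new `Supplier` with extra behaviour, so decorators stack by nesting. `Decorate`, `withRetry` and `withFallback` are hypothetical helpers written for this sketch, not the Resilience4j API. In the library itself, the equivalent composition goes through its own decorators, such as `CircuitBreaker.decorateSupplier` and `Retry.decorateSupplier`, or the `Decorators` builder in the `resilience4j-all` module.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Stacking decorators with plain higher-order functions (hypothetical helpers). */
final class Decorate {

    /** Retries the call up to maxAttempts times and rethrows the last failure. */
    static <T> Supplier<T> withRetry(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return () -> {
            RuntimeException last = null;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    return call.get();
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    /** Returns a fallback value computed from the exception instead of failing. */
    static <T> Supplier<T> withFallback(Supplier<T> call, Function<RuntimeException, T> fallback) {
        return () -> {
            try {
                return call.get();
            } catch (RuntimeException e) {
                return fallback.apply(e);
            }
        };
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Supplier<String> flaky = () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("downstream timeout");
            }
            return "ok after " + calls[0] + " calls";
        };
        // Retry first, then fall back if every attempt fails
        Supplier<String> decorated = withFallback(withRetry(flaky, 3), e -> "fallback: " + e.getMessage());
        System.out.println(decorated.get());   // ok after 3 calls
    }
}
```

The order of nesting matters. Here the fallback wraps the retry, so it only kicks in after all retries have failed.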
2.1. Gateway circuit breaking and fallback (Spring Cloud Gateway + Resilience4j in practice)
2.1.1. pom dependency
```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>
```
2.1.2. YAML configuration
```yaml
spring:
  application:
    name: laokou-gateway
  cloud:
    gateway:
      server:
        webflux:
          routes:
            - id: laokou-sso-demo
              uri: lb://laokou-sso-demo
              predicates:
                - Path=/sso/**
              filters:
                - name: CircuitBreaker
                  args:
                    name: default
                    # forward here when the call fails, times out or the circuit is open
                    fallbackUri: "forward:/fallback"
          filter:
            circuit-breaker:
              enabled: true
```
2.1.3. CircuitBreakerConfig configuration
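```java
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import org.springframework.cloud.circuitbreaker.resilience4j.ReactiveResilience4JCircuitBreakerFactory;
import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JConfigBuilder;
import org.springframework.cloud.client.circuitbreaker.Customizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import java.time.Duration;

/**
 * @author laokou
 */
@Configuration
public class CircuitBreakerConfig {

    /**
     * Handler for fallbackUri "forward:/fallback". The forwarded request keeps the client's
     * Accept header, so no Accept predicate is used (a text/plain-only match would miss JSON clients).
     */
    @Bean
    public RouterFunction<ServerResponse> routerFunction() {
        return RouterFunctions.route(RequestPredicates.path("/fallback"),
                request -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        // Result is the author's own response wrapper
                        .body(BodyInserters.fromValue(Result.fail("SERVICE_UNAVAILABLE", "Service under maintenance"))));
    }

    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> reactiveResilience4JCircuitBreakerFactoryCustomizer() {
        return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
                // Time out calls after 3 seconds
                .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(3)).build())
                // Default circuit-breaker settings (fully qualified because this class has the same simple name)
                .circuitBreakerConfig(io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.ofDefaults())
                .build());
    }
}
```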
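Here is a JDK-only analogue of what the 3-second TimeLimiter and the fallbackUri do together, so the behaviour can be seen without running the gateway. It is built on `CompletableFuture.orTimeout` (Java 9+). If the call does not finish in time, the caller gets a fallback value, much like the gateway forwarding to `/fallback`. `TimeoutWithFallback` is an illustrative name, and this is not how Resilience4j or the gateway implement it. Note also that this sketch does not cancel the slow task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** "Time limiter + fallback" with plain CompletableFuture (illustrative sketch). */
final class TimeoutWithFallback {

    static <T> T callWithTimeout(Supplier<T> call, long timeout, TimeUnit unit, T fallback) {
        return CompletableFuture.supplyAsync(call)
                .orTimeout(timeout, unit)       // completes exceptionally with TimeoutException
                .exceptionally(e -> fallback)   // timeout or failure -> fallback value
                .join();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String result = callWithTimeout(() -> {
            sleep(200);                 // a slow downstream call
            return "real response";
        }, 50, TimeUnit.MILLISECONDS, "service under maintenance");
        System.out.println(result);
    }
}
```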
That concludes this article on implementing distributed rate limiting and circuit breaking with Spring Cloud Gateway.