调用第三方api最怕什么?怕被限流!今天分享一个自制的限流任务执行器,能帮你轻松控制请求频率,还能自动重试失败任务,指数退避不添乱。代码可直接复制到项目中使用~
背景痛点
很多场景下我们需要控制任务的执行速率:
- 调用淘宝ip接口,对方限制每秒最多1次请求
- 批量请求第三方api,担心触发限流封禁
- 任务可能因网络抖动失败,需要自动重试
今天要介绍的 ratelimitedexecutor 就是为解决这些问题而生的。
核心功能
限流执行 – 每秒最多执行n个任务,超出则排队等待
顺序保证 – 任务严格按照提交顺序执行
自动重试 – 失败后自动重试(可配置最大次数)
指数退避 – 支持退避延迟策略,避免加重服务端压力
异步返回 – 使用 completablefuture 获取结果,不阻塞主线程
类设计(完整代码)
import com.google.common.util.concurrent.ratelimiter;
import java.util.concurrent.*;
import java.util.function.function;
/**
* 限流任务执行器:按固定速率顺序执行任务,支持重试。
* @param <t> 任务返回值类型
*/
public class ratelimitedexecutor<t> {
private final blockingqueue<taskwrapper<t>> queue = new linkedblockingqueue<>();
private final ratelimiter ratelimiter;
private final int maxretries;
private final long initialdelayms; // 首次重试延迟(毫秒)
private final double backoffmultiplier; // 退避乘数(如2.0表示每次翻倍)
private final executorservice worker = executors.newsinglethreadexecutor(r -> {
thread t = new thread(r, "ratelimitedexecutor-worker");
t.setdaemon(true); // 设为守护线程,避免阻止jvm退出
return t;
});
private volatile boolean running = true;
/**
* 构造限流执行器
* @param permitspersecond 每秒允许执行的任务数(如1.0表示每秒1次)
* @param maxretries 最大重试次数(不含首次执行)
* @param initialdelayms 首次重试延迟毫秒数
* @param backoffmultiplier 退避乘数(1.0表示固定延迟,>1.0表示指数退避)
*/
public ratelimitedexecutor(double permitspersecond, int maxretries,
long initialdelayms, double backoffmultiplier) {
this.ratelimiter = ratelimiter.create(permitspersecond);
this.maxretries = maxretries;
this.initialdelayms = initialdelayms;
this.backoffmultiplier = backoffmultiplier;
worker.submit(this::process);
}
/**
* 提交一个任务,返回completablefuture异步获取结果
* @param task 需要执行的任务(callable)
* @return 代表异步结果的completablefuture
*/
public completablefuture<t> submit(callable<t> task) {
completablefuture<t> future = new completablefuture<>();
queue.offer(new taskwrapper<>(task, future));
return future;
}
// 工作线程主循环
private void process() {
while (running) {
try {
taskwrapper<t> wrapper = queue.take(); // 阻塞直到有任务
executewithretry(wrapper);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
break;
}
}
}
// 执行单个任务(带重试)
private void executewithretry(taskwrapper<t> wrapper) {
int retries = 0;
long delay = initialdelayms;
while (retries <= maxretries) {
// 限流:获取令牌,若不足则阻塞
ratelimiter.acquire();
try {
t result = wrapper.task.call();
wrapper.future.complete(result);
return; // 成功,结束
} catch (exception e) {
retries++;
if (retries > maxretries) {
wrapper.future.completeexceptionally(e);
return;
}
// 重试等待(退避)
try {
thread.sleep(delay);
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
wrapper.future.completeexceptionally(ie);
return;
}
// 更新下次重试延迟
delay = (long) (delay * backoffmultiplier);
}
}
}
/**
* 优雅关闭执行器:等待已提交任务执行完毕,不再接受新任务
*/
public void shutdown() {
running = false;
worker.shutdown(); // 不再接受新任务
try {
if (!worker.awaittermination(5, timeunit.seconds)) {
worker.shutdownnow();
}
} catch (interruptedexception e) {
worker.shutdownnow();
thread.currentthread().interrupt();
}
}
/**
* 立即关闭执行器,尝试中断正在执行的任务
*/
public void shutdownnow() {
running = false;
worker.shutdownnow();
}
// 内部任务包装类
private static class taskwrapper<t> {
final callable<t> task;
final completablefuture<t> future;
taskwrapper(callable<t> task, completablefuture<t> future) {
this.task = task;
this.future = future;
}
}
}
依赖要求
项目需要引入 guava(提供 ratelimiter):
<dependency>
<groupid>com.google.guava</groupid>
<artifactid>guava</artifactid>
<version>32.1.2-jre</version>
</dependency>构造参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| permitspersecond | double | 每秒允许执行的任务数。例:1.0→每秒1次,0.5→每2秒1次 |
| maxretries | int | 最大重试次数(不含首次执行)。0表示不重试 |
| initialdelayms | long | 首次重试前的等待时间(毫秒) |
| backoffmultiplier | double | 退避乘数。2.0→每次延迟翻倍;1.0→固定延迟 |
核心方法
completablefuture<t> submit(callable<t> task):提交任务,返回 completablefuture,可异步获取结果或异常。
void shutdown():优雅关闭:等待已提交任务执行完毕,不再接受新任务。
void shutdownnow():立即关闭:尝试中断当前执行的任务。
注意事项
执行器内部使用 单线程 处理任务,严格保证提交顺序。
限流基于 ratelimiter,每次执行前阻塞直到获取令牌,因此即使任务执行时间极短,也能保证速率限制。
重试期间工作线程会阻塞等待,后续任务不会提前执行,顺序性得以保持。
工作线程默认设为守护线程,当所有用户线程结束时 jvm 会自动退出,无需手动关闭。但建议在应用关闭时调用 shutdown() 以确保任务完整执行。
实战示例:调用淘宝ip接口
假设淘宝ip接口地址为 http://ip.taobao.com/outgetipinfo?ip={ip},我们需要:
- 限制每秒 1 次请求
- 失败重试 3 次
- 首次重试延迟 1 秒,指数退避乘数 2.0
demo 代码
public class taobaoipdemo {
public static void main(string[] args) throws exception {
// 创建限流执行器:每秒1次,重试3次,首次延迟1秒,指数退避2.0
ratelimitedexecutor<string> executor = new ratelimitedexecutor<>(
1.0, // 每秒1次
3, // 重试3次
1000, // 首次延迟1秒
2.0 // 指数退避
);
// 需要查询的ip列表
string[] ips = {"8.8.8.8", "114.114.114.114", "223.5.5.5"};
// 提交所有任务
for (string ip : ips) {
completablefuture<string> future = executor.submit(() -> queryip(ip));
// 异步处理结果
future.thenaccept(result -> {
system.out.println("ip: " + ip + ", 结果: " + result);
}).exceptionally(ex -> {
system.err.println("ip: " + ip + ", 查询失败: " + ex.getmessage());
return null;
});
}
// 等待所有任务完成(实际应用中不需要,这里仅演示)
thread.sleep(10000);
// 优雅关闭
executor.shutdown();
}
private static string queryip(string ip) {
string url = "https://ip.taobao.com/outgetipinfo?accesskey=alibaba-inc&ip=" + ip;
return restclient.create()
.get()
.uri(url)
.retrieve()
.body(string.class);
}
}
适用场景
- 调用第三方api需要严格限制qps(如淘宝ip、微信接口、百度地图等)
- 需要按顺序执行任务(如写入文件、顺序处理消息)
- 任务可能临时失败,需要自动重试(网络抖动、服务端限流)
- 希望重试策略为指数退避,避免雪崩效应
总结
这个轻量级的限流任务执行器,代码简洁、功能完整,能帮你轻松解决速率控制 + 顺序执行 + 自动重试三大问题。配合 completablefuture 异步编程,性能与体验兼得。
如果你也在为api限流或任务重试头疼,不妨复制这份代码到项目中试试~
本文代码已脱敏,可放心复制到生产项目。guava 版本建议使用 30.0 以上。
以上就是基于java手写一个通用限流任务执行器的详细内容,更多关于java限流任务执行器的资料请关注代码网其它相关文章!
发表评论