当前位置: 代码网 > it编程>编程语言>Java > 基于Java手写一个通用限流任务执行器

基于Java手写一个通用限流任务执行器

2026年04月07日 Java 我要评论
调用第三方api最怕什么?怕被限流!今天分享一个自制的限流任务执行器,能帮你轻松控制请求频率,还能自动重试失败任务,指数退避不添乱。代码可直接复制到项目中使用~背景痛点很多场景下我们需要控制任务的执行

调用第三方api最怕什么?怕被限流!今天分享一个自制的限流任务执行器,能帮你轻松控制请求频率,还能自动重试失败任务,指数退避不添乱。代码可直接复制到项目中使用~

背景痛点

很多场景下我们需要控制任务的执行速率:

  • 调用淘宝ip接口,对方限制每秒最多1次请求
  • 批量请求第三方api,担心触发限流封禁
  • 任务可能因网络抖动失败,需要自动重试

今天要介绍的 ratelimitedexecutor 就是为解决这些问题而生的。

核心功能

限流执行 – 每秒最多执行n个任务,超出则排队等待

顺序保证 – 任务严格按照提交顺序执行

自动重试 – 失败后自动重试(可配置最大次数)

指数退避 – 支持退避延迟策略,避免加重服务端压力

异步返回 – 使用 completablefuture 获取结果,不阻塞主线程

类设计(完整代码)

import com.google.common.util.concurrent.ratelimiter;
import java.util.concurrent.*;
import java.util.function.function;

/**
 * 限流任务执行器:按固定速率顺序执行任务,支持重试。
 * @param <t> 任务返回值类型
 */
public class ratelimitedexecutor<t> {
    private final blockingqueue<taskwrapper<t>> queue = new linkedblockingqueue<>();
    private final ratelimiter ratelimiter;
    private final int maxretries;
    private final long initialdelayms;    // 首次重试延迟(毫秒)
    private final double backoffmultiplier; // 退避乘数(如2.0表示每次翻倍)
    private final executorservice worker = executors.newsinglethreadexecutor(r -> {
        thread t = new thread(r, "ratelimitedexecutor-worker");
        t.setdaemon(true);  // 设为守护线程,避免阻止jvm退出
        return t;
    });
    private volatile boolean running = true;

    /**
     * 构造限流执行器
     * @param permitspersecond 每秒允许执行的任务数(如1.0表示每秒1次)
     * @param maxretries       最大重试次数(不含首次执行)
     * @param initialdelayms   首次重试延迟毫秒数
     * @param backoffmultiplier 退避乘数(1.0表示固定延迟,>1.0表示指数退避)
     */
    public ratelimitedexecutor(double permitspersecond, int maxretries,
                               long initialdelayms, double backoffmultiplier) {
        this.ratelimiter = ratelimiter.create(permitspersecond);
        this.maxretries = maxretries;
        this.initialdelayms = initialdelayms;
        this.backoffmultiplier = backoffmultiplier;
        worker.submit(this::process);
    }

    /**
     * 提交一个任务,返回completablefuture异步获取结果
     * @param task 需要执行的任务(callable)
     * @return 代表异步结果的completablefuture
     */
    public completablefuture<t> submit(callable<t> task) {
        completablefuture<t> future = new completablefuture<>();
        queue.offer(new taskwrapper<>(task, future));
        return future;
    }

    // 工作线程主循环
    private void process() {
        while (running) {
            try {
                taskwrapper<t> wrapper = queue.take(); // 阻塞直到有任务
                executewithretry(wrapper);
            } catch (interruptedexception e) {
                thread.currentthread().interrupt();
                break;
            }
        }
    }

    // 执行单个任务(带重试)
    private void executewithretry(taskwrapper<t> wrapper) {
        int retries = 0;
        long delay = initialdelayms;
        while (retries <= maxretries) {
            // 限流:获取令牌,若不足则阻塞
            ratelimiter.acquire();

            try {
                t result = wrapper.task.call();
                wrapper.future.complete(result);
                return; // 成功,结束
            } catch (exception e) {
                retries++;
                if (retries > maxretries) {
                    wrapper.future.completeexceptionally(e);
                    return;
                }
                // 重试等待(退避)
                try {
                    thread.sleep(delay);
                } catch (interruptedexception ie) {
                    thread.currentthread().interrupt();
                    wrapper.future.completeexceptionally(ie);
                    return;
                }
                // 更新下次重试延迟
                delay = (long) (delay * backoffmultiplier);
            }
        }
    }

    /**
     * 优雅关闭执行器:等待已提交任务执行完毕,不再接受新任务
     */
    public void shutdown() {
        running = false;
        worker.shutdown(); // 不再接受新任务
        try {
            if (!worker.awaittermination(5, timeunit.seconds)) {
                worker.shutdownnow();
            }
        } catch (interruptedexception e) {
            worker.shutdownnow();
            thread.currentthread().interrupt();
        }
    }

    /**
     * 立即关闭执行器,尝试中断正在执行的任务
     */
    public void shutdownnow() {
        running = false;
        worker.shutdownnow();
    }

    // 内部任务包装类
    private static class taskwrapper<t> {
        final callable<t> task;
        final completablefuture<t> future;

        taskwrapper(callable<t> task, completablefuture<t> future) {
            this.task = task;
            this.future = future;
        }
    }
}

依赖要求

项目需要引入 guava(提供 ratelimiter):

<dependency>
    <groupid>com.google.guava</groupid>
    <artifactid>guava</artifactid>
    <version>32.1.2-jre</version>
</dependency>

构造参数说明

参数类型说明
permitsperseconddouble每秒允许执行的任务数。例:1.0→每秒1次,0.5→每2秒1次
maxretriesint最大重试次数(不含首次执行)。0表示不重试
initialdelaymslong首次重试前的等待时间(毫秒)
backoffmultiplierdouble退避乘数。2.0→每次延迟翻倍;1.0→固定延迟

核心方法

completablefuture<t> submit(callable<t> task):提交任务,返回 completablefuture,可异步获取结果或异常。

void shutdown():优雅关闭:等待已提交任务执行完毕,不再接受新任务。

void shutdownnow():立即关闭:尝试中断当前执行的任务。

注意事项

执行器内部使用 单线程 处理任务,严格保证提交顺序

限流基于 ratelimiter,每次执行前阻塞直到获取令牌,因此即使任务执行时间极短,也能保证速率限制。

重试期间工作线程会阻塞等待,后续任务不会提前执行,顺序性得以保持。

工作线程默认设为守护线程,当所有用户线程结束时 jvm 会自动退出,无需手动关闭。但建议在应用关闭时调用 shutdown() 以确保任务完整执行。

实战示例:调用淘宝ip接口

假设淘宝ip接口地址为 http://ip.taobao.com/outgetipinfo?ip={ip},我们需要:

  • 限制每秒 1 次请求
  • 失败重试 3 次
  • 首次重试延迟 1 秒,指数退避乘数 2.0

demo 代码

public class taobaoipdemo {
    public static void main(string[] args) throws exception {
        // 创建限流执行器:每秒1次,重试3次,首次延迟1秒,指数退避2.0
        ratelimitedexecutor<string> executor = new ratelimitedexecutor<>(
            1.0,    // 每秒1次
            3,      // 重试3次
            1000,   // 首次延迟1秒
            2.0     // 指数退避
        );

        // 需要查询的ip列表
        string[] ips = {"8.8.8.8", "114.114.114.114", "223.5.5.5"};

        // 提交所有任务
        for (string ip : ips) {
            completablefuture<string> future = executor.submit(() -> queryip(ip));

            // 异步处理结果
            future.thenaccept(result -> {
                system.out.println("ip: " + ip + ", 结果: " + result);
            }).exceptionally(ex -> {
                system.err.println("ip: " + ip + ", 查询失败: " + ex.getmessage());
                return null;
            });
        }

        // 等待所有任务完成(实际应用中不需要,这里仅演示)
        thread.sleep(10000);

        // 优雅关闭
        executor.shutdown();
    }

    private static string queryip(string ip) {
        string url = "https://ip.taobao.com/outgetipinfo?accesskey=alibaba-inc&ip=" + ip;
        return restclient.create()
                .get()
                .uri(url)
                .retrieve()
                .body(string.class);
    }
}

适用场景

  • 调用第三方api需要严格限制qps(如淘宝ip、微信接口、百度地图等)
  • 需要按顺序执行任务(如写入文件、顺序处理消息)
  • 任务可能临时失败,需要自动重试(网络抖动、服务端限流)
  • 希望重试策略为指数退避,避免雪崩效应

总结

这个轻量级的限流任务执行器,代码简洁、功能完整,能帮你轻松解决速率控制 + 顺序执行 + 自动重试三大问题。配合 completablefuture 异步编程,性能与体验兼得。

如果你也在为api限流或任务重试头疼,不妨复制这份代码到项目中试试~

本文代码已脱敏,可放心复制到生产项目。guava 版本建议使用 30.0 以上。

以上就是基于java手写一个通用限流任务执行器的详细内容,更多关于java限流任务执行器的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com