当前位置: 代码网 > it编程>编程语言>Java > Java如何优雅关闭异步中的ExecutorService

Java如何优雅关闭异步中的ExecutorService

2025年02月14日 Java 我要评论
1.executorservice的核心价值在并发编程领域,java的executorservice(位于java.util.concurrent包)是线程池管理的关键接口。作为executor框架的

1.executorservice的核心价值

在并发编程领域,java的executorservice(位于java.util.concurrent包)是线程池管理的关键接口。作为executor框架的核心组件,它通过解耦任务提交与执行机制,为开发者提供了:

  • 线程生命周期管理自动化
  • 任务队列智能调度
  • 资源复用优化机制
  • 异步执行结果追踪能力

2.关闭机制的必要性

不正确的线程池关闭会导致:

  • 内存泄漏(滞留线程无法回收)
  • 应用无法正常终止(非守护线程保持活跃)
  • 任务状态不一致(突然中断导致数据问题)
  • 系统资源耗尽(无限制线程创建)

3.shutdown()方法详解

3.1 方法特性

void shutdown()

状态转换

将线程池状态设置为shutdown,触发以下行为:

  • 拒绝新任务提交(触发rejectedexecutionhandler)
  • 继续执行已存在的任务:
    • 正在执行的任务(running tasks)
    • 等待队列中的任务(queued tasks)

典型应用场景

executorservice executor = executors.newfixedthreadpool(4);
// 提交多个任务...
executor.shutdown();

try {
    if (!executor.awaittermination(60, timeunit.seconds)) {
        system.err.println("仍有任务未在时限内完成");
    }
} catch (interruptedexception e) {
    thread.currentthread().interrupt();
}

3.2 内部运作机制

  • 原子性状态更新:cas操作修改线程池控制状态
  • 中断空闲线程:仅中断等待任务的worker线程
  • 队列消费保证:完全处理blockingqueue中的剩余任务

4.shutdownnow()方法剖析

4.1 方法定义

list<runnable> shutdownnow()

状态转换

将线程池状态设置为stop,触发:

  • 立即拒绝新任务
  • 中断所有工作线程(无论是否在执行任务)
  • 清空任务队列,返回未执行任务列表

4.2 中断处理要点

executor.shutdownnow();
// 典型返回值处理
list<runnable> unprocessed = executor.shutdownnow();
if (!unprocessed.isempty()) {
    logger.warn("丢弃{}个未执行任务", unprocessed.size());
}

任务中断条件

只有当任务代码正确处理中断时才能被终止:

class interruptibletask implements runnable {
    public void run() {
        while (!thread.currentthread().isinterrupted()) {
            // 执行可中断的操作
            try {
                thread.sleep(1000);
            } catch (interruptedexception e) {
                thread.currentthread().interrupt(); // 重置中断状态
                break;
            }
        }
    }
}

5.对比分析

特性shutdown()shutdownnow()
新任务接受立即拒绝立即拒绝
运行中任务处理等待完成尝试中断
队列任务处理全部执行清除并返回
返回值void未执行任务列表
适用场景优雅关闭紧急终止
线程中断策略仅中断空闲线程强制中断所有线程

6.最佳实践代码示例

6.1 标准关闭模板

public class gracefulshutdownexample {
    // 定义超时时间和时间单位(30秒)
    private static final int timeout = 30;
    private static final timeunit unit = timeunit.seconds;

    // 执行任务的方法,接收一个任务列表并将其提交给线程池执行
    public void executetasks(list<runnable> tasks) {
        // 创建一个固定大小的线程池,大小为系统可用处理器核心数
        executorservice executor = executors.newfixedthreadpool(runtime.getruntime().availableprocessors());
        
        try {
            // 将任务列表中的每个任务提交到线程池
            tasks.foreach(executor::submit);
        } finally {
            // 在所有任务提交完后,禁用线程池接收新任务,开始优雅关闭线程池
            executor.shutdown(); // 禁止再提交新任务
            try {
                // 等待线程池中的任务在指定超时内完成,如果超时未完成,则强制关闭线程池
                if (!executor.awaittermination(timeout, unit)) {
                    // 如果未能在超时内完成,则调用 shutdownnow() 强制终止所有活动任务
                    list<runnable> unfinished = executor.shutdownnow();
                    // 处理未完成的任务,例如记录日志或重新提交
                    handleunfinishedtasks(unfinished);
                }
            } catch (interruptedexception e) {
                // 如果在等待终止时被中断,恢复中断状态并强制关闭线程池
                thread.currentthread().interrupt();
                executor.shutdownnow();
            }
        }
    }
    
    // 处理未完成任务的方法,这里我们打印未完成任务的数量
    private void handleunfinishedtasks(list<runnable> tasks) {
        // 如果有未完成的任务,打印任务数量并执行额外的处理
        if (!tasks.isempty()) {
            system.out.println("未完成任务数: " + tasks.size());
            // 可在此处记录日志、重新排队未完成的任务等
        }
    }

}

构造线程池: executors.newfixedthreadpool() 创建一个固定大小的线程池,大小为系统可用的处理器核心数,这样可以更高效地利用 cpu 资源。

提交任务: 使用 tasks.foreach(executor::submit) 提交每个任务到线程池中执行。

优雅关闭线程池:

  • executor.shutdown() 禁用线程池接收新任务,但仍会执行已经提交的任务。
  • awaittermination() 方法用于等待所有任务执行完成。如果超时后任务未完成,则调用 shutdownnow() 强制关闭线程池,停止所有正在运行的任务,并返回未完成的任务。

处理中断: 如果在等待终止过程中发生 interruptedexception,线程会恢复中断状态,并且强制关闭线程池。

处理未完成任务: handleunfinishedtasks() 方法会处理未完成的任务,比如记录日志或者重新排队未完成的任务。

6.2 带回调的增强实现

public class enhancedexecutormanager {
    // 定义线程池对象
    private final executorservice executor;
    // 定义超时时间及单位
    private final long timeout;
    private final timeunit unit;

    // 构造函数,初始化线程池并设置超时时间和单位
    public enhancedexecutormanager(int corepoolsize, long timeout, timeunit unit) {
        // 创建一个核心池大小为 corepoolsize,最大池大小为 corepoolsize * 2,最大空闲时间 60秒的线程池
        this.executor = new threadpoolexecutor(
            corepoolsize,                             // 核心线程池大小
            corepoolsize * 2,                         // 最大线程池大小
            60l, timeunit.seconds,                    // 空闲线程的存活时间
            new linkedblockingqueue<>(1000),          // 使用容量为 1000 的队列来缓存任务
            new customthreadfactory(),                // 自定义线程工厂
            new threadpoolexecutor.callerrunspolicy() // 当任务无法提交时,调用者线程执行该任务
        );
        this.timeout = timeout;                     // 设置超时时间
        this.unit = unit;                           // 设置超时时间单位
    }
    
    // 优雅关闭线程池的方法
    public void shutdown() {
        executor.shutdown(); // 首先尝试正常关闭线程池,不再接收新的任务
        
        try {
            // 如果线程池未能在指定的超时时间内终止,则强制关闭
            if (!executor.awaittermination(timeout, unit)) {
                system.out.println("强制终止线程池...");
                // 强制停止所有正在执行的任务并返回丢弃的任务列表
                list<runnable> droppedtasks = executor.shutdownnow();
                system.out.println("丢弃任务数: " + droppedtasks.size());
            }
        } catch (interruptedexception e) {
            // 如果在等待过程中线程池关闭操作被中断,立即强制关闭并恢复中断状态
            executor.shutdownnow();
            thread.currentthread().interrupt();
        }
    }
    
    // 自定义线程工厂类,用于创建线程
    private static class customthreadfactory implements threadfactory {
        private static final atomicinteger poolnumber = new atomicinteger(1); // 线程池编号,用于生成线程名
        private final threadgroup group; // 线程组
        private final atomicinteger threadnumber = new atomicinteger(1); // 线程编号
        private final string nameprefix; // 线程名称前缀
    
        customthreadfactory() {
            // 获取当前系统的安全管理器,如果没有,则使用当前线程的线程组
            securitymanager s = system.getsecuritymanager();
            group = (s != null) ? s.getthreadgroup() :
                                  thread.currentthread().getthreadgroup();
            // 设置线程池的名称前缀
            nameprefix = "pool-" +
                          poolnumber.getandincrement() + // 线程池编号递增
                         "-thread-";
        }
    
        // 创建新线程的方法
        public thread newthread(runnable r) {
            // 创建新的线程,线程组、名称及优先级均已设置
            thread t = new thread(group, r,
                                  nameprefix + threadnumber.getandincrement(),
                                  0); // 默认优先级和daemon设置
            // 如果线程是守护线程,则将其设置为非守护线程
            if (t.isdaemon())
                t.setdaemon(false);
            // 设置线程优先级为默认
            if (t.getpriority() != thread.norm_priority)
                t.setpriority(thread.norm_priority);
            return t; // 返回新创建的线程
        }
    }
}

线程池初始化:

  • enhancedexecutormanager 的构造方法使用 threadpoolexecutor 创建一个线程池,线程池大小通过 corepoolsize 参数传递。线程池的最大线程数是核心线程数的两倍。
  • linkedblockingqueue 用作任务队列,大小为 1000。若任务量超过队列容量,则使用 callerrunspolicy 策略,即由提交任务的线程执行该任务。
  • 使用自定义的 customthreadfactory 来创建线程。

优雅关闭线程池:

  • shutdown() 方法首先调用 executor.shutdown() 来拒绝接受新的任务,然后等待线程池在指定的超时时间内关闭。
  • 如果线程池在超时时间内未能正常关闭,则调用 shutdownnow() 强制关闭并丢弃未执行的任务,同时输出丢弃任务的数量。
  • 如果在等待关闭过程中发生 interruptedexception,会强制关闭线程池,并恢复中断状态。

自定义线程工厂:

  • customthreadfactory 通过实现 threadfactory 接口来定义创建线程的行为,主要包括线程组、线程名称、守护线程状态和线程优先级的配置。
  • 每个线程的名称遵循 pool-编号-thread-编号 的格式。线程池的编号是递增的,每个线程有自己的编号。

7.关键注意事项

  • 守护线程问题:默认创建的是非守护线程,需显式关闭
  • 中断策略一致性:任务必须实现正确的中断处理逻辑
  • 拒绝策略配合:合理配置rejectedexecutionhandler
  • 资源释放顺序:数据库连接等资源应先于线程池关闭
  • 监控机制:建议集成线程池监控(如jmx)

8.高级应用场景

分级关闭策略

public class tieredshutdownmanager { 
    // 定义三个优先级的线程池列表:高优先级、中优先级、低优先级
    private final list<executorservice> highpriority; 
    private final list<executorservice> normalpriority; 
    private final list<executorservice> lowpriority; 
  
    // 公共方法用于优雅关闭所有线程池
    public void gracefulshutdown() { 
        // 依次关闭高、中、低优先级的线程池
        shutdowntier(highpriority, 10, timeunit.seconds); 
        shutdowntier(normalpriority, 30, timeunit.seconds); 
        shutdowntier(lowpriority, 60, timeunit.seconds); 
    } 
  
    // 私有方法,用于优雅关闭指定优先级的线程池
    private void shutdowntier(list<executorservice> tier, long timeout, timeunit unit) { 
        // 对指定的线程池列表执行关闭操作
        tier.foreach(executorservice::shutdown); 
  
        // 对每个线程池执行等待终止的操作,指定超时时间
        tier.foreach(executor -> { 
            try { 
                // 如果线程池未在超时时间内终止,则调用 shutdownnow 强制关闭
                if (!executor.awaittermination(timeout, unit)) { 
                    executor.shutdownnow(); 
                } 
            } catch (interruptedexception e) { 
                // 如果在等待终止过程中线程被中断,恢复中断状态并强制关闭线程池
                thread.currentthread().interrupt(); 
                executor.shutdownnow(); 
            } 
        }); 
    } 
}

gracefulshutdown 方法按照优先级顺序依次关闭高、中、低优先级的线程池。

shutdowntier 方法首先尝试正常关闭每个线程池(调用 shutdown),然后通过 awaittermination 方法等待线程池在指定的时间内结束,如果未成功结束,则调用 shutdownnow 强制关闭。

在关闭过程中,如果发生中断,则会捕获 interruptedexception 异常,并且中断当前线程,同时强制关闭线程池。

9.性能优化建议

根据任务类型选择队列策略:

  • cpu密集型:有界队列(arrayblockingqueue)
  • io密集型:无界队列(linkedblockingqueue)

监控关键指标:

threadpoolexecutor executor = (threadpoolexecutor) service;
system.out.println("活跃线程数: " + executor.getactivecount());
system.out.println("完成任务数: " + executor.getcompletedtaskcount());
system.out.println("队列大小: " + executor.getqueue().size());

动态调整参数:

executor.setcorepoolsize(newsize);
executor.setmaximumpoolsize(newmaxsize);

10.总结建议

根据oracle官方文档建议,在大多数生产场景中推荐以下关闭流程:

  • 优先调用shutdown()
  • 设置合理的awaittermination超时
  • 必要时调用shutdownnow()
  • 始终处理返回的未完成任务
  • 记录完整的关闭日志

正确选择关闭策略需要综合考量:

  • 任务重要性等级
  • 系统资源限制
  • 业务连续性需求
  • 数据一致性要求

到此这篇关于java如何优雅关闭异步中的executorservice的文章就介绍到这了,更多相关java关闭executorservice内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com