一、核心特性
springboot 集成
支持 @async 注解,简化异步方法调用。
参数可配置化
核心线程数、最大线程数、队列容量、拒绝策略等均可通过配置调整。
生命周期管理
实现 lifecycle 接口,支持线程池的启动和关闭(如应用关闭时优雅终止任务)。
任务装饰器
支持通过 taskdecorator 对任务进行装饰(如传递上下文信息)
二、添加依赖
在 pom.xml
文件中添加 spring boot starter aop 依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-aop</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency>
三、参数详解
通过 spring 配置文件或 @bean 定义线程池时,需设置以下关键参数:
参数名称 | 说明 | 默认值 |
---|---|---|
corepoolsize | 核心线程数,即使空闲也不会被回收 | 1 |
maxpoolsize | 最大线程数,当队列满时创建新线程直到达到此值 | integer.max_value |
queuecapacity | 任务队列容量(使用 linkedblockingqueue 或 arrayblockingqueue) | integer.max_value |
keepaliveseconds | 非核心线程的空闲存活时间(秒) | 60 |
threadnameprefix | 线程名前缀,便于日志追踪 | "task-executor-" |
allowcorethreadtimeout | 是否允许核心线程超时回收 | false |
rejectedexecutionhandler | 拒绝策略(如 abortpolicy、callerrunspolicy) | abortpolicy(直接抛出异常) |
四、配置线程池
@configuration @enableasync public class executorconfig { private static final logger logger = loggerfactory.getlogger(executorconfig.class); @value("${async.executor.thread.core_pool_size}") private int corepoolsize; @value("${async.executor.thread.max_pool_size}") private int maxpoolsize; @value("${async.executor.thread.queue_capacity}") private int queuecapacity; @value("${async.executor.thread.name.prefix}") private string nameprefix; @bean(name = "asyncserviceexecutor") public executor asyncserviceexecutor() { logger.info("start asyncserviceexecutor"); threadpooltaskexecutor executor = new threadpooltaskexecutor(); //配置核心线程数 executor.setcorepoolsize(corepoolsize); //配置最大线程数 executor.setmaxpoolsize(maxpoolsize); //配置队列大小 executor.setqueuecapacity(queuecapacity); //配置线程池中的线程的名称前缀 executor.setthreadnameprefix(nameprefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //执行初始化 executor.initialize(); return executor; } }
@value
是我配置在 application.yml
,可以参考配置,自由定义
# 异步线程配置 # 配置核心线程数 async.executor.thread.core_pool_size = 5 # 配置最大线程数 async.executor.thread.max_pool_size = 5 # 配置队列大小 async.executor.thread.queue_capacity = 99999 # 配置线程池中的线程的名称前缀 async.executor.thread.name.prefix = async-service-
五、应用实践
1、异步任务处理
创建一个服务类 asyncservice
,并在其方法上使用 @async
注解来定义异步任务:
import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.scheduling.annotation.async; import org.springframework.stereotype.service; @service public class asyncservice { private static final logger logger = loggerfactory.getlogger(asyncservice.class); @async("taskexecutor") public void asynctask(string taskname) { logger.info(thread.currentthread().getname() + " 开始执行任务: " + taskname); try { thread.sleep(2000); // 模拟耗时操作 } catch (interruptedexception e) { thread.currentthread().interrupt(); logger.error("任务执行被中断", e); } finally { logger.info(thread.currentthread().getname() + " 任务执行完成: " + taskname); } } }
创建一个控制器类 asynccontroller
,用于触发异步任务(线程安全的)
import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.scheduling.annotation.asyncresult; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.restcontroller; import java.util.concurrent.future; @restcontroller public class asynccontroller { private static final logger logger = loggerfactory.getlogger(asynccontroller.class); @autowired private asyncservice asyncservice; @getmapping("/trigger") public string triggerasynctasks() { logger.info("开始触发异步任务"); for (int i = 0; i < 10; i++) { asyncservice.asynctask("任务 " + i); } return "异步任务已触发"; } }
创建一个监控组件 threadpoolmonitor
,用于定期监控线程池的状态
import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.scheduling.annotation.scheduled; import org.springframework.stereotype.component; @component public class threadpoolmonitor { private static final logger logger = loggerfactory.getlogger(threadpoolmonitor.class); @autowired private threadpooltaskexecutor taskexecutor; @scheduled(fixedrate = 60000) // 每分钟执行一次 public void monitorthreadpool() { int activecount = taskexecutor.getactivecount(); int poolsize = taskexecutor.getpoolsize(); int corepoolsize = taskexecutor.getcorepoolsize(); int maxpoolsize = taskexecutor.getmaxpoolsize(); int queuesize = taskexecutor.getthreadpoolexecutor().getqueue().size(); int completedtaskcount = taskexecutor.getthreadpoolexecutor().getcompletedtaskcount(); logger.info("线程池状态 - 活动线程数: {}, 当前线程数: {}, 核心线程数: {}, 最大线程数: {}, 队列大小: {}, 已完成任务数: {}", activecount, poolsize, corepoolsize, maxpoolsize, queuesize, completedtaskcount); // 检查线程池是否接近饱和 if (activecount >= maxpoolsize * 0.8 || queuesize >= taskexecutor.getqueuecapacity() * 0.8) { logger.warn("线程池负载过高!请考虑优化配置或检查任务执行情况"); } } }
确保在启动类上添加 @enableasync
注解,以启用异步任务支持
import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.scheduling.annotation.enableasync; @springbootapplication @enableasync public class asyncdemoapplication { public static void main(string[] args) { springapplication.run(asyncdemoapplication.class, args); } }
测试:
启动 spring boot 应用后,访问 http://localhost:8080/trigger
,即可看到异步任务在线程池中执行的情况,同时线程池的状态也会定期输出到日志中。
代码说明
@enableasync 注解 :用于启用 spring 的异步方法执行支持,确保 spring 容器能够识别和处理带有
@async
注解的方法。@async 注解 :用于标注希望异步执行的方法,需指定所使用的线程池 bean 的名称,在本例中为 “taskexecutor”。当该方法被调用时,spring 会将其提交到指定的线程池中执行。
threadpooltaskexecutor :是 spring 提供的一个线程池任务执行器,通过设置核心线程数、最大线程数、队列容量等参数,可以根据应用的需求灵活地配置线程池。
异步任务失败处理 :通过自定义的拒绝策略,在线程池满时记录详细信息并抛出异常,以便及时发现任务执行失败的情况。
线程池监控 :使用
@scheduled
注解定期监控线程池的状态,包括活动线程数、当前线程数、核心线程数、最大线程数、队列大小和已完成任务数等,帮助开发者了解线程池的运行情况,以便及时进行优化和调整
2、高并发请求处理
在 web 应用中处理大量并发请求,避免阻塞主线程
@restcontroller public class mycontroller { @autowired private threadpooltaskexecutor taskexecutor; @getmapping("/process") public completablefuture<string> handlerequest() { return completablefuture.supplyasync(() -> { // 耗时操作 return "result"; }, taskexecutor); } }
3、定时任务调度
@enablescheduling @configuration public class schedulerconfig { @bean public threadpooltaskscheduler taskscheduler() { threadpooltaskscheduler scheduler = new threadpooltaskscheduler(); scheduler.setpoolsize(5); scheduler.setthreadnameprefix("scheduler-"); return scheduler; } } @service public class scheduledservice { @scheduled(fixedrate = 5000) public void scheduledtask() { // 定时任务逻辑 } }
拒绝策略(rejected policies)
当线程池和队列均满时,处理新任务的策略:
策略类 | 行为描述 |
---|---|
abortpolicy | 直接抛出 rejectedexecutionexception(默认) |
callerrunspolicy | 由提交任务的线程直接执行任务(同步阻塞提交者) |
discardpolicy | 静默丢弃新任务,不抛异常 |
discardoldestpolicy | 丢弃队列中最旧的任务,然后重试提交新任务 |
如下给出不同拒绝策略的配置类,请结合上面的配置类整合使用
import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.scheduling.concurrent.threadpooltaskexecutor; import java.util.concurrent.blockingqueue; import java.util.concurrent.linkedblockingqueue; import java.util.concurrent.threadpoolexecutor; import java.util.concurrent.timeunit; @configuration public class threadpoolconfig { @bean(name = "abortpolicyexecutor") public threadpooltaskexecutor abortpolicyexecutor() { return createexecutor(new threadpoolexecutor.abortpolicy()); } @bean(name = "callerrunspolicyexecutor") public threadpooltaskexecutor callerrunspolicyexecutor() { return createexecutor(new threadpoolexecutor.callerrunspolicy()); } @bean(name = "discardpolicyexecutor") public threadpooltaskexecutor discardpolicyexecutor() { return createexecutor(new threadpoolexecutor.discardpolicy()); } @bean(name = "discardoldestpolicyexecutor") public threadpooltaskexecutor discardoldestpolicyexecutor() { return createexecutor(new threadpoolexecutor.discardoldestpolicy()); } private threadpooltaskexecutor createexecutor(threadpoolexecutor.rejectedexecutionhandler rejectedexecutionhandler) { threadpooltaskexecutor executor = new threadpooltaskexecutor(); executor.setcorepoolsize(5); // 核心线程数 executor.setmaxpoolsize(10); // 最大线程数 executor.setqueuecapacity(100); // 队列容量 executor.setthreadnameprefix("task-executor-"); // 线程名前缀 executor.setrejectedexecutionhandler(rejectedexecutionhandler); executor.initialize(); return executor; } }
创建一个服务类 taskservice
,用于执行任务
import org.springframework.scheduling.annotation.async; import org.springframework.stereotype.service; @service public class taskservice { @async("abortpolicyexecutor") public void executewithabortpolicy(string taskname) { executetask(taskname); } @async("callerrunspolicyexecutor") public void executewithcallerrunspolicy(string taskname) { executetask(taskname); } @async("discardpolicyexecutor") public void executewithdiscardpolicy(string taskname) { executetask(taskname); } @async("discardoldestpolicyexecutor") public void executewithdiscardoldestpolicy(string taskname) { executetask(taskname); } private void executetask(string taskname) { try { system.out.println(thread.currentthread().getname() + " 开始执行任务: " + taskname); thread.sleep(2000); // 模拟任务执行时间 system.out.println(thread.currentthread().getname() + " 任务执行完成: " + taskname); } catch (interruptedexception e) { thread.currentthread().interrupt(); system.err.println("任务执行被中断: " + taskname); } } }
创建一个控制器类 taskcontroller
,用于触发任务执行
import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.requestparam; import org.springframework.web.bind.annotation.restcontroller; @restcontroller public class taskcontroller { @autowired private taskservice taskservice; @getmapping("/trigger/abort") public string triggerabortpolicy(@requestparam string taskname) { taskservice.executewithabortpolicy(taskname); return "任务已提交到使用 abortpolicy 的线程池"; } @getmapping("/trigger/caller") public string triggercallerrunspolicy(@requestparam string taskname) { taskservice.executewithcallerrunspolicy(taskname); return "任务已提交到使用 callerrunspolicy 的线程池"; } @getmapping("/trigger/discard") public string triggerdiscardpolicy(@requestparam string taskname) { taskservice.executewithdiscardpolicy(taskname); return "任务已提交到使用 discardpolicy 的线程池"; } @getmapping("/trigger/discardoldest") public string triggerdiscardoldestpolicy(@requestparam string taskname) { taskservice.executewithdiscardoldestpolicy(taskname); return "任务已提交到使用 discardoldestpolicy 的线程池"; } }
启动 spring boot 应用后,分别访问以下 url 来测试不同拒绝策略的行为:
http://localhost:8080/trigger/abort?taskname=任务1
http://localhost:8080/trigger/caller?taskname=任务2
http://localhost:8080/trigger/discard?taskname=任务3
http://localhost:8080/trigger/discardoldest?taskname=任务4
代码说明
线程池配置:
使用
threadpooltaskexecutor
创建线程池。配置了 4 个不同的线程池,每个线程池使用不同的拒绝策略。
每个线程池的核心线程数为 5,最大线程数为 10,队列容量为 100。
拒绝策略:
abortpolicy:直接抛出
rejectedexecutionexception
。callerrunspolicy:由提交任务的线程直接执行任务。
discardpolicy:静默丢弃新任务,不抛异常。
discardoldestpolicy:丢弃队列中最旧的任务,然后重试提交新任务。
任务执行:
taskservice
类中的每个方法都使用@async
注解,并指定使用的线程池。executetask
方法模拟任务执行,包含一个 2 秒的睡眠时间。通过这个示例,你可以观察不同拒绝策略在任务被拒绝时的行为。例如,当线程池满时,
abortpolicy
会抛出异常,callerrunspolicy
会让提交任务的线程执行任务,discardpolicy
会静默丢弃任务,而discardoldestpolicy
会丢弃最旧的任务并尝试提交新任务
6、最佳配置
· 合理设置线程池参数
cpu 密集型任务:核心线程数 ≈ cpu 核心数
i/o 密集型任务:核心线程数 ≈ cpu 核心数 * 2,并增大队列容量。
· 避免队列无限堆积
设置合理的 queuecapacity,防止内存溢出(oom)。
· 统一异常处理
通过 asyncuncaughtexceptionhandler 捕获异步任务中的异常:@configuration public class asyncconfig implements asyncconfigurer { @override public executor getasyncexecutor() { threadpooltaskexecutor executor = new threadpooltaskexecutor(); // ... 配置参数 return executor; } @override public asyncuncaughtexceptionhandler getasyncuncaughtexceptionhandler() { return (ex, method, params) -> { // 处理异常 }; } }
应用退出时,调用 shutdown() 并等待剩余任务执行完毕
executor.shutdown(); try { if (!executor.awaittermination(60, timeunit.seconds)) { executor.shutdownnow(); } } catch (interruptedexception e) { executor.shutdownnow(); }
总结:
threadpooltaskexecutor 是 spring 生态中管理线程任务的利器,通过灵活的配置和与 spring 的无缝集成,能够高效处理异步任务、高并发请求和定时调度。合理设置参数、选择拒绝策略,并结合监控手段,可显著提升系统性能和稳定性。
到此这篇关于springboot线程池配置使用详解的文章就介绍到这了,更多相关springboot线程池配置使用内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论