问题分析
1.默认监控的局限性
/**
 * Minimal monitor that demonstrates the limitation of injecting one executor:
 * it can only ever report on the single {@code ThreadPoolTaskExecutor} bean
 * that Spring wires into {@link #taskExecutor}.
 */
@Component
public class ThreadPoolMonitor {

    // Only this one injected pool is visible to the monitor.
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /**
     * Prints the active thread count of the injected executor to standard output.
     */
    public void monitor() {
        // Restricted to the specific bean held in taskExecutor.
        System.out.println("活跃线程: " + taskExecutor.getActiveCount());
    }
}
解决方案
方案1:手动注册所有线程池
/**
 * Monitor backed by an explicit registry: every pool that should be reported
 * must be handed to {@link #registerExecutor(String, ThreadPoolTaskExecutor)}.
 */
@Slf4j
@Component
public class ThreadPoolMonitor {

    // Registered executors keyed by the name used in log output.
    private final Map<String, ThreadPoolTaskExecutor> executors = new ConcurrentHashMap<>();

    /**
     * Adds (or replaces) an executor in the registry.
     *
     * @param name     key under which the pool is reported; must not be null
     * @param executor executor to monitor; must not be null
     */
    public void registerExecutor(String name, ThreadPoolTaskExecutor executor) {
        executors.put(name, executor);
    }

    /**
     * Logs one status line per registered executor every 30 seconds.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorAll() {
        executors.forEach((name, executor) -> {
            ThreadPoolExecutor pool;
            try {
                pool = executor.getThreadPoolExecutor();
            } catch (IllegalStateException e) {
                // getThreadPoolExecutor() throws while the wrapper is not initialized;
                // skip this pool so the remaining ones are still reported.
                log.warn("线程池[{}]尚未初始化,跳过本次监控", name);
                return;
            }

            // Read the queue size once so "used" and "capacity" come from the same snapshot.
            int queued = pool.getQueue().size();
            // Sum in long: remainingCapacity() may be Integer.MAX_VALUE for unbounded queues.
            long queueCapacity = (long) queued + pool.getQueue().remainingCapacity();

            log.info("线程池[{}] - 活跃: {}/{}, 队列: {}/{}, 完成: {}",
                    name,
                    pool.getActiveCount(),
                    pool.getPoolSize(),
                    queued,
                    queueCapacity,
                    pool.getCompletedTaskCount());
        });
    }
}
// Register the pools with the monitor from the configuration class.
/**
 * Declares the application's executors and registers each one with the
 * {@code ThreadPoolMonitor} as it is created.
 */
@Configuration
public class ExecutorConfig {

    @Autowired
    private ThreadPoolMonitor monitor;

    /**
     * Creates the e-mail executor and registers it with the monitor.
     *
     * <p>{@code initialize()} is intentionally not called here: Spring invokes
     * {@code afterPropertiesSet()} on the returned bean, which initializes it.
     * Calling it here as well would build a pool that is immediately replaced.
     * The monitor stores the wrapper, so it sees the pool Spring initializes.
     */
    @Bean("emailexecutor")
    public ThreadPoolTaskExecutor emailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // configure core/max size, queue capacity, thread name prefix, ...
        monitor.registerExecutor("emailexecutor", executor);
        return executor;
    }

    /**
     * Creates the SMS executor and registers it with the monitor.
     * Initialization is left to the container, as for the e-mail executor.
     */
    @Bean("smsexecutor")
    public ThreadPoolTaskExecutor smsExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // configure core/max size, queue capacity, thread name prefix, ...
        monitor.registerExecutor("smsexecutor", executor);
        return executor;
    }
}
方案2:自动发现所有线程池(推荐)
/**
 * Monitor that looks up every {@code ThreadPoolTaskExecutor} and
 * {@code ThreadPoolExecutor} bean in the application context on each run,
 * so no manual registration is needed.
 */
@Slf4j
@Component
public class GlobalThreadPoolMonitor {

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Logs a report covering all discovered pools every 30 seconds.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorAllThreadPools() {
        // Spring-wrapped pools.
        Map<String, ThreadPoolTaskExecutor> executors =
                applicationContext.getBeansOfType(ThreadPoolTaskExecutor.class);
        // Pools declared directly as java.util.concurrent.ThreadPoolExecutor beans.
        Map<String, ThreadPoolExecutor> nativeExecutors =
                applicationContext.getBeansOfType(ThreadPoolExecutor.class);

        log.info("=== 线程池监控报告 ===");

        executors.forEach((beanName, executor) -> {
            try {
                printPoolStats(beanName, executor.getThreadPoolExecutor());
            } catch (IllegalStateException e) {
                // getThreadPoolExecutor() never returns null; it throws when the
                // wrapper has not been initialized. Skip it and keep reporting.
                log.warn("线程池[{}]尚未初始化,跳过本次监控", beanName);
            }
        });

        nativeExecutors.forEach(this::printPoolStats);
    }

    /**
     * Logs a single status line for one pool.
     *
     * @param name     bean name shown in the log line
     * @param executor pool to inspect
     */
    private void printPoolStats(String name, ThreadPoolExecutor executor) {
        // Read the queue size once so both queue figures come from the same snapshot.
        int queued = executor.getQueue().size();
        // Sum in long: remainingCapacity() is Integer.MAX_VALUE for queues without a
        // capacity limit (e.g. PriorityBlockingQueue), which would overflow an int.
        long queueCapacity = (long) queued + executor.getQueue().remainingCapacity();

        // Labels match the values: getPoolSize() is the current thread count (not the
        // core size), and the last field is the rejection handler type, not a count.
        log.info("线程池[{}]: 活跃{}/当前{}, 队列{}/{}, 完成任务: {}, 拒绝策略: {}",
                name,
                executor.getActiveCount(),
                executor.getPoolSize(),
                queued,
                queueCapacity,
                executor.getCompletedTaskCount(),
                executor.getRejectedExecutionHandler().getClass().getSimpleName());
    }
}
方案3:监控 @async 使用的线程池
/**
 * Monitor for every {@code Executor} bean in the context, which includes the
 * executors used by {@code @Async} methods when they are declared as beans.
 */
@Slf4j
@Component
public class AsyncThreadPoolMonitor {

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Inspects all {@code Executor} beans every 30 seconds and logs their state.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorAsyncPools() {
        try {
            // Plain bean lookup by type (no reflection involved).
            Map<String, Executor> asyncExecutors =
                    applicationContext.getBeansOfType(Executor.class);

            asyncExecutors.forEach((name, executor) -> {
                try {
                    inspectExecutor(name, executor);
                } catch (RuntimeException e) {
                    // Isolate failures (e.g. an uninitialized ThreadPoolTaskExecutor)
                    // so one bad executor does not hide the others.
                    log.warn("监控异步线程池[{}]失败: {}", name, e.getMessage(), e);
                }
            });
        } catch (Exception e) {
            // Passing the exception as the last argument keeps the stack trace.
            log.warn("监控异步线程池失败: {}", e.getMessage(), e);
        }
    }

    /** Dispatches on the executor's concrete type and logs what can be observed. */
    private void inspectExecutor(String name, Executor executor) {
        if (executor instanceof ThreadPoolTaskExecutor) {
            ThreadPoolExecutor pool = ((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor();
            printAsyncPoolStats(name, pool);
        } else if (executor instanceof ThreadPoolExecutor) {
            printAsyncPoolStats(name, (ThreadPoolExecutor) executor);
        } else if (executor instanceof TaskExecutor) {
            // No pool metrics available; report the type only.
            log.info("executor [{}]: 类型 {}", name, executor.getClass().getSimpleName());
        }
    }

    /**
     * Logs active/maximum threads, usage percentage and queue fill for one pool.
     *
     * @param name bean name shown in the log line
     * @param pool pool to inspect
     */
    private void printAsyncPoolStats(String name, ThreadPoolExecutor pool) {
        // Snapshot each value once so the printed numbers agree with the computed rate.
        int active = pool.getActiveCount();
        int max = pool.getMaximumPoolSize();
        int queued = pool.getQueue().size();
        // Sum in long: remainingCapacity() may be Integer.MAX_VALUE for unbounded queues.
        long queueCapacity = (long) queued + pool.getQueue().remainingCapacity();

        double usageRate = max > 0 ? (double) active / max * 100 : 0;

        // SLF4J only substitutes "{}"; a "{:.1f}" token is printed literally and shifts
        // every following argument, so the number is formatted before logging.
        String usage = String.format(java.util.Locale.ROOT, "%.1f", usageRate);

        log.warn("异步线程池[{}]: 活跃{}/最大{}, 使用率: {}%, 队列: {}/{}",
                name,
                active,
                max,
                usage,
                queued,
                queueCapacity);
    }
}
方案4:集成micrometer监控(生产环境推荐)
/**
 * Publishes thread-pool gauges to a Micrometer {@code MeterRegistry}. Pools can be
 * registered manually, and all executor beans are registered automatically once
 * the application is ready.
 */
@Slf4j
@Component
public class MicrometerThreadPoolMonitor {

    private final MeterRegistry meterRegistry;
    // Pools that already have gauges registered.
    private final List<ThreadPoolExecutor> monitoredPools = new ArrayList<>();

    public MicrometerThreadPoolMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Registers active-count, queue-size and completed-task gauges for a pool,
     * each tagged with {@code pool=<name>}.
     *
     * @param name value of the {@code pool} tag
     * @param pool pool whose state backs the gauges
     */
    public void registerPool(String name, ThreadPoolExecutor pool) {
        // Track each pool instance once so the reported count stays accurate if the
        // same pool is registered again (e.g. the ready event is delivered twice).
        if (!monitoredPools.contains(pool)) {
            monitoredPools.add(pool);
        }

        Gauge.builder("thread.pool.active.count", pool, ThreadPoolExecutor::getActiveCount)
                .tag("pool", name)
                .description("活跃线程数")
                .register(meterRegistry);

        Gauge.builder("thread.pool.queue.size", pool, p -> p.getQueue().size())
                .tag("pool", name)
                .description("队列大小")
                .register(meterRegistry);

        Gauge.builder("thread.pool.completed.tasks", pool, ThreadPoolExecutor::getCompletedTaskCount)
                .tag("pool", name)
                .description("完成任务数")
                .register(meterRegistry);
    }

    /**
     * Discovers every {@code ThreadPoolTaskExecutor} and {@code ThreadPoolExecutor}
     * bean after startup and registers gauges for it.
     *
     * @param event the ready event, used to reach the application context
     */
    @EventListener
    public void onApplicationReady(ApplicationReadyEvent event) {
        ApplicationContext context = event.getApplicationContext();

        Map<String, ThreadPoolTaskExecutor> springExecutors =
                context.getBeansOfType(ThreadPoolTaskExecutor.class);
        Map<String, ThreadPoolExecutor> nativeExecutors =
                context.getBeansOfType(ThreadPoolExecutor.class);

        springExecutors.forEach((name, executor) -> {
            try {
                registerPool("spring-" + name, executor.getThreadPoolExecutor());
            } catch (IllegalStateException e) {
                // getThreadPoolExecutor() throws (it never returns null) when the
                // wrapper has not been initialized; skip it rather than fail the listener.
                log.warn("线程池[{}]尚未初始化,未注册监控指标", name);
            }
        });

        nativeExecutors.forEach((name, executor) -> registerPool("native-" + name, executor));

        log.info("已注册监控的线程池数量: {}", monitoredPools.size());
    }
}
完整的生产级监控方案
/**
 * Provides a {@code GlobalThreadPoolMonitor} bean unless one is already defined.
 *
 * <p>NOTE(review): {@code GlobalThreadPoolMonitor} is itself annotated with
 * {@code @Component}, so when it is picked up by component scanning this factory
 * method is expected to be skipped and is redundant. Whether the condition sees
 * the scanned bean depends on bean-definition order; confirm, and keep only one
 * of the two registration styles.
 */
@Configuration
public class ThreadPoolMonitorConfig {

    @Bean
    @ConditionalOnMissingBean
    public GlobalThreadPoolMonitor globalThreadPoolMonitor() {
        return new GlobalThreadPoolMonitor();
    }
}
@component
@slf4j
public class globalthreadpoolmonitor {
@autowired
private applicationcontext applicationcontext;
private final map<string, threadpoolexecutor> allpools = new concurrenthashmap<>();
@postconstruct
public void init() {
discoverallthreadpools();
}
@scheduled(fixedrate = 30000)
public void monitorallpools() {
if (allpools.isempty()) {
discoverallthreadpools();
}
log.info("======= 线程池监控报告 =======");
allpools.foreach(this::logpoolstatus);
log.info("======= 监控报告结束 =======");
}
private void discoverallthreadpools() {
// 发现spring封装的线程池
applicationcontext.getbeansoftype(threadpooltaskexecutor.class)
.foreach((name, executor) -> {
if (executor.getthreadpoolexecutor() != null) {
allpools.put("spring-" + name, executor.getthreadpoolexecutor());
}
});
// 发现原生线程池
applicationcontext.getbeansoftype(threadpoolexecutor.class)
.foreach((name, executor) -> {
allpools.put("native-" + name, executor);
});
// 发现所有executor(包括@async使用的)
applicationcontext.getbeansoftype(executor.class)
.foreach((name, executor) -> {
if (executor instanceof threadpooltaskexecutor) {
threadpoolexecutor pool = ((threadpooltaskexecutor) executor).getthreadpoolexecutor();
allpools.putifabsent("executor-" + name, pool);
} else if (executor instanceof threadpoolexecutor) {
allpools.putifabsent("executor-" + name, (threadpoolexecutor) executor);
}
});
log.info("发现线程池数量: {}", allpools.size());
}
private void logpoolstatus(string name, threadpoolexecutor pool) {
int activecount = pool.getactivecount();
int poolsize = pool.getpoolsize();
int queuesize = pool.getqueue().size();
int queuecapacity = queuesize + pool.getqueue().remainingcapacity();
long completedtasks = pool.getcompletedtaskcount();
string status = (activecount == 0) ? "空闲" : "忙碌";
double usagerate = pool.getmaximumpoolsize() > 0 ?
(double) activecount / pool.getmaximumpoolsize() * 100 : 0;
if (usagerate > 80) {
log.warn("🚨 线程池[{}]: {} 活跃{}/最大{} (使用率{:.1f}%), 队列{}/{}",
name, status, activecount, pool.getmaximumpoolsize(), usagerate,
queuesize, queuecapacity);
} else {
log.info("线程池[{}]: {} 活跃{}/核心{}, 队列{}/{}, 完成: {}",
name, status, activecount, poolsize, queuesize, queuecapacity, completedtasks);
}
}
// 获取特定线程池状态
public threadpoolstats getpoolstats(string poolname) {
threadpoolexecutor pool = allpools.get(poolname);
if (pool != null) {
return new threadpoolstats(
pool.getactivecount(),
pool.getpoolsize(),
pool.getqueue().size(),
pool.getqueue().remainingcapacity(),
pool.getcompletedtaskcount()
);
}
return null;
}
// 统计类
@data
@allargsconstructor
public static class threadpoolstats {
private int activecount;
private int poolsize;
private int queuesize;
private int queueremainingcapacity;
private long completedtaskcount;
}
}
总结
- 默认情况下,ThreadPoolMonitor 只能监控直接注入的特定线程池
- 需要特殊处理才能监控所有线程池:
  - 自动发现所有 ThreadPoolTaskExecutor 和 ThreadPoolExecutor Bean
  - 通过注册机制手动管理
  - 集成监控框架,如 Micrometer
- 生产推荐:使用方案2或方案4的自动发现机制
关键是要在应用启动后自动发现所有线程池实例,而不是依赖单个注入。
到此这篇关于 SpringBoot 监控所有线程池的四种解决方案及代码案例的文章就介绍到这了,更多相关 SpringBoot 监控所有线程池内容请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!
发表评论