当前位置: 代码网 > it编程>编程语言>Java > SpringBoot监控所有线程池的四种解决方案及代码案例

SpringBoot监控所有线程池的四种解决方案及代码案例

2025年12月01日 Java 我要评论
问题分析1.默认监控的局限性@componentpublic class threadpoolmonitor { @autowired private threadpooltask

问题分析

1.默认监控的局限性

@component
public class threadpoolmonitor {
    
    @autowired
    private threadpooltaskexecutor taskexecutor; // 只能监控这一个线程池
    
    public void monitor() {
        // 只能监控 taskexecutor 这个特定的bean
        system.out.println("活跃线程: " + taskexecutor.getactivecount());
    }
}

解决方案

方案1:手动注册所有线程池

@component
public class threadpoolmonitor {
    
    private final map<string, threadpooltaskexecutor> executors = new concurrenthashmap<>();
    
    // 手动注册线程池
    public void registerexecutor(string name, threadpooltaskexecutor executor) {
        executors.put(name, executor);
    }
    
    @scheduled(fixedrate = 30000)
    public void monitorall() {
        executors.foreach((name, executor) -> {
            threadpoolexecutor pool = executor.getthreadpoolexecutor();
            log.info("线程池[{}] - 活跃: {}/{}, 队列: {}/{}, 完成: {}",
                name,
                pool.getactivecount(),
                pool.getpoolsize(),
                pool.getqueue().size(),
                pool.getqueue().remainingcapacity() + pool.getqueue().size(),
                pool.getcompletedtaskcount());
        });
    }
}

// 在配置中注册
@configuration
public class executorconfig {
    
    @autowired
    private threadpoolmonitor monitor;
    
    @bean("emailexecutor")
    public threadpooltaskexecutor emailexecutor() {
        threadpooltaskexecutor executor = new threadpooltaskexecutor();
        // 配置...
        executor.initialize();
        
        // 注册到监控器
        monitor.registerexecutor("emailexecutor", executor);
        return executor;
    }
    
    @bean("smsexecutor")
    public threadpooltaskexecutor smsexecutor() {
        threadpooltaskexecutor executor = new threadpooltaskexecutor();
        // 配置...
        executor.initialize();
        
        monitor.registerexecutor("smsexecutor", executor);
        return executor;
    }
}

方案2:自动发现所有线程池(推荐)

@component
public class globalthreadpoolmonitor {
    
    @autowired
    private applicationcontext applicationcontext;
    
    @scheduled(fixedrate = 30000)
    public void monitorallthreadpools() {
        // 获取所有 threadpooltaskexecutor 类型的bean
        map<string, threadpooltaskexecutor> executors = 
            applicationcontext.getbeansoftype(threadpooltaskexecutor.class);
        
        // 获取所有 threadpoolexecutor 类型的bean(直接创建的)
        map<string, threadpoolexecutor> nativeexecutors = 
            applicationcontext.getbeansoftype(threadpoolexecutor.class);
        
        log.info("=== 线程池监控报告 ===");
        
        // 监控 spring 封装的线程池
        executors.foreach((beanname, executor) -> {
            if (executor.getthreadpoolexecutor() != null) {
                printpoolstats(beanname, executor.getthreadpoolexecutor());
            }
        });
        
        // 监控原生线程池
        nativeexecutors.foreach((beanname, executor) -> {
            printpoolstats(beanname, executor);
        });
    }
    
    private void printpoolstats(string name, threadpoolexecutor executor) {
        log.info("线程池[{}]: 活跃{}/核心{}, 队列{}/{}, 完成任务: {}, 拒绝: {}",
            name,
            executor.getactivecount(),
            executor.getpoolsize(),
            executor.getqueue().size(),
            executor.getqueue().size() + executor.getqueue().remainingcapacity(),
            executor.getcompletedtaskcount(),
            executor.getrejectedexecutionhandler().getclass().getsimplename());
    }
}

方案3:监控 @async 使用的线程池

@component
public class asyncthreadpoolmonitor {
    
    @autowired
    private applicationcontext applicationcontext;
    
    @scheduled(fixedrate = 30000)
    public void monitorasyncpools() {
        try {
            // 通过反射获取spring内部的线程池
            map<string, executor> asyncexecutors = 
                applicationcontext.getbeansoftype(executor.class);
            
            asyncexecutors.foreach((name, executor) -> {
                if (executor instanceof threadpooltaskexecutor) {
                    threadpoolexecutor pool = ((threadpooltaskexecutor) executor).getthreadpoolexecutor();
                    printasyncpoolstats(name, pool);
                } else if (executor instanceof threadpoolexecutor) {
                    printasyncpoolstats(name, (threadpoolexecutor) executor);
                } else if (executor instanceof taskexecutor) {
                    log.info("executor [{}]: 类型 {}", name, executor.getclass().getsimplename());
                }
            });
        } catch (exception e) {
            log.warn("监控异步线程池失败: {}", e.getmessage());
        }
    }
    
    private void printasyncpoolstats(string name, threadpoolexecutor pool) {
        double usagerate = pool.getmaximumpoolsize() > 0 ? 
            (double) pool.getactivecount() / pool.getmaximumpoolsize() * 100 : 0;
            
        log.warn("异步线程池[{}]: 活跃{}/最大{}, 使用率: {:.1f}%, 队列: {}/{}",
            name,
            pool.getactivecount(),
            pool.getmaximumpoolsize(),
            usagerate,
            pool.getqueue().size(),
            pool.getqueue().size() + pool.getqueue().remainingcapacity());
    }
}

方案4:集成micrometer监控(生产环境推荐)

@component
public class micrometerthreadpoolmonitor {
    
    private final meterregistry meterregistry;
    private final list<threadpoolexecutor> monitoredpools = new arraylist<>();
    
    public micrometerthreadpoolmonitor(meterregistry meterregistry) {
        this.meterregistry = meterregistry;
    }
    
    // 注册要监控的线程池
    public void registerpool(string name, threadpoolexecutor pool) {
        monitoredpools.add(pool);
        
        // 注册指标
        gauge.builder("thread.pool.active.count", pool, threadpoolexecutor::getactivecount)
            .tag("pool", name)
            .description("活跃线程数")
            .register(meterregistry);
            
        gauge.builder("thread.pool.queue.size", pool, p -> p.getqueue().size())
            .tag("pool", name)
            .description("队列大小")
            .register(meterregistry);
            
        gauge.builder("thread.pool.completed.tasks", pool, threadpoolexecutor::getcompletedtaskcount)
            .tag("pool", name)
            .description("完成任务数")
            .register(meterregistry);
    }
    
    @eventlistener
    public void onapplicationready(applicationreadyevent event) {
        // 应用启动后自动发现并注册所有线程池
        applicationcontext context = event.getapplicationcontext();
        
        map<string, threadpooltaskexecutor> springexecutors = 
            context.getbeansoftype(threadpooltaskexecutor.class);
        
        map<string, threadpoolexecutor> nativeexecutors = 
            context.getbeansoftype(threadpoolexecutor.class);
        
        springexecutors.foreach((name, executor) -> {
            if (executor.getthreadpoolexecutor() != null) {
                registerpool("spring-" + name, executor.getthreadpoolexecutor());
            }
        });
        
        nativeexecutors.foreach((name, executor) -> {
            registerpool("native-" + name, executor);
        });
        
        log.info("已注册监控的线程池数量: {}", monitoredpools.size());
    }
}

完整的生产级监控方案

@configuration
public class threadpoolmonitorconfig {
    
    @bean
    @conditionalonmissingbean
    public globalthreadpoolmonitor globalthreadpoolmonitor() {
        return new globalthreadpoolmonitor();
    }
}

@component
@slf4j
public class globalthreadpoolmonitor {
    
    @autowired
    private applicationcontext applicationcontext;
    
    private final map<string, threadpoolexecutor> allpools = new concurrenthashmap<>();
    
    @postconstruct
    public void init() {
        discoverallthreadpools();
    }
    
    @scheduled(fixedrate = 30000)
    public void monitorallpools() {
        if (allpools.isempty()) {
            discoverallthreadpools();
        }
        
        log.info("======= 线程池监控报告 =======");
        allpools.foreach(this::logpoolstatus);
        log.info("======= 监控报告结束 =======");
    }
    
    private void discoverallthreadpools() {
        // 发现spring封装的线程池
        applicationcontext.getbeansoftype(threadpooltaskexecutor.class)
            .foreach((name, executor) -> {
                if (executor.getthreadpoolexecutor() != null) {
                    allpools.put("spring-" + name, executor.getthreadpoolexecutor());
                }
            });
        
        // 发现原生线程池
        applicationcontext.getbeansoftype(threadpoolexecutor.class)
            .foreach((name, executor) -> {
                allpools.put("native-" + name, executor);
            });
        
        // 发现所有executor(包括@async使用的)
        applicationcontext.getbeansoftype(executor.class)
            .foreach((name, executor) -> {
                if (executor instanceof threadpooltaskexecutor) {
                    threadpoolexecutor pool = ((threadpooltaskexecutor) executor).getthreadpoolexecutor();
                    allpools.putifabsent("executor-" + name, pool);
                } else if (executor instanceof threadpoolexecutor) {
                    allpools.putifabsent("executor-" + name, (threadpoolexecutor) executor);
                }
            });
        
        log.info("发现线程池数量: {}", allpools.size());
    }
    
    private void logpoolstatus(string name, threadpoolexecutor pool) {
        int activecount = pool.getactivecount();
        int poolsize = pool.getpoolsize();
        int queuesize = pool.getqueue().size();
        int queuecapacity = queuesize + pool.getqueue().remainingcapacity();
        long completedtasks = pool.getcompletedtaskcount();
        
        string status = (activecount == 0) ? "空闲" : "忙碌";
        double usagerate = pool.getmaximumpoolsize() > 0 ? 
            (double) activecount / pool.getmaximumpoolsize() * 100 : 0;
            
        if (usagerate > 80) {
            log.warn("🚨 线程池[{}]: {} 活跃{}/最大{} (使用率{:.1f}%), 队列{}/{}", 
                name, status, activecount, pool.getmaximumpoolsize(), usagerate, 
                queuesize, queuecapacity);
        } else {
            log.info("线程池[{}]: {} 活跃{}/核心{}, 队列{}/{}, 完成: {}", 
                name, status, activecount, poolsize, queuesize, queuecapacity, completedtasks);
        }
    }
    
    // 获取特定线程池状态
    public threadpoolstats getpoolstats(string poolname) {
        threadpoolexecutor pool = allpools.get(poolname);
        if (pool != null) {
            return new threadpoolstats(
                pool.getactivecount(),
                pool.getpoolsize(),
                pool.getqueue().size(),
                pool.getqueue().remainingcapacity(),
                pool.getcompletedtaskcount()
            );
        }
        return null;
    }
    
    // 统计类
    @data
    @allargsconstructor
    public static class threadpoolstats {
        private int activecount;
        private int poolsize;
        private int queuesize;
        private int queueremainingcapacity;
        private long completedtaskcount;
    }
}

总结

  • 默认情况下,threadpoolmonitor 只能监控直接注入的特定线程池
  • 需要特殊处理才能监控所有线程池:
    • 自动发现所有 threadpooltaskexecutorthreadpoolexecutor bean
    • 注册机制手动管理
    • 集成监控框架如 micrometer
  • 生产推荐:使用方案2或方案4的自动发现机制

关键是要在应用启动后自动发现所有线程池实例,而不是依赖单个注入。

到此这篇关于springboot监控所有线程池的四种解决方案及代码案例的文章就介绍到这了,更多相关springboot监控所有线程池内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com