当前位置: 代码网 > it编程>编程语言>Java > 线程池ThreadPoolExecutor应用过程

线程池ThreadPoolExecutor应用过程

2025年12月11日 Java 我要评论
一个老生常谈的话题:线程的创建及销毁是非常消耗时间及资源的,所以线程应该交由线程池去执行。threadpoolexecutor构造说明及常用方法threadpoolexecutor提供了很多构造方法,

一个老生常谈的话题:线程的创建及销毁是非常消耗时间及资源的,所以线程应该交由线程池去执行。

threadpoolexecutor构造说明及常用方法

threadpoolexecutor提供了很多构造方法,这里主要说以下这个,其他构造方法都是基于此方法:

public threadpoolexecutor(int corepoolsize,
                              int maximumpoolsize,
                              long keepalivetime,
                              timeunit unit,
                              blockingqueue<runnable> workqueue,
                              threadfactory threadfactory,
                              rejectedexecutionhandler handler)
  • 参数int corepoolsize:核心线程池大小,不能小于0;
  • 参数int maximumpoolsize:最大线程池大小(核心线程数+临时线程数),不能小于等于0且不能小于corepoolsize参数,线程池的大小会在corepoolsize与maximumpoolsize之间动态变化;
  • 参数long keepalivetime:临时线程(非核心线程,核心线程空闲之后不会被销毁)最大空闲时间,不能小于0;
  • 参数timeunit unit:keepalivetime参数的时间单位;
  • 参数blockingqueue<runnable> workqueue:线程等待队列;
  • 参数threadfactory threadfactory:线程创建工厂;
  • 参数rejectedexecutionhandler handler:拒绝策略;

下面是构造threadpoolexecutor的参数校验:

其他常用方法:

  • int prestartallcorethreads():根据线程核心数,启动所有的核心线程,让它们全部处于空闲状态等待工作
  • boolean prestartcorethread():随机启动一个核心线程,当所有核心线程都已经启动的时候,返回false(表示未启动到有效资源);
  • void allowcorethreadtimeout(boolean value):设置是否允许临时线程永久空闲存活,设置true后,keepalivetime参数失效;
  • boolean allowscorethreadtimeout():获取是否允许临时线程永久空闲存活;
  • void shutdown():销毁线程池,此前还存在于线程池的任务依然会被执行;
  • list<runnable> shutdownnow():销毁线程池,与shutdown()的区别是,未执行完成的线程会被标记为中断;

参数举例说明:

1、当创建一个任务并交给线程池执行执行后,会从消耗一个核心线程资源,当该任务执行完毕之后,会变成一个空闲线程,另外,如果核心线程数足以支撑任务运行,即使有核心空闲线程,依然会创建一个新的核心线程资源;

2、当加入线程池的任务超过核心线程数,会进入workqueue中等待执行;

3、当加入线程池的任务超过核心线程数,溢出的任务数如果超过等待队列长度,会直接创建大于corepoolsize且小于maximumpoolsize的线程资源用于执行任务,超过核心线程数的部分称为临时线程;

4、keepalivetime:当线程池被扩大到corepoolsize与maximumpoolsize之间之后,当执行完所有任务后,不再有新的任务进来,超出corepoolsize的部分会作为空闲线程,会在keepalivetime指定时间后,线程池大小缩小为corepoolsize,如果设置allowcorethreadtimeout(true),则空闲线程一直不会销毁;

5、blockingqueue<runnable> workqueue:用于保存等待执行的任务的阻塞队列接口,java提供了以下实现类:

线程池中,blockingqueue是典型的"生产者消费者"模型:生产者调用插入元素的方法(add/offer/put等方法),消费者调用移除元素的方法(remove/poll/take等方法),其中,生产者插入和消费者移除都有阻塞和非阻塞的方法,offer/poll是非阻塞方法,put/take是阻塞方法(队列后面单独再写);

threadpoolexecutor提交任务采用的是不阻塞的offer方法,从队列中获取任务采用的是阻塞的take方法,正是因为threadpoolexecutor使用了不阻塞的offer方法,所以当队列容量已满,线程池会去创建新的临时线程,去处理队列中溢出的任务。

  • arrayblockingqueue:是一个基于数组结构的有界阻塞队列,此队列按 fifo(先进先出)原则对元素进行排序,允许迭代中修改队列(删除元素时会更新迭代器),当然,一般也不会有场景需要迭代阻塞队列;
  • linkedblockingqueue:一个基于链表结构的默认无界双端阻塞队列,双端意味着可以像普通队列一样fifo(先进先出),可以以像栈一样filo(先进后出)。吞吐量通常要高于arrayblockingqueue(如无需在迭代中修改元素,优于arrayblockingqueue,同时记得指定队列长度,提供了一个可选有界的构造函数,而在未指明容量时,容量默认为integer.max_value);
  • synchronousqueue:一个不存储元素的阻塞队列,所谓不存储元素,正如名字描述,同步队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,即生产一个,就消费一个,因此它也被认为是无界的队列,吞吐量通常要高于linkedblockingqueue。同时,synchronousqueue不能遍历,因为它没有元素可以遍历;
  • delayqueue:使用二叉堆实现的优先级阻塞队列,通过执行时延从队列中提取任务,时间没到任务取不出来,可用于实现有延时场景的需求;
  • priorityblockingqueue:使用二叉堆实现的优先级阻塞队列,可以实现comparable接口也可以提供comparator来对队列中的元素进行比较,跟时间没有任何关系,仅仅是按照优先级取任务,如果有根据某个规则优先取出任务的场景,适用;

综上,需要根据每个队列的特点选择适合自己场景的队列。

6、rejectedexecutionhandler handler:当队列和线程池都满了(从上面可以看出,一个线程池可以容纳的的最大任务数是maximumpoolsize+队列长度,因此使用无界队列会造成队列永不会满的情况,一旦控制不好就容易出现oom),说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。

1、abortpolicy:直接抛出异常;

2、callerrunspolicy:只用调用者所在线程来运行任务;

3、discardoldestpolicy:丢弃队列里最近的一个任务,并执行当前任务;

4、discardpolicy:不处理,丢弃掉;

5、也可以根据应用场景需要来实现rejectedexecutionhandler接口自定义策略。如记录日志或持久化不能处理的任务;

注:abortpolicy/callerrunspolicy/discardoldestpolicy/discardpolicy都是threadpoolexecutor的内部类,如下:

所以,实例化需要用内部类的方式,如下:

abortpolicy abortpolicy = new threadpoolexecutor.abortpolicy();

自定义rejectedexecutionhandler拒绝策略示例:

public class rejectedexecutionhandlertest implements rejectedexecutionhandler {

	@override
	public void rejectedexecution(runnable r, threadpoolexecutor executor) {
		// 啥逻辑都没有,表示忽略,比如discardpolicy
		try {
			executor.getqueue().put(r);
		} catch (interruptedexception e) {
			// 	todo 记录错误日志等
			e.printstacktrace();
		}
		
	}
}

threadpoolexecutor的特点总结起来就是以下4点

1、当有任务提交的时候,会创建核心线程去执行任务(即使有核心线程空闲);

2、当核心线程数达到corepoolsize时,后续提交的都会进blockingqueue中排队;

3、当blockingqueue满了(offer失败),就会创建临时线程(临时线程空闲超过一定时间后,会被销毁),其中临时线程最大数量=maximumpoolsize - corepoolsize,空闲最大时间由keepalivetime控制,如果设置allowcorethreadtimeout(true),临时线程永不销毁;

4、当线程总数达到maximumpoolsize 时,后续提交的任务都会被rejectedexecutionhandler拒绝。

为什么强制要求使用threadpoolexecutor创建线程池

在executor类中,提供了以下场景的创建线程池的方法:

1、newcachedthreadpool,特点:必要时创建新线程,空闲线程会保留60s;

public static executorservice newcachedthreadpool(threadfactory threadfactory) {
        return new threadpoolexecutor(0, integer.max_value,
                                      60l, timeunit.seconds,
                                      new synchronousqueue<runnable>(),
                                      threadfactory);
    }

根据上面的threadpoolexecutor,允许的创建线程数量为 integer.max_value,可能会创建大量的线程,从而导致 oom,还无法指定拒绝策略。

2、newfixedthreadpool,特点:包含固定的线程数,空闲线程会被一直保留;

public static executorservice newfixedthreadpool(int nthreads, threadfactory threadfactory) {
        return new threadpoolexecutor(nthreads, nthreads,
                                      0l, timeunit.milliseconds,
                                      new linkedblockingqueue<runnable>(),
                                      threadfactory);
    }


public linkedblockingqueue() {
        this(integer.max_value);
    }

允许的请求队列长度为 integer.max_value,可能会堆积大量的请求,从而导致 oom,还无法指定拒绝策略。

3、newsinglethreadexecutor,特点:只有一个线程的池,顺序执行每一个提交的任务;

public static executorservice newsinglethreadexecutor(threadfactory threadfactory) {
        return new finalizabledelegatedexecutorservice
            (new threadpoolexecutor(1, 1,
                                    0l, timeunit.milliseconds,
                                    new linkedblockingqueue<runnable>(),
                                    threadfactory));
    }



public linkedblockingqueue() {
        this(integer.max_value);
    }

允许的请求队列长度为 integer.max_value,可能会堆积大量的请求,从而导致 oom,还无法指定拒绝策略。

4、scheduledthreadpoolexecutor,特点:用于构建具有延时队列的线程池,空闲线程一直被保留;

public scheduledthreadpoolexecutor(int corepoolsize) {
    super(corepoolsize, integer.max_value, 0, nanoseconds,
              new delayedworkqueue());
}

允许的创建线程数量为 integer.max_value,可能会创建大量的线程,从而导致 oom。

............省略executors其他创建线程的方法。

从上面可以看到,使用executors创建的线程池,虽然都是threadpoolexecutor,但是灵活度都不高,且创建出来的是具有一定特色的线程池,使用threadpoolexecutor类本身去创建线程池,除了可以更加明确线程池的运行规则,还能更多的规避资源耗尽的风险,毕竟我命由我不由天。

当然,threadpoolexecutor提供了相关参数的set方法,如果一定要用executors去创建,那么记得调整一下相关参数,避免大量任务堆积,产生oom。

在项目中创建线程池

如果需要在项目中创建线程池,那么应该将它设置成单例模式。

示例,用枚举的方式创建单例:

// 线程工厂
public class userthreadfactory implements threadfactory {

	// 线程组命名标识
	private final string nameprefix;
	// 线程编号
	private final atomicinteger nextid = new atomicinteger(1);

	// 定义线程组名称,在 jstack 问题排查时,非常有帮助
	userthreadfactory(string whatfeaturofgroup) {
		nameprefix = "from userthreadfactory's " + whatfeaturofgroup + "-worker-";
	}

	@override
	public thread newthread(runnable task) {
		string name = nameprefix + nextid.getandincrement();
		thread thread = new thread(task, name);
		return thread;
	}

}


// 单例线程池
public enum threadpoolenum {

	instance;
	
	private threadpoolexecutor threadpoolexecutor;

	// 枚举的特性,在jvm中只会被实例化一次
	private threadpoolenum() {
		threadpoolexecutor = new threadpoolexecutor(2, 
				4, 10, timeunit.seconds, new priorityblockingqueue<runnable>(), 
				new userthreadfactory("first-threadpool"),
				new threadpoolexecutor.discardpolicy());
	}
	
	public threadpoolexecutor getinstance() {
        return threadpoolexecutor;
    }
	
	public static void main(string[] args) throws interruptedexception {
		threadpoolexecutor executor1 = threadpoolenum.instance.getinstance();
		threadpoolexecutor executor2 = threadpoolenum.instance.getinstance();
		system.out.println(executor1 == executor2);
		runnable r = () -> {
			system.out.println("新创建的线程任务,交由线程池执行");
			system.out.println(thread.currentthread().getname());
		};
		executor1.execute(r);
		// 销毁线程池
		executor1.shutdown();
	}
	
}

执行结果:

在spring中创建线程池

@configuration
public class threadpoolconfig {

	// 线程池的参数配置可由外部配置引入
	@bean(destroymethod = "shutdown")
	public threadpoolexecutor initthreadpoolexecutor() {
		return new threadpoolexecutor(2, 
				4, 10, timeunit.seconds, new priorityblockingqueue<runnable>(), 
				new threadpoolexecutor.discardpolicy());
	}
	
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com