1.调度服务
1.1.jdk之scheduledexecutorservice
讲到调度任务,我们脑海里马上会想到scheduledexecutorservice。
scheduledexecutorservice是 java java.util.concurrent 包中的一个接口,它继承自 executorservice 接口。它主要用于在给定的延迟后运行任务,或者定期地执行任务。这个接口提供了几种安排任务执行的方法,包括单次执行、定期执行和周期性执行。
以下是 scheduledexecutorservice 提供的一些关键方法:
- schedule(callable<v> callable, long delay, timeunit unit): 安排所提交的
callable
任务在指定的延迟后运行,返回一个future
,代表任务的结果。 - schedule(runnable command, long delay, timeunit unit): 安排所提交的
runnable
任务在指定的延迟后运行。 - scheduleatfixedrate(runnable command, long initialdelay, long period, timeunit unit): 安排所提交的
runnable
任务在指定的初始延迟后首次启动,并且随后按指定的周期重复执行。 - schedulewithfixeddelay(runnable command, long initialdelay, long delay, timeunit unit): 安排所提交的
runnable
任务在指定的初始延迟后首次启动,并且随后在每次执行结束和下次执行开始之间都存在指定的延迟。
然而,如果采用了sprintboot,我们也可以直接采用springboot提供的调试线程池。这其中有一个最大的优势在于,可以利用spring的cron表达式,采用方法注解的方式,不用引入quartz第三方工具。
1.2.springboot使用调度线程池
我们尝试从源代码的角度,来看看springboot提供的调度线程池是怎么创建的。
首先,启用调度配置,启动类加上@enablescheduling
springboot的自动配置类一般以autoconfiguration作为后缀,采用模糊搜索scheduleautoconfiguration可以找到目标taskschedulingautoconfiguration。
从截图我们可以看出,我们只需在application.yml加入以下的配置就可以启动了
spring: task: ## 定时任务(业务上定时任务量不多,2个足矣) scheduling: ## 线程池核心线程数量 pool: size: 2 ## 线程名称前线 threadnameprefix: common-scheduling-
spring对线程池进行二次封闭,最终调用的还是jdk的threadpoolexecutor类,我们在该类的构造函数打个断点,可以看到,pool.size参数已经被传参:
1.3.threadpoolexecutor核心参数与执行流程
这里有必要先介绍下 threadpoolexecutor类的几个函数参数及基本运行机制
构造函数参数:
- 核心线程数(core pool size): 线程池中始终保持的线程数量,即使它们处于空闲状态。如果任务数量少于核心线程数,线程池会创建新的线程来处理任务,而不会立即回收这些线程。
- 最大线程数(maximum pool size): 线程池中允许的最大线程数量。如果任务数量超过了核心线程数但小于最大线程数,且工作队列已满,线程池会创建新的线程来处理任务,直到达到最大线程数。
- 工作队列(work queue): 用于存放待执行任务的阻塞队列。当所有核心线程都在忙碌时,新的任务会被放入工作队列中等待执行。
- 线程工厂(thread factory): 用于创建新线程的工厂。它提供了一种方式来定制线程的创建过程,例如设置线程的名称、优先级、是否为守护线程等。
- 拒绝策略(rejected execution handler): 当任务无法被线程池及时处理时(即当线程池已满,且工作队列已满),线程池会采用拒绝策略来处理新提交的任务。常见的拒绝策略包括:
abortpolicy
:抛出rejectedexecutionexception
。callerrunspolicy
:由调用者线程运行该任务。discardpolicy
:静默丢弃任务。discardoldestpolicy
:丢弃队列中最老的任务,然后尝试再次提交当前任务。- 保持活动时间(keep-alive time): 非核心线程空闲时在终止前等待新任务的最长时间。如果线程池允许核心线程空闲,这个参数也适用于核心线程。
- 时间单位(time unit): 与保持活动时间配合使用的时间单位,例如
timeunit.seconds
。
执行流程:
- 如果线程池中的线程数量少于核心线程数,即使有空闲线程,线程池也会优先创建新线程来执行新的任务。
- 如果线程池中的线程数量达到核心线程数,新的任务会被放入工作队列等待执行。
- 如果工作队列已满且线程数量少于最大线程数,线程池会创建新的非核心线程来执行任务。
- 如果工作队列已满且线程数量达到最大线程数,新的任务会被拒绝,线程池会采用拒绝策略来处理。
1.4.自定义线程池
由于spingboot对调度任务线程池的参数支持有限,如果想定制自己的参数,可以注入自己的调度线程池,从代码可看出:
@configuration public class threadpoolconfig { @bean public scheduledexecutorservice scheduledexecutorservice() { return new scheduledthreadpoolexecutor(2, new namedthreadfactory("common-schedule"), new threadpoolexecutor.callerrunspolicy()) { @override protected void afterexecute(runnable r, throwable t) { super.afterexecute(r, t); } }; } }
1.5.spring cron注解
cron 表达式是一种用于描述定时任务触发时间的字符串表达式。它由多个时间字段组成,每个字段代表定时任务在特定时间单位上的触发条件。
1.5.1.cron表达式语法格式
秒 分 时 日 月 星期 年份
其中,每个时间字段都有对应的取值范围和特殊符号。下面是每个时间字段的详细说明:
1、秒(seconds):取值范围为 0~59。例如,`0/5` 表示从0秒开始,每隔 5 秒触发一次,`*` 表示每秒都触发。
2、分钟(minutes):取值范围为 0~59。例如,`0/5` 表示从0分钟开始,每隔 5 分钟触发一次,`*` 表示每分钟都触发。
3、小时(hours):取值范围为 0~23。例如,`0/2` 表示从0小时开始,每隔 2 小时触发一次,`*` 表示每小时都触发。
4、日期(day of month):取值范围为 1~31。例如,`1,15` 表示每月的 1 日和 15 日触发,`*` 表示每天都触发。
5、月份(month):取值范围为 1~12,也可以使用英文缩写 jan、feb、mar 等。例如,`1,6` 表示一月和六月触发,`*` 表示每个月都触发。
6、 星期(day of week):取值范围为 1~7,1 表示星期日,2 表示星期一,以此类推,也可以使用英文缩写 sun、mon、tue 等。例如,`2-6` 表示星期一到星期五触发,`*` 表示每个星期都触发。
7、年份(year):可选字段,表示触发条件的年份。例如,`2023` 表示在 2023 年触发,`*` 表示每年都触发。
除了取值范围,cron 表达式还支持一些特殊符号,用于指定特定的触发条件,例如:
- - 星号(*):代表所有可能的取值,表示不限制该时间字段的取值范围。
- - 问号(?):仅在日期和星期字段中使用,表示不指定具体的取值,可以任意匹配。
- - 斜线(/):表示间隔触发,例如在分钟字段中,"*/5" 表示每隔 5 分钟触发一次。
- - 逗号(,):用于指定多个取值,例如在小时字段中,"1,3,5" 表示在第 1、3、5 小时触发。
- - 减号(-):用于指定一个范围,例如在月份字段中,"3-6" 表示三月到六月触发。
- l : 表示最后,只能出现在星期和每月第几天域,如果在星期域使用1l,意味着在最后的一个星期日触发。
- w : 表示有效工作日(周一到周五),只能出现在每月第几日域,系统将在离指定日期的最近的有效工作日触发事件。注意一点,w的最近寻找不会跨过月份
- lw : 这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。
# : 用于确定每个月第几个星期几,只能出现在每月第几天域。例如在1#3,表示某月的第三个星期日。
1.5.2.cron表达式示例
作用 | 表达式 |
每隔5秒执行一次 | */5 * * * * ? |
每天中午12点执行一次 | 0 0 12 * * ? |
2024年的每天上午10:00执行一次 | 0 0 10 * * ? 2023 |
每天下午6点到下午6:59每分钟执行一次 | 0 * 18 * * ? |
每月的最后一个星期五上午10:30执行一次 | 0 30 10 ? * 6l |
每月的第4个星期五上午10:25执行一次 | 0 25 10 ? * 6#4 |
每天上午8点,下午1点,4点执行一次 | 0 0 8,13,16 * * ? |
2.异步任务
2.1.springboot配置
在软件开发中,有些任务比较耗时但又无需马上获得结果。一般地,这些任务我们可以采用独立线程池异步执行。如果程序基于springboot环境,我们有现成的工具可以使用。
首先,我们需要在程序启动入口类增加@enableasync。
借着,我们尝试从源代码的角度,来看看springboot提供的异步线程池是怎么创建的。
从springboot的命名风格可知,通过模糊搜索taskautoconfiguration,可以找到taskexecutionautoconfiguration,如下:
从源代码可知,只需在application.yml配置如下参数即可:
spring: execution: pool: coresize: 4 queuecapacity: 64 maxsize: 8 ## 禁止空闲线程关闭,保证最少有core个存活线程 allowcorethreadtimeout: false keepalive: 300s threadnameprefix: common-async_task-
这些参数跟jdk的threadpoolexecutor类的构造参数非常相似,这里不作解析 。
2.2.使用异步任务
使用方法,只要在目标方法的签名加上@async
然而,执行结果却出乎意外(在main主线程上执行,没有异步执行)
熟悉springaop机制的同学,马上知道这是因为异步任务底层是基于动态代理机制实现的。spring aop 代理只有在通过 spring 容器获取 bean 时才会创建。
当在同一个类内部调用一个方法时,调用的是原始对象,而不是代理对象。因此,内部调用不会经过 spring aop 代理,也就无法触发异步执行。解决这个问题最简单的方法是用一个新的类来管理异步执行方法。
问题解决
2.3.模拟系统繁忙进行性能测试
我们尝试模拟一些极端情况,系统处理不过来的情况。修改上面的配置,改为
pool: coresize: 1 queuecapacity: 1 maxsize: 8
同时,异步执行增加时间延迟来模拟耗时任务
@component @slf4j class asynctaskhandler { @async public void busytask1() { try { thread.sleep(5000); } catch (interruptedexception ignored) { } log.info("----------busytask1---------"); } @async public void busytask2() { try { thread.sleep(5000); } catch (interruptedexception ignored) { } log.info("----------busytask2---------"); } }
执行结果会出现报错(线程数已达到最大数量,且任务队列已满,无法添加新任务)
调度任务执行频率是可预见的,有多少个任务,执行频率,开发可知,核心数量,最大数量,队列容量比较好设定。而异步执行频率很大程度是由系统的使用者(用户)决定的,因此这些参数需要根据流量动态修改。
2.4.异步线程池拒绝策略
如果不想把 queuecapacity和maxsize都设置成很大的话,我们可以考虑修改下线程池的拒绝策略。最妥当的方式是,既然异步不了,那就"熔断"成同步,直接在调用者所在的业务线程执行。然而,springboot没有提供相应的配置项。为此,我们只能关闭springboot的自动配置了。
从代码可看出,只要提供一个executor实例,并且名字叫taskexecutor即可。
话不多说,上代码
@configuration public class threadpoolconfig { private final int core = runtime.getruntime().availableprocessors(); /** * springboot 自动注入的异步执行线程池,拒绝策略为丢弃,难以配置maxpoolsize参数 * @see taskexecutionautoconfiguration#taskexecutorbuilder */ @bean(name = "taskexecutor") public threadpooltaskexecutor threadpooltaskexecutor(taskexecutionproperties properties) { threadpooltaskexecutor executor = new threadpooltaskexecutor(); executor.setcorepoolsize(properties.getpool().getcoresize()); executor.setthreadnameprefix(properties.getthreadnameprefix()); executor.setmaxpoolsize(properties.getpool().getmaxsize()); executor.setqueuecapacity(properties.getpool().getqueuecapacity()); executor.setkeepaliveseconds((int) properties.getpool().getkeepalive().toseconds()); executor.setallowcorethreadtimeout(properties.getpool().isallowcorethreadtimeout()); // 超过队列容量,则在业务线程上执行 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); return executor; } }
重新运行程序,可以看到,当任务超过线程池负载的时候,多余的线程会在调用线程上执行,变为同步代码
个人认为:使用springboot创建的线程池,代码也只是稍微简化一点点。
采用原生线程池,每个业务代码必须实现runnable接口,而使用springboot的异步线程池,只需以方法注解的形式即可,底层aop会生成对应的代理方法。但要确保避免内部方法调用导致异步逻辑失效。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论