背景
最近在项目开发上,有一个定时核对并清理的需求,定时规则较为简单,每15分钟运行一次,并且项目中暂未接入分布式定时任务调度框架;鉴于以上两个原因,我决定直接用 spring scheduling 开干。
回顾一下 springboot 项目中定义定时任务,其实就几个步骤:
- 在启动类上,或者任意一个配置类上添加
@enablescheduling注解 - 在需要运行定时任务的方法上,添加
@scheduled注解, 可以传入cron、fixeddelay、fixedrate三个值之一cron: 传入 spring cron 表达式,推荐使用fixeddelay: 以调用完成时刻开始计算间隔时间,单位是毫秒fixedrate: 以调用开始时刻来计算间隔时间,单位是毫秒
- 在这个方法内调用定时任务实际的执行逻辑
我用的cron表达式很快就把这个需求实现了;但这时变数出现了,要求cron表达式支持 nacos 动态配置,并且「动态配置后,不需要重启服务也能生效」
让我一时间犯了难,后面查询相关资料后,也实现了这个新需求,接下来我分享一下解题过程
实现动态变更定时机制
配置化 cron 表达式
我们知道,如果一个 java bean 使用 @configurationproperties 修饰后,当它的配置值通过 nacos 修改后,nacos client 会收到变更事件,从而刷新对应的属性值,所以我们考虑把 cron 表达式放入一个 properties 类管理,并使用 @configurationproperties 修饰
@data
@component
@configurationproperties(prefix = "task-schedule")
public class taskproperties {
private string cronexpression = "*/5 * * * * ?";
}
然后我们可以把这个 cron 表达式通过 placeholder 绑定到 @schedule 的 cron 中
@slf4j
@component
public class simpletask {
@scheduled(cron = "#{@taskproperties.cronexpression}")
public void simpletask2() {
log.info("simpletask2 scheduled");
}
}
spring schedule 调度规则
但是启动项目之后发现,无论怎么修改 nacos 上的配置值,这个定时任务的执行间隔都不会随配置值更新,只会按照首次启动时的 cron 表达式来进行
翻看源码 scheduledannotationbeanpostprocessor 这个类的 postprocessafterinitialization 方法
@override
public object postprocessafterinitialization(object bean, string beanname) {
...
class<?> targetclass = aopproxyutils.ultimatetargetclass(bean);
if (!this.nonannotatedclasses.contains(targetclass) &&
annotationutils.iscandidateclass(targetclass, list.of(scheduled.class, schedules.class))) {
map<method, set<scheduled>> annotatedmethods = methodintrospector.selectmethods(targetclass,
(methodintrospector.metadatalookup<set<scheduled>>) method -> {
set<scheduled> scheduledannotations = annotatedelementutils.getmergedrepeatableannotations(
method, scheduled.class, schedules.class);
return (!scheduledannotations.isempty() ? scheduledannotations : null);
});
// non-empty set of methods
annotatedmethods.foreach((method, scheduledannotations) ->
scheduledannotations.foreach(scheduled -> processscheduled(scheduled, method, bean)));
...
}
return bean;
}
我们知道 beanpostprocessor 中的 postprocessafterinitialization 方法,在 bean 初始化后执行,看到它在逐一地调度这些定时任务
所以我们可以推断,这些定时任务只会在项目启动之后统一的调度,并不会在运行期间自动更新调度规则
追踪定时任务调度
既然我们查看源码后发现 springboot 帮我们调度只发生在项目启动之后,那么如果我们希望响应 nacos 变更而改变调度规则,我们需要把调度的主动权把握在手
我们继续追踪源码,scheduledannotationbeanpostprocessor 的 processscheduled 方法,调用 processscheduledsync
protected void processscheduled(scheduled scheduled, method method, object bean) {
...
processscheduledsync(scheduled, method, bean);
}
private void processscheduledsync(scheduled scheduled, method method, object bean) {
runnable task;
try {
task = createrunnable(bean, method, scheduled.scheduler());
}
catch (illegalargumentexception ex) {
...
}
processscheduledtask(scheduled, task, method, bean);
}
我们目前先看同步调用的情况,可以看到 processscheduledsync 方法中,做了两件事:
- 创建一个
runnable对象 - 调度这个
runnable对象
可以得到一点启发:定时任务相当于创建一个任务,然后使用调度器按规则不停地调用,这里的调度器我们可以简单理解为 scheduled 线程池
如果我们可以拿到这个定时任务执行的调度器,缓存起来,这样调度的时机、调度的规则都可以由我们自己控制了;而创建一个 runnable 对象其实也不难,可以简单创建后直接把定时任务包装起来
查看 processscheduledtask 方法,它确实是这么做的
private void processscheduledtask(scheduled scheduled, runnable runnable, method method, object bean) {
try {
...
// determine initial delay
...
// check cron expression
string cron = scheduled.cron();
if (stringutils.hastext(cron)) {
string zone = scheduled.zone();
if (this.embeddedvalueresolver != null) {
cron = this.embeddedvalueresolver.resolvestringvalue(cron);
zone = this.embeddedvalueresolver.resolvestringvalue(zone);
}
if (stringutils.haslength(cron)) {
assert.istrue(initialdelay.isnegative(), "'initialdelay' not supported for cron triggers");
processedschedule = true;
if (!scheduled.cron_disabled.equals(cron)) {
crontrigger trigger;
if (stringutils.hastext(zone)) {
trigger = new crontrigger(cron, stringutils.parsetimezonestring(zone));
}
else {
trigger = new crontrigger(cron);
}
tasks.add(this.registrar.schedulecrontask(new crontask(runnable, trigger)));
}
}
}
// check fixed delay
...
// check fixed rate
...
// finally register the scheduled tasks
synchronized (this.scheduledtasks) {
set<scheduledtask> regtasks = this.scheduledtasks.computeifabsent(bean, key -> new linkedhashset<>(4));
regtasks.addall(tasks);
}
}
...
}
this.registrar.schedulecrontask(new crontask(runnable, trigger)),这个 this.registrar 的类型是 scheduledtaskregistrar,这一句我们可以知道 scheduledtaskregistrar 就是核心的调度器,负责调度各种定时任务,比如:cron、fixdelay 等等
它依靠它里面的 taskscheduler 来负责执行最终的任务调度,默认的实现类是 threadpooltaskscheduler,它依靠 scheduledexecutorservice 来最终实现定时任务的调度
掌握调度主动权
继续翻阅源码,我们发现 schedulingconfigurer 接口,它允许我们使用 scheduledtaskregistrar 来定制化定时任务的执行,有了上面的一点铺垫后,这个接口就很好理解了
@functionalinterface
public interface schedulingconfigurer {
/**
* callback allowing a {@link org.springframework.scheduling.taskscheduler}
* and specific {@link org.springframework.scheduling.config.task} instances
* to be registered against the given the {@link scheduledtaskregistrar}.
* @param taskregistrar the registrar to be configured
*/
void configuretasks(scheduledtaskregistrar taskregistrar);
}
我们可以拿到方法参数的 taskregistrar,像源码一样 schedulecrontask
这种方式和使用 @schedule 相比,更灵活,但是代码也会更为复杂一点,属于编程式调度
并且还可以使用一个成员属性来接收这个 scheduledtaskregistrar,这样可以随时使用
protected scheduledtaskregistrar taskregistrarholder;
@override
public void configuretasks(@nonnull scheduledtaskregistrar taskregistrar) {
// set scheduledtaskregistrar so that we can use it to schedule the timer task
taskregistrarholder = taskregistrar;
}
订阅 nacos 配置更新事件
对于刚才定义的 properties 类,即使 nacos 可以帮我们刷新配置,但是不做任何的自定义的话,只是单纯刷新对象属性值也起不到动态的目的,所以我们需要订阅并做出自定义的响应逻辑
通过查阅资料找到了关键组件 nacosconfigmanager,可以通过这样的方式来订阅配置变更,并且方法上还需要订阅 applicationreadyevent
@eventlistener(applicationreadyevent.class)
public void refreshconfig() throws exception {
// add nacos config listener
nacosconfigmanager.getconfigservice().addlistener("dynamic-schedule.yml", "default_group", new abstractconfigchangelistener() {
@override
public void receiveconfigchange(configchangeevent event) {
// receive config change event
...
}
});
log.info("added nacos config listener successfully, especially for cron-expression");
}
由于这种方式是订阅整个配置文件的配置变更,每一个配置被更新都会执行这个 refreshconfig 方法,所以在里面需要根据具体的配置项来监听
collection<configchangeitem> changeitems = event.getchangeitems();
for (configchangeitem changeitem : changeitems) {
log.info("config changed item: {}", changeitem.getkey());
string key = changeitem.getkey();
if (!("task-schedule.cron-expression".equals(key) || "task-schedule.cronexpression".equals(key))) {
log.info("cron-expression has not been changed, doesn't response it");
return false;
}
log.info("schedule config changed, new cron-expression: {}, now refresh the timer task", changeitem.getnewvalue());
return true;
}
至此,我们就实现了配置自定义监听,并且监听后我们还能做出其他后续的响应逻辑
最终效果
有了上面自定义调度和自定义订阅之后,我们只需要把他们整合一下,就能实现
另外针对任务暂停,这里也做简单的演示
初始的 cron 表达式是
*/5 * * * * ?,项目启动:
修改 nacos 配置,改成
*/8 * * * * ?,并且不重启服务:
修改 nacos 配置,改成
-,让任务停止,并且不重启服务:
在 spring schedule 中,如果 cron 表达式是一个
-也是表示停止
到此这篇关于基于nacos实现springboot动态定时任务调度的文章就介绍到这了,更多相关springboot动态定时任务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论