taskscheduler
概述
taskscheduler是spring 3.0版本后,自带了一个定时任务工具,不用配置文件,可以动态改变执行状态。也可以使用cron表达式设置定时任务。
被执行的类要实现runnable接口
taskscheduler是一个接口,它定义了6个方法
接口的6种方法
public interface taskscheduler { /** * 提交任务调度请求 * @param task 待执行任务 * @param trigger 使用trigger指定任务调度规则 * @return */ scheduledfuture schedule(runnable task, trigger trigger); /** * 提交任务调度请求 * 注意任务只执行一次,使用starttime指定其启动时间 * @param task 待执行任务 * @param starttime 任务启动时间 * @return */ scheduledfuture schedule(runnable task, date starttime); /** * 使用fixedrate的方式提交任务调度请求 * 任务首次启动时间由传入参数指定 * @param task 待执行的任务 * @param starttime 任务启动时间 * @param period 两次任务启动时间之间的间隔时间,默认单位是毫秒 * @return */ scheduledfuture scheduleatfixedrate(runnable task, date starttime, long period); /** * 使用fixedrate的方式提交任务调度请求 * 任务首次启动时间未设置,任务池将会尽可能早的启动任务 * @param task 待执行任务 * @param period 两次任务启动时间之间的间隔时间,默认单位是毫秒 * @return */ scheduledfuture scheduleatfixedrate(runnable task, long period); /** * 使用fixeddelay的方式提交任务调度请求 * 任务首次启动时间由传入参数指定 * @param task 待执行任务 * @param starttime 任务启动时间 * @param delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒 * @return */ scheduledfuture schedulewithfixeddelay(runnable task, date starttime, long delay); /** * 使用fixeddelay的方式提交任务调度请求 * 任务首次启动时间未设置,任务池将会尽可能早的启动任务 * @param task 待执行任务 * @param delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒 * @return */ scheduledfuture schedulewithfixeddelay(runnable task, long delay); }
0、threadpooltaskscheduler
在 threadpooltaskschedulerconfig 中定义 threadpooltaskscheduler bean
@configuration public class threadpooltaskschedulerconfig { @bean public threadpooltaskscheduler threadpooltaskscheduler(){ threadpooltaskscheduler threadpooltaskscheduler = new threadpooltaskscheduler(); threadpooltaskscheduler.setpoolsize(5); threadpooltaskscheduler.setthreadnameprefix( "threadpooltaskscheduler"); return threadpooltaskscheduler; } }
配置的 bean threadpooltaskscheduler 可以根据配置的池大小 5 异步执行任务。
请注意,所有与 threadpooltaskscheduler 相关的线程名称都将以threadpooltaskscheduler 为前缀。
让我们实现一个简单的任务,然后我们可以安排:
class runnabletask implements runnable{ private string message; public runnabletask(string message){ this.message = message; } @override public void run() { system.out.println(new date()+" runnable task with "+message +" on thread "+thread.currentthread().getname()); } }
1、schedule(runnable task, trigger trigger)
指定一个触发器执行定时任务。可以使用crontrigger来指定cron表达式,执行定时任务
如下:使用crontrigger 来根据 cron 表达式调度任务,可以使用提供的触发器按照某个指定的节奏或时间表运行任务,在这种情况下,runnabletask 将在每分钟的第 10 秒执行。
taskscheduler.schedule(new runnabletask("cron trigger"), crontrigger);
2、schedule(runnable task, date starttime);
指定一个具体时间点执行定时任务,可以动态的指定时间,开启任务,只执行一次
如下:配置一个任务在 1000 毫秒的固定延迟后运行,runnabletask 将始终在一次执行完成和下一次执行开始之间运行 1000 毫秒。
taskscheduler.schedule( new runnabletask("specific time, 3 seconds from now"), new date(system.currenttimemillis + 3000) );
3、scheduleatfixedrate(runnable task, long period);
立即执行,循环任务,指定一个执行周期(毫秒计时)
ps:不管上一个周期是否执行完,到时间下个周期就开始执行
如下:安排一个任务以固定的毫秒速率运行,下一个 runnabletask 将始终在 2000 毫秒后运行,而不管上次执行的状态如何,它可能仍在运行。
taskscheduler.scheduleatfixedrate( new runnabletask("fixed rate of 2 seconds") , 2000);
4、scheduleatfixedrate(runnable task, date starttime, long period);
指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)
ps:不管上一个周期是否执行完,到时间下个周期就开始执行
如下:使用crontrigger 来根据 cron 表达式调度任务,可以使用提供的触发器按照某个指定的节奏或时间表运行任务,在这种情况下,runnabletask 将在每分钟的第 10 秒执行。
taskscheduler.scheduleatfixedrate(new runnabletask( "fixed rate of 2 seconds"), new date(), 3000);
5、schedulewithfixeddelay(runnable task, long delay);
立即执行,循环任务,指定一个间隔周期(毫秒计时)
ps:上一个周期执行完,等待delay时间,下个周期开始执行
如下:配置一个任务在 1000 毫秒的固定延迟后运行,runnabletask 将始终在一次执行完成和下一次执行开始之间运行 1000 毫秒。
taskscheduler.schedulewithfixeddelay( new runnabletask("fixed 1 second delay"), 1000);
6、schedulewithfixeddelay(runnable task, date starttime, long delay);
指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)
ps:上一个周期执行完,等待delay时间,下个周期开始执行
如下:将任务配置为在给定开始时间的固定延迟后运行,runnabletask 将在指定的执行时间被调用,其中包括 @postconstruct 方法开始的时间,随后延迟 1000 毫秒。
taskscheduler.schedulewithfixeddelay( new runnabletask("current date fixed 1 second delay"), new date(), 1000);
接口5个实现类
1、concurrenttaskscheduler
以当前线程执行任务,如果任务简单,可以直接使用这个类来执行,快捷方便
- 单线程运行
public class loctest implements runnable { private concurrenttaskscheduler concurrenttaskscheduler = new concurrenttaskscheduler(); private void start() { concurrenttaskscheduler.schedule(this, new date()); } public void run() { thread thread = thread.currentthread(); system.out.println("current id:" + thread.getid()); system.out.println("current name:" + thread.getname()); } public static void main(string[] args) { new loctest().start(); } }
2、defaultmanagedtaskscheduler
以当前线程执行任务,是concurrenttaskscheduler的子类,添加了jndi的支持。
和concurrenttaskscheduler一样的用法,需要使用jndi可以单独设置
3、threadpooltaskscheduler
taskscheduler接口的默认实现类,多线程定时任务执行。可以设置执行线程池数(默认一个线程)
- 使用前必须得先调用
initialize()
【初始化方法】 - 有
shutdown()方法
,执行完后可以关闭线程
除实现了taskscheduler接口中的方法外,它还包含了一些对scheduledthreadpoolexecutor进行操作的接口,其常用方法如下:
setpoolsize
:设置线程池大小,最小为1,默认情况下也为1;seterrorhandler
:设置异常处理器。getscheduledthreadpoolexecutor
:获取scheduledexecutor,默认scheduledthreadpoolexecutor类型。getactivecount
:获取当前活动的线程数execute
: 提交执行一次的任务submit
\submitlistenable
:提交执行一次的任务,并且返回一个future对象供判断任务状态使用
public class loctest implements runnable { private threadpooltaskscheduler taskscheduler = new threadpooltaskscheduler(); private void start() { taskscheduler.setpoolsize(10); //必须得先初始化,才能使用 taskscheduler.initialize(); taskscheduler.schedule(this, new date()); } public void run() { thread ct = thread.currentthread(); system.out.println("current id:"+ct.getid()); system.out.println("current name:"+ct.getname()); } public static void main(string[] args) { new loctest().start(); } }
4、timermanagertaskscheduler
用于包装commonj中的timermanager接口。
在使用commonj进行调度时使用
spring boot使用taskscheduler实现动态增删启停定时任务
schedulingconfig:添加执行定时任务的线程池配置类
import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.scheduling.taskscheduler; import org.springframework.scheduling.concurrent.threadpooltaskscheduler; @configuration public class schedulingconfig { @bean public taskscheduler taskscheduler() { threadpooltaskscheduler taskscheduler = new threadpooltaskscheduler(); // 定时任务执行线程池核心线程数 taskscheduler.setpoolsize(4); taskscheduler.setremoveoncancelpolicy(true); taskscheduler.setthreadnameprefix("taskschedulerthreadpool-"); return taskscheduler; } }
scheduledtask:添加scheduledfuture的包装类
scheduledfuture是scheduledexecutorservice定时任务线程池的执行结果。
import java.util.concurrent.scheduledfuture; public final class scheduledtask { volatile scheduledfuture<?> future; /** * 取消定时任务 */ public void cancel() { scheduledfuture<?> future = this.future; if (future != null) { future.cancel(true); } } }
schedulingrunnable:添加runnable接口实现类
添加runnable接口实现类,被定时任务线程池调用,用来执行指定bean里面的方法
import org.apache.commons.lang.stringutils; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.util.reflectionutils; import java.lang.reflect.method; import java.util.objects; public class schedulingrunnable implements runnable { private static final logger logger = loggerfactory.getlogger(schedulingrunnable.class); private final string beanname; private final string methodname; private final string params; public schedulingrunnable(string beanname, string methodname) { this(beanname, methodname, null); } public schedulingrunnable(string beanname, string methodname, string params) { this.beanname = beanname; this.methodname = methodname; this.params = params; } @override public void run() { logger.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanname, methodname, params); long starttime = system.currenttimemillis(); try { object target = springcontextutils.getbean(beanname); method method = null; if (stringutils.isnotempty(params)) { method = target.getclass().getdeclaredmethod(methodname, string.class); } else { method = target.getclass().getdeclaredmethod(methodname); } reflectionutils.makeaccessible(method); if (stringutils.isnotempty(params)) { method.invoke(target, params); } else { method.invoke(target); } } catch (exception ex) { logger.error(string.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", beanname, methodname, params), ex); } long times = system.currenttimemillis() - starttime; logger.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", beanname, methodname, params, times); } @override public boolean equals(object o) { if (this == o) return true; if (o == null || getclass() != o.getclass()) return false; schedulingrunnable that = (schedulingrunnable) o; if (params == null) { return beanname.equals(that.beanname) && methodname.equals(that.methodname) && that.params == null; } return beanname.equals(that.beanname) && methodname.equals(that.methodname) && params.equals(that.params); } @override public int hashcode() { if (params == null) { return objects.hash(beanname, methodname); } return objects.hash(beanname, methodname, params); } }
crontaskregistrar:添加定时任务注册类,用来增加、删除定时任务
import com.example.testspringboot.cron.scheduleresult; import org.springframework.beans.factory.disposablebean; import org.springframework.beans.factory.annotation.autowired; import org.springframework.scheduling.taskscheduler; import org.springframework.scheduling.config.crontask; import org.springframework.stereotype.component; import java.util.*; import java.util.concurrent.concurrenthashmap; /** * 添加定时任务注册类,用来增加、删除定时任务。 */ @component public class crontaskregistrar implements disposablebean { private final map<runnable, scheduledtask> scheduledtasks = new concurrenthashmap<>(16); private final map<integer, scheduleresult> schedulerjob = new hashmap<>(); @autowired private taskscheduler taskscheduler; public taskscheduler getscheduler() { return this.taskscheduler; } public void addcrontask(scheduleresult scheduleresult) { schedulingrunnable task = new schedulingrunnable(scheduleresult.getbeanname(), scheduleresult.getmethodname(), scheduleresult.getmethodparams()); string cronexpression = scheduleresult.getcronexpression(); crontask crontask = new crontask(task, cronexpression); // 如果当前包含这个任务,则移除 if (this.scheduledtasks.containskey(task)) { removecrontask(scheduleresult.getbeanname(), scheduleresult.getmethodname(), scheduleresult.getmethodparams()); } schedulerjob.put(scheduleresult.getjobid(), scheduleresult); this.scheduledtasks.put(task, schedulecrontask(crontask)); } public void removecrontask(string beanname, string methodname, string methodparams) { schedulingrunnable task = new schedulingrunnable(beanname, methodname, methodparams); scheduledtask scheduledtask = this.scheduledtasks.remove(task); if (scheduledtask != null) { scheduledtask.cancel(); } } public void removecrontask(scheduleresult scheduleresult) { schedulerjob.put(scheduleresult.getjobid(), scheduleresult); removecrontask(scheduleresult.getbeanname(), scheduleresult.getmethodname(), scheduleresult.getmethodparams()); } public scheduledtask schedulecrontask(crontask crontask) { scheduledtask scheduledtask = new scheduledtask(); scheduledtask.future = this.taskscheduler.schedule(crontask.getrunnable(), crontask.gettrigger()); return scheduledtask; } public map<runnable, scheduledtask> getscheduledtasks() { return scheduledtasks; } public map<integer, scheduleresult> getschedulerjob() { return schedulerjob; } @override public void destroy() { for (scheduledtask task : this.scheduledtasks.values()) { task.cancel(); } this.scheduledtasks.clear(); } public scheduleresult getschedulerbyjobid(integer jobid) { for (scheduleresult job : findalltask()) { if (jobid.equals(job.getjobid())) { return job; } } return null; } public list<scheduleresult> findalltask() { list<scheduleresult> scheduleresults = new arraylist<>(); set<map.entry<integer, scheduleresult>> entries = schedulerjob.entryset(); for (map.entry<integer, scheduleresult> en : entries) { scheduleresults.add(en.getvalue()); } return scheduleresults; } }
cronutils:校验cron表达式的有效性
import org.springframework.scheduling.support.cronexpression; public class cronutils { /** * 返回一个布尔值代表一个给定的cron表达式的有效性 * * @param cronexpression cron表达式 * @return boolean 表达式是否有效 */ public static boolean isvalid(string cronexpression) { return cronexpression.isvalidexpression(cronexpression); } }
scheduleresult:添加定时任务实体类
import lombok.data; @data public class scheduleresult { /** * 任务id */ private integer jobid; /** * bean名称 */ private string beanname; /** * 方法名称 */ private string methodname; /** * 方法参数: 执行service里面的哪一种方法 */ private string methodparams; /** * cron表达式 */ private string cronexpression; /** * 状态(1正常 0暂停) */ private integer jobstatus; /** * 备注 */ private string remark; /** * 创建时间 */ private string createtime; /** * 更新时间 */ private string updatetime; }
schedulejobstatus:任务状态枚举类型
public enum schedulejobstatus { /** * 暂停 */ pause, /** * 正常 */ normal; }
springcontextutils类:从spring容器里获取bean
import org.springframework.beans.beansexception; import org.springframework.context.applicationcontext; import org.springframework.context.applicationcontextaware; import org.springframework.stereotype.component; @component public class springcontextutils implements applicationcontextaware { private static applicationcontext applicationcontext = null; @override public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { if (springcontextutils.applicationcontext == null) { springcontextutils.applicationcontext = applicationcontext; } } public static applicationcontext getapplicationcontext() { return applicationcontext; } // 通过name获取 bean. public static object getbean(string name) { return getapplicationcontext().getbean(name); } // 通过class获取bean. public static <t> t getbean(class<t> clazz) { return getapplicationcontext().getbean(clazz); } // 通过name,以及clazz返回指定的bean public static <t> t getbean(string name, class<t> clazz) { return getapplicationcontext().getbean(name, clazz); } public static boolean containsbean(string name) { return getapplicationcontext().containsbean(name); } public static boolean issingleton(string name) { return getapplicationcontext().issingleton(name); } public static class<? extends object> gettype(string name) { return getapplicationcontext().gettype(name); } }
schedulejobservice:增删启停service方法
@service @slf4j public class schedulejobservice { @autowired private crontaskregistrar crontaskregistrar; public void addschedulejob(scheduleresult scheduleresult) { long currenttimemillis = system.currenttimemillis(); scheduleresult.setcreatetime(formattimeymd_hms_sss(currenttimemillis)); scheduleresult.setupdatetime(formattimeymd_hms_sss(currenttimemillis)); scheduleresult.setjobid(findalltask().size() + 1); if (scheduleresult.getjobstatus().equals(schedulejobstatus.normal.ordinal())) { log.info("stop or pause: is now on"); crontaskregistrar.addcrontask(scheduleresult); return; } crontaskregistrar.getschedulerjob().put(scheduleresult.getjobid(), scheduleresult); } public void editschedulejob(scheduleresult currentschedule) { //先移除 crontaskregistrar.removecrontask(currentschedule.getbeanname(), currentschedule.getmethodname(), currentschedule.getmethodparams()); scheduleresult pastschedulejob = crontaskregistrar.getschedulerbyjobid(currentschedule.getjobid()); if (pastschedulejob == null) { system.out.println("没有这个任务"); return; } //然后判断是否开启, 如果开启的话,现在立即执行 startorstopschedulerjob(currentschedule, true); } public void deleteschedulejob(scheduleresult scheduleresult) { // 清除这个任务 crontaskregistrar.removecrontask(scheduleresult.getbeanname(), scheduleresult.getmethodname(), scheduleresult.getmethodparams()); // 清除这个任务的数据 crontaskregistrar.getschedulerjob().remove(scheduleresult.getjobid()); } public void startorstopscheduler(scheduleresult scheduleresult) { crontaskregistrar.getschedulerjob().get(scheduleresult.getjobid()).setjobstatus(scheduleresult.getjobstatus()); startorstopschedulerjob(scheduleresult, false); } private void startorstopschedulerjob(scheduleresult scheduleresult, boolean update) { // 更新时间 scheduleresult.setupdatetime(formattimeymd_hms_sss(system.currenttimemillis())); if (scheduleresult.getjobstatus().equals(schedulejobstatus.normal.ordinal())) { system.out.println("停止或暂停:现在是开启"); crontaskregistrar.addcrontask(scheduleresult); return; } system.out.println("停止或暂停:现在是暂停"); if (update){ crontaskregistrar.removecrontask(scheduleresult); return; } crontaskregistrar.removecrontask(scheduleresult.getbeanname(), scheduleresult.getmethodname(), scheduleresult.getmethodparams()); } public list<scheduleresult> findalltask() { return crontaskregistrar.findalltask(); } // 转换为年-月-日 时:分:秒 private string formattimeymd_hms_sss(long time) { return new simpledateformat("yyyy-mm-dd hh:mm:ss:sss").format(time); } }
croncontroller:访问接口
import com.example.testspringboot.cron.scheduleresult; import com.example.testspringboot.cron.schedulejobservice; import com.example.testspringboot.cron.utils.cronutils; import com.google.gson.gson; import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.*; import java.util.list; @restcontroller public class croncontroller { @autowired private schedulejobservice schedulejobservice; /** * 测试上传的用例文件, 获取详细执行结果 */ @postmapping("/add") void executetestonefile(@requestbody scheduleresult scheduleresult) { boolean valid = cronutils.isvalid(scheduleresult.getcronexpression()); if (valid){ system.out.println("校验成功, 添加任务"); scheduleresult.setmethodparams(scheduleresult.getbranch()+scheduleresult.getcasedir()); schedulejobservice.addschedulejob(scheduleresult); }else { system.out.println("校验失败"); } } @postmapping("/stop") void end(@requestbody scheduleresult scheduleresult) { gson gson = new gson(); system.out.println("================"); system.out.println(scheduleresult); system.out.println("================="); scheduleresult.setjobstatus(0); schedulejobservice.startorstopscheduler(scheduleresult); } @postmapping("/start") void start(@requestbody scheduleresult scheduleresult) { system.out.println("================"); system.out.println(scheduleresult); system.out.println("================="); scheduleresult.setjobstatus(1); schedulejobservice.startorstopscheduler(scheduleresult); } @postmapping("/edit") void edit(@requestbody scheduleresult scheduleresult) { system.out.println("=======edit========="); system.out.println(scheduleresult); system.out.println("================="); schedulejobservice.editschedulejob(scheduleresult); } @postmapping("/delete") void delete(@requestbody scheduleresult scheduleresult) { system.out.println("=======delete========="); system.out.println(scheduleresult); system.out.println("================="); schedulejobservice.deleteschedulejob(scheduleresult); } @getmapping("/tasks") list<scheduleresult> get() throws exception { list<scheduleresult> alltask = schedulejobservice.findalltask(); system.out.println("现在的定时任务数量 = " + alltask.size()); system.out.println("现在的定时任务 = " + alltask); return alltask; } }
c1:测试bean
import org.springframework.stereotype.component; @component public class c1 { public void test1(string y){ system.out.println("这个是test1的bean : " + y); } public void test2(){ system.out.println("这个是test1的bean中test2方法"); } }
init:项目启动后的定时任务
import com.example.testspringboot.cron.schedulejobservice; import com.example.testspringboot.cron.scheduleresult; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.commandlinerunner; import org.springframework.stereotype.component; @component public class init implements commandlinerunner { @autowired private schedulejobservice schedulejobservice; @override public void run(string... args) throws exception { system.out.println("开始珍惜"); scheduleresult scheduleresult = new scheduleresult(); scheduleresult.setbeanname("c1"); scheduleresult.setmethodname("test1"); scheduleresult.setcronexpression("0/25 * * * * *"); scheduleresult.setjobstatus(1); scheduleresult.setmethodparams("test1"); schedulejobservice.addschedulejob(scheduleresult); schedulejobservice.findalltask(); } }
后续的操作,基本上就是复制粘贴,运行
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论