在使用springboot框架进行开发时,一般都是通过@scheduled注解进行定时任务的开发:
@component public class testtask { @scheduled(cron="0/5 * * * * ? ") //每5秒执行一次 public void execute(){ simpledateformat df = new simpledateformat("yyyy-mm-dd hh:mm:ss"); log.info("任务执行" + df.format(new date())); } }
但是这种方式存在一个问题,那就是任务的周期控制是死的,必须编写在代码中,如果遇到需要在系统运行过程中想中止、立即执行、修改执行周期等动态操作的需求时,使用注解的方式便不能满足了,当然为了满足此种需求可以额外再引入其他任务调度插件(例如xxl-job等),但是引入其他组件是需要衡量成本的,额外的依赖成本、组件的维护成本、开发的复杂度等等,所以如果系统体量不是那么大,完全没必要通过增加组件来完成,可以基于springboot框架实现一套内置轻量级的任务调度。
设计思路
整体设计
这里我们把定时任务以类作为基础单位,即一个类为一个任务,然后通过配置数据的方式,进行任务的读取,通过反射生成任务对象,使用springboot本身的线程池任务调度,完成动态的定时任务驱动,同时通过接口支撑实现相应的rest api对外暴露接口
任务模型
首先基于模板模式,设计基础的任务执行流程抽象类,定义出一个定时任务需要执行的内容和步骤和一些通用的方法函数,后续具体的定时任务直接继承该父类,实现该父类的before、start、after三个抽象函数即可,所有公共操作均在抽象父类完成
特殊说明:
基于此方法创建的类是不归spring的容器管理的,所以自定义的任务子类中是无法使用springboot中的任何注解,尤其在自定义任务类中如果需要依赖其他bean时,需要借助抽象父类abstractbasecrontask中已经实现的<t> t getserver(class<t> classname)来完成,getserver的实现如下:
public <t> t getserver(class<t> classname){ return applicationcontext.getbean(classname); }
是通过springboot中的applicationcontext接口来获取spring的上下文,以此来满足可以获取spring中其他bean的诉求。
例如,有个定时任务taskone类,它需要使用userservice类中的 caculatemoney()的方法,势必这个定时任务需要依赖userservice类,而taskone并非是spring创建的对象,而是我们人为干预生成的对象,所以它是不在spring的bean管理范围的,自然也就无法使用@autowird等方式注入userservice类,此时就需要使用getserver方法来获取userservice对象
//自定义定时任务类 public class taskone extends abstractbasecrontask { private userservice userservice; public testtask(taskentity taskentity) { super(taskentity); } @override public void beforejob() { //任务运行第一步,先将userservice进行变量注入 userservice = getserver(userservice.class); …… } @override public void startjob() { if(xxxx){ //直接调用getserver获取需要的bean user user = getserver(usermapper.class).finduser("111223") userservice.caluatemoney(user); //……其他代码 } } @override public void afterjob() { } }
任务对象加载过程
核心逻辑在于利用反射,在springboot启动后动态创建相应的定时任务类,并将其放置到springboot的定时线程池中进行维护,同时将该对象同步存放至内存中一份,便于可以实时调用,当进行修改任务相关配置时,需要重新加载一次内容。
public class taskscheduleserverimpl implements taskscheduleserver { //正在运行的任务 private static concurrenthashmap<string, scheduledfuture> runningtasks = new concurrenthashmap<>(); //线程池任务调度 private threadpooltaskscheduler threadpooltaskscheduler = new threadpooltaskscheduler(); public boolean addtasktoscheduling(taskentity task) { if(!runningtasks.containskey(task.gettaskid())){ try{ class<?> clazz = class.forname(task.gettaskclass()); constructor c = clazz.getconstructor(taskentity.class); abstractbasecrontask runnable = (abstractbasecrontask) c.newinstance(task); //反射方式生成对象不属于spring容器管控,对于spring的bean使用需要手动注入 runnable.setapplicationcontext(context); crontrigger cron = new crontrigger(task.gettaskcron()); //put到runtasks runningtasks.put(task.gettaskid(), objects.requirenonnull(this.threadpooltaskscheduler.schedule(runnable, cron))); //存入内存中,便于外部调用 ramtasks.put(task.gettaskid(),runnable); task.settaskramstatus(1); taskinfoopmapper.updatetaskinfo(task); return true; }catch (exception e){ log.error("定时任务加载失败..."+e); } } return false; } }
部分源码
这里将配置内容放入数据库中,直接以数据库中的表作为任务配置的基础
/** * 任务对象 **/ @data public class taskentity implements serializable { //任务唯一id private string taskid; //任务名称 private string taskname; //任务描述 private string taskdesc; //执行周期配置 private string taskcron; //任务类的全路径 private string taskclass; //任务的额外配置 private string taskoutconfig; //任务创建时间 private string taskcreatetime; //任务是否启动,1启用,0不启用 private integer taskisuse; //是否随系统启动立即执行 private integer taskbootup; //任务上次执行状态 private integer tasklastrun; //任务是否加载至内存中 private integer taskramstatus; }
核心逻辑,加载定时任务接口及其实现类
public interface taskscheduleserver { concurrenthashmap<string, abstractbasecrontask> gettaskschedulingram(); /** * 初始化任务调度 */ void initscheduling(); /** * 添加任务至内存及容器 * @param taskentity 任务实体 * @return boolean */ boolean addtasktoscheduling(taskentity taskentity); /** * 从任务调度器中移除任务 * @param id 任务id * @return boolean */ boolean removetaskfromscheduling(string id); /** * 执行指定任务 * @param id 任务id * @return double 耗时 */ double runtaskbyid(string id); /** * 清空任务 */ void claearalltask(); /** * 加载所有任务 */ void loadalltask(); /** * 运行开机自启任务 */ void runbootuptask(); } @slf4j @component public class taskscheduleserverimpl implements taskscheduleserver { ………… @override public double runtaskbyid(string id) { taskentity task = taskinfoopmapper.querytaskinfobyid(id); if(null!=task) { if (runningtasks.containskey(task.gettaskid())){ ramtasks.get(task.gettaskid()).run(); return ramtasks.get(task.gettaskid()).getruntime(); } } return 0d; } @override public void claearalltask() { ramtasks.clear(); log.info("【定时任务控制器】清除内存任务 完成"); runningtasks.clear(); log.info("【定时任务控制器】清除线程任务 完成"); threadpooltaskscheduler.shutdown(); } @override public void loadalltask() { list<taskentity> alltask = taskinfoopmapper.querytaskinfo(null); for (taskentity task : alltask) { if(addtasktoscheduling(task)){ log.info("【定时任务初始化】装填任务:{} [ 任务执行周期:{} ] [ bootup:{}]",task.gettaskname(),task.gettaskcron(),task.gettaskbootup()); } } } @override public void runbootuptask() { taskentity entity = new taskentity().taskbootup(1); list<taskentity> list = taskinfoopmapper.querytaskinfo(entity); for(taskentity task:list){ runtaskbyid(task.gettaskid()); } } }
在springboot中的加载类
@order(3) @component @slf4j public class afterappstarted implements applicationrunner { taskscheduleserver taskscheduleserver; @autowired public void settaskscheduleserver(taskscheduleserver taskscheduleserver) { this.taskscheduleserver = taskscheduleserver; } @override public void run(applicationarguments args) throws exception { //运行随系统启动的定时任务 taskscheduleserver.runbootuptask(); } }
对外暴露控制接口及其service
@restcontroller @requestmapping("/taskscheduling/manage") @api(tags = "数据源管理服务") public class taskschedulingcontroller { taskschedulemanagerservice taskschedulemanagerservice; @autowired public void settaskschedulemanagerservice(taskschedulemanagerservice taskschedulemanagerservice) { this.taskschedulemanagerservice = taskschedulemanagerservice; } @postmapping("/search") @operation(summary = "分页查询任务") public response searchdata(@requestbody searchtaskdto param){ return response.success(taskschedulemanagerservice.searchtaskforpage(param)); } @getmapping("/detail") @operation(summary = "具体任务对象") public response searchdetail(string taskid){ return response.success(taskschedulemanagerservice.searchtaskdetail(taskid)); } @getmapping("/shutdown") @operation(summary = "关闭指定任务") public response shutdowntask(string taskid){ return response.success(taskschedulemanagerservice.shutdowntask(taskid)); } @getmapping("/open") @operation(summary = "开启指定任务") public response opentask(string taskid){ return response.success(taskschedulemanagerservice.opentask(taskid)); } @getmapping("/run") @operation(summary = "运行指定任务") public response runtask(string taskid){ return response.success(taskschedulemanagerservice.runtask(taskid)); } @postmapping("/update") @operation(summary = "更新指定任务") public response updatetask(@requestbody taskentity taskentity){ return response.success(taskschedulemanagerservice.updatetaskbusinessinfo(taskentity)); } }
相关接口实现类
@service public class taskschedulemanagerserviceimpl implements taskschedulemanagerservice { private taskinfoopmapper taskinfoopmapper; private taskscheduleserver taskscheduleserver; @autowired public void settaskinfoopmapper(taskinfoopmapper taskinfoopmapper) { this.taskinfoopmapper = taskinfoopmapper; } @autowired public void settaskscheduleserver(taskscheduleserver taskscheduleserver) { this.taskscheduleserver = taskscheduleserver; } @override public ipage<taskentity> searchtaskforpage(searchtaskdto dto) { page<taskentity> pageparam = new page<>(1,10); pageparam.setasc("task_id"); return taskinfoopmapper.querytaskinfopage(pageparam,dto.getfilterkey(),dto.getbootup(),dto.getlastrunstatus()); } @override public taskentity searchtaskdetail(string taskid) { if(!stringutils.isempty(taskid)){ return taskinfoopmapper.querytaskinfobyid(taskid); } return null; } @override public taskrunretdto runtask(string taskid) { abstractbasecrontask task = taskscheduleserver.gettaskschedulingram().get(taskid); taskrunretdto result = new taskrunretdto(taskrunretdto.taskoperation.run, 0); if(null != task) { double time = taskscheduleserver.runtaskbyid(taskid); result.setresult(1); return result.extend(time).taskinfo(task.getthistaskinfo()); } else { return result.extend("任务未启用"); } } @override public taskrunretdto shutdowntask(string taskid) { abstractbasecrontask task = taskscheduleserver.gettaskschedulingram().get(taskid); taskrunretdto result = new taskrunretdto(taskrunretdto.taskoperation.shutdown, 0); if(null != task) { boolean flag = taskscheduleserver.removetaskfromscheduling(taskid); if(flag) { result.setresult(1); } return result.extend("任务成功关闭").taskinfo(task.getthistaskinfo()); } else { return result.extend("任务未启用"); } } @override public taskrunretdto opentask(string taskid) { taskentity task = taskinfoopmapper.querytaskinfobyid(taskid); taskrunretdto result = new taskrunretdto(taskrunretdto.taskoperation.open, 0); if(null != task) { if (!taskscheduleserver.gettaskschedulingram().containskey(taskid)) { boolean flag = taskscheduleserver.addtasktoscheduling(task); if(flag) { result.setresult(1); } return result.extend("任务开启成功").taskinfo(task); } else { return result.extend("任务处于启动状态").taskinfo(task); } }else { return result.extend("任务不存在!"); } } @override public taskrunretdto updatetaskbusinessinfo(taskentity entity) { taskentity task = searchtaskdetail(entity.gettaskid()); taskrunretdto result = new taskrunretdto(taskrunretdto.taskoperation.update, 0).taskinfo(entity); string config = entity.gettaskoutconfig(); if(null != config && !jsonutil.isjson(config) && !jsonutil.isjsonarray(config)){ result.setresult(0); result.extend("更新任务失败,任务配置必须为json或空"); result.taskinfo(entity); return result; } task.settaskcron(entity.gettaskcron()); task.settaskoutconfig(entity.gettaskoutconfig()); task.settaskname(entity.gettaskname()); task.settaskdesc(entity.gettaskdesc()); int num = taskinfoopmapper.updatetaskinfo(task); if (num == 1) { result.setresult(1); result.extend("成功更新任务"); result.taskinfo(entity); //重新刷新任务 taskscheduleserver.removetaskfromscheduling(entity.gettaskid()); taskscheduleserver.addtasktoscheduling(task); } return result; }
效果
数据库中配置任务
任务代码
public class testtask extends abstractbasecrontask { public testtask(taskentity taskentity) { super(taskentity); } @override public void beforejob() { log.info("测试任务开始"); } @override public void startjob() { } @override public void afterjob() { } }
任务查看
执行效果
到此这篇关于基于springboot实现轻量级的动态定时任务调度的文章就介绍到这了,更多相关springboot动态定时任务调度内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论