1.什么是schedulemaster
schedulemaster是分布式任务调度
系统,是国内的一位开发者写的。简称:集中任务调度系统,最简单的理解schedulemaster,就是对不同的系统里面的调度任务做统一管理的框架。
例如我们现在有多个系统,每个系统针对自己处理不同的业务场景。衍生出自己的调度任务,想象一下,如果每个系统人为去维护,那随着调度任务越来越多,人是崩溃的吧,可见维护和技术成本是巨大的,这时我们需要选择分布式任务系统框架做统一的管理
当然有目前有很多相对优秀分布式任务系统框架,我们主要学习 schedulemaster
2.使用schedulemaster
1.首先我们需要使用net core web api创建几个模拟的微服务,分别为 考勤、算薪、邮件、短信
2.下载开源的schedulemaster,并且使用schedulemaster调度我们的微服务接口
- sqlserver:"persist security info = false; user id =sa; password =123456; initial catalog =schedule_master; server =."
- postgresql:"server=localhost;port=5432;database=schedule_master;user id=postgres;password=123456;pooling=true;maxpoolsize=20;"
- mysql:"data source=localhost;database=schedule_master;user id=root;password=123456;pooling=true;charset=utf8mb4;port=3306;sslmode=none;treattinyasboolean=true"
修改host的配置文件和支持的数据库,框架默认使用mysql
修改web的配置文件和支持的数据库,框架默认使用mysql
3.进入hos.schedulemaster.web项目的发布目录,dotnet hos.schedulemaster.web.dll
启动项目 ,此时会生成数据库
登录账号:admin
密码:111111
4.进入hos.schedulemaster.quartzhost项目的发布目录,执行命令,启动项目
dotnet hos.schedulemaster.quartzhost.dll --urls http://*:30003
1.配置http调度任务
5.准备就绪后,使用后台查看节点管理,可以看到web主节点30000和任务调度的接口30002已经在运行
6.使用任务列表菜单,创建定时调度任务,配置基础信息和元数据配置,然后点击保存就开始执行任务
2.配置程序集调度任务
1.创建一个类库,安装schedulemaster库, 创建一个类继承taskbase,实现抽象方法,然后编译程序集
namespace taskexcuteservice { class assemblytask : taskbase { public override void run(taskcontext context) { context.writelog("程序集任务"); } } }
2.找到debug目录,去掉base程序集,然后添加为压缩文件
3.在web界面找到任务配置,选择程序集进行基本配置,配置元数据,输入程序集名称,然后输入类,并将程序集上传,保存就可以运行了
3.使用api接入任务
为了方便业务系统更好的接入调度系统,schedulemaster创建任务不仅可以在控制台中实现,系统也提供了webapi供业务系统使用代码接入,这种方式对延时任务来说尤其重要。
1.api server 对接流程
- 在控制台中创建好专用的api对接用户账号。
- 使用对接账号的用户名设置为http header中的
ms_auth_user
值。 - 使用经过哈希运算过的秘钥设置为http header中的
ms_auth_secret值
,计算规则:按{用户名}{hash(密码)}{用户名}的格式拼接得到字符串str,然后再对str做一次hash运算即得到最终秘钥,hash函数是小写的32位md5算法。 - 使用form格式发起http调用,如果非法用户会返回401-unauthorized。
httpclient client = new httpclient(); client.defaultrequestheaders.add("ms_auth_user", "admin"); client.defaultrequestheaders.add("ms_auth_secret", securityhelper.md5($"admin{securityhelper.md5("111111")}}admin"));
所有接口采用统一的返回格式,字段如下:
参数名称 | 参数类型 | 说明 |
---|---|---|
success | bool | 是否成功 |
status | int | 结果状态,0-请求失败 1-请求成功 2-登录失败 3-参数异常 4-数据异常 |
message | string | 返回的消息 |
data | object | 返回的数据 |
2.创建程序集任务
使用api创建任务的方式不支持上传程序包,所以在任务需要启动时要确保程序包已通过其他方式上传,否则会启动失败。
- 接口地址:
http://yourip:30000/api/task/create
- 请求类型:
post
- 参数格式:
application/x-www-form-urlencoded
- 返回结果:创建成功返回任务id
- 参数列表:
参数名称 | 参数类型 | 是否必填 | 说明 |
---|---|---|---|
metatype | int | 是 | 任务类型,这里固定是1 |
title | string | 是 | 任务名称 |
runloop | bool | 是 | 是否按周期执行 |
cronexpression | string | 否 | cron表达式,如果runloop为true则必填 |
assemblyname | string | 是 | 程序集名称 |
classname | string | 是 | 执行类名称,包含完整命名空间 |
startdate | datetime | 是 | 任务开始时间 |
enddate | datetime | 否 | 任务停止时间,为空表示不限停止时间 |
remark | string | 否 | 任务描述说明 |
keepers | list<int> | 否 | 监护人id |
nexts | list<guid> | 否 | 子级任务id |
executors | list<string> | 否 | 执行节点名称 |
runnow | bool | 否 | 创建成功是否立即启动 |
params | list<scheduleparam> | 否 | 自定义参数列表,也可以通过customparamsjson字段直接传json格式字符串 |
scheduleparam:
参数名称 | 参数类型 | 是否必填 | 说明 |
---|---|---|---|
paramkey | string | 是 | 参数名称 |
paramvalue | string | 是 | 参数值 |
paramremark | string | 否 | 参数说明 |
httpclient client = new httpclient(); list<keyvaluepair<string, string>> args = new list<keyvaluepair<string, string>>(); args.add(new keyvaluepair<string, string>("metatype", "1")); args.add(new keyvaluepair<string, string>("runloop", "true")); args.add(new keyvaluepair<string, string>("cronexpression", "33 0/8 * * * ?")); args.add(new keyvaluepair<string, string>("remark", "by xunit tester created")); args.add(new keyvaluepair<string, string>("startdate", datetime.today.tostring("yyyy-mm-dd hh:mm:ss"))); args.add(new keyvaluepair<string, string>("title", "程序集接口测试任务")); args.add(new keyvaluepair<string, string>("assemblyname", "hos.schedulemaster.demo")); args.add(new keyvaluepair<string, string>("classname", "hos.schedulemaster.demo.simple")); args.add(new keyvaluepair<string, string>("customparamsjson", "[{\"paramkey\":\"k1\",\"paramvalue\":\"1111\",\"paramremark\":\"r1\"},{\"paramkey\":\"k2\",\"paramvalue\":\"2222\",\"paramremark\":\"r2\"}]")); args.add(new keyvaluepair<string, string>("keepers", "1")); args.add(new keyvaluepair<string, string>("keepers", "2")); //args.add(new keyvaluepair<string, string>("nexts", "")); //args.add(new keyvaluepair<string, string>("executors", "")); httpcontent reqcontent = new formurlencodedcontent(args); var response = await client.postasync("http://localhost:30000/api/task/create", reqcontent); var content = await response.content.readasstringasync(); debug.writeline(content);
3.创建http任务
- 接口地址:
http://yourip:30000/api/task/create
- 请求类型:
post
- 参数格式:
application/x-www-form-urlencoded
- 返回结果:创建成功返回任务id
- 参数列表:
参数名称 | 参数类型 | 是否必填 | 说明 |
---|---|---|---|
metatype | int | 是 | 任务类型,这里固定是2 |
title | string | 是 | 任务名称 |
runloop | bool | 是 | 是否按周期执行 |
cronexpression | string | 否 | cron表达式,如果runloop为true则必填 |
startdate | datetime | 是 | 任务开始时间 |
enddate | datetime | 否 | 任务停止时间,为空表示不限停止时间 |
remark | string | 否 | 任务描述说明 |
httprequesturl | string | 是 | 请求地址 |
httpmethod | string | 是 | 请求方式,仅支持get\post\put\delete |
httpcontenttype | string | 是 | 参数格式,仅支持application/json和application/x-www-form-urlencoded |
httpheaders | string | 否 | 自定义请求头,scheduleparam列表的json字符串 |
httpbody | string | 是 | 如果是json格式参数,则是对应参数的json字符串;如果是form格式参数,则是对应scheduleparam列表的json字符串。 |
keepers | list<int> | 否 | 监护人id |
nexts | list<guid> | 否 | 子级任务id |
executors | list<string> | 否 | 执行节点名称 |
runnow | bool | 否 | 创建成功是否立即启动 |
httpclient client = new httpclient(); list<keyvaluepair<string, string>> args = new list<keyvaluepair<string, string>>(); args.add(new keyvaluepair<string, string>("metatype", "2")); args.add(new keyvaluepair<string, string>("runloop", "true")); args.add(new keyvaluepair<string, string>("cronexpression", "22 0/8 * * * ?")); args.add(new keyvaluepair<string, string>("remark", "by xunit tester created")); args.add(new keyvaluepair<string, string>("startdate", datetime.today.tostring("yyyy-mm-dd hh:mm:ss"))); args.add(new keyvaluepair<string, string>("title", "http接口测试任务")); args.add(new keyvaluepair<string, string>("httprequesturl", "http://localhost:56655/api/1.0/value/jsonpost")); args.add(new keyvaluepair<string, string>("httpmethod", "post")); args.add(new keyvaluepair<string, string>("httpcontenttype", "application/json")); args.add(new keyvaluepair<string, string>("httpheaders", "[]")); args.add(new keyvaluepair<string, string>("httpbody", "{ \"posts\": [{ \"postid\": 666, \"title\": \"tester\", \"content\":\"testtesttest\" }], \"blogid\": 111, \"url\":\"qweqrrttryrtyrtrtrt\" }")); httpcontent reqcontent = new formurlencodedcontent(args); var response = await client.postasync("http://localhost:30000/api/task/create", reqcontent); var content = await response.content.readasstringasync(); debug.writeline(content);
4.创建延时任务
- 接口地址:
http://yourip:30000/api/delaytask/create
- 请求类型:
post
- 参数格式:
application/x-www-form-urlencoded
- 返回结果:创建成功返回任务id
- 参数列表:
参数名称 | 参数类型 | 是否必填 | 说明 |
---|---|---|---|
sourceapp | string | 是 | 来源 |
topic | string | 是 | 主题 |
contentkey | string | 是 | 业务关键字 |
delaytimespan | int | 是 | 延迟相对时间 |
delayabsolutetime | datetime | 是 | 延迟绝对时间 |
notifyurl | string | 是 | 回调地址 |
notifydatatype | string | 是 | 回调参数格式,仅支持application/json和application/x-www-form-urlencoded |
notifybody | string | 是 | 回调参数,json格式字符串 |
for (int i = 0; i < 5; i++) { int rndnum = new random().next(20, 500); list<keyvaluepair<string, string>> args = new list<keyvaluepair<string, string>>(); args.add(new keyvaluepair<string, string>("sourceapp", "testapp")); args.add(new keyvaluepair<string, string>("topic", "testapp.trade.timeoutcancel")); args.add(new keyvaluepair<string, string>("contentkey", i.tostring())); args.add(new keyvaluepair<string, string>("delaytimespan", rndnum.tostring())); args.add(new keyvaluepair<string, string>("delayabsolutetime", datetime.now.addseconds(rndnum).tostring("yyyy-mm-dd hh:mm:ss"))); args.add(new keyvaluepair<string, string>("notifyurl", "http://localhost:56655/api/1.0/value/delaypost")); args.add(new keyvaluepair<string, string>("notifydatatype", "application/json")); args.add(new keyvaluepair<string, string>("notifybody", "{ \"posts\": [{ \"postid\": 666, \"title\": \"tester\", \"content\":\"testtesttest\" }], \"blogid\": 111, \"url\":\"qweqrrttryrtyrtrtrt\" }")); httpcontent reqcontent = new formurlencodedcontent(args); var response = await client.postasync("http://localhost:30000/api/delaytask/create", reqcontent); var content = await response.content.readasstringasync(); debug.writeline(content); }
4.框架简单分析
1.全局设计
根据官网的设计图,以及操作的流程,简单总结一下任务全局流程为客户端—–>master——>work—–>执行任务。从大的架构方向的思想就是,将原有单个服务中业务和任务调度混合的方式做一些改变,使业务和调度分离,专门把任务调度部分剥离出来,作为一个独立的进程,来统一调用和管理任务,而原有服务只做业务处理。
2.master和work分析
我们主要从主节点,从节点,数据表这几个方面来简单分析,整个业务的时序流程,从本质来看并不复杂master和work之间的关系是相互配合的,可能乍一看下面整个图有点混乱,下面来解释一下,其实master节点将任务添加到数据中,然后work节点,去从对应的数据表中取出任务数据,然后根据任务对应的配置,生成配置信息,调用quartz执行任务,这就是我们整个框架中最核心的业务。
当然master和work除了主要承载了整个管理系统的ui可视化、后台业务操作、任务执行之外,如果从细节以及实现方式来说主要做了以下事情:
master
- 1.分配任务执行和选择节点
- 2.对work节点进行健康检查,对任务进行故障转移
work
- 1.取出任务配置信息
- 2.使用quartz根据配置运行任务
- 3.使用反射调用程序集
- 4.使用httpclient调用http 接口
到此这篇关于分布式任务调度schedulemaster的文章就介绍到这了,更多相关分布式任务调度schedulemaster内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论