一、分布式锁的底层实现细节(以 redis 为例)
分布式锁是解决任务重复执行的核心,需保证原子性、超时释放和可重入性。以下是生产级 redis 锁实现:
public class redisdistributedlock { private final redistemplate<string, string> redistemplate; private final string lockkey; private final string lockvalue; // 用于标识锁持有者(支持可重入) private final long expiremillis; // 锁过期时间(避免死锁) // 构造函数:初始化锁参数 public redisdistributedlock(redistemplate<string, string> redistemplate, string lockkey, string requestid, long expiremillis) { this.redistemplate = redistemplate; this.lockkey = lockkey; this.lockvalue = requestid; // 建议使用uuid+线程id this.expiremillis = expiremillis; } // 尝试获取锁(原子操作) public boolean trylock() { // 使用redis的set命令实现:nx(不存在则设置)+ px(毫秒过期) return redistemplate.opsforvalue().setifabsent(lockkey, lockvalue, expiremillis, timeunit.milliseconds); } // 释放锁(需校验持有者,避免误释放) public boolean unlock() { // 使用lua脚本保证删除操作的原子性 string script = "if redis.call('get', keys[1]) == argv[1] then return redis.call('del', keys[1]) else return 0 end"; long result = (long) redistemplate.execute( new defaultredisscript<>(script, long.class), collections.singletonlist(lockkey), lockvalue ); return result != null && result > 0; } // 带超时等待的获取锁(轮询重试) public boolean trylock(long waittime, timeunit unit) throws interruptedexception { long timeout = unit.tomillis(waittime); long start = system.currenttimemillis(); while (true) { if (trylock()) { return true; } // 等待重试(避免自旋过于频繁) long remaining = timeout - (system.currenttimemillis() - start); if (remaining <= 0) { return false; // 超时未获取到锁 } thread.sleep(math.min(remaining, 100)); // 最多等待100ms重试 } } }
关键设计点:
- 锁标识(lockvalue):用 uuid + 线程 id 区分持有者,避免释放其他节点的锁。
- 过期时间:需大于任务执行时间(如任务耗时 5s,锁过期设 10s),防止节点宕机导致锁永久持有。
- 续约机制:若任务执行时间可能超过锁过期时间,需启动后台线程定期续约(如每 3s 续期 10s)。
二、任务调度核心原理(以 xxl-job 为例)
1. 调度中心与执行器通信流程
- 执行器注册:执行器启动时通过 http 请求向调度中心注册(携带 appname、ip、端口)。
- 任务触发:调度中心根据 cron 表达式计算下次执行时间,到达时间后通过线程池触发任务,向执行器发送 http 请求(post 方式)。
- 执行反馈:执行器执行完任务后,将结果(成功 / 失败、日志)同步回调度中心。
2. 路由策略与负载均衡
xxl-job 支持多种路由策略,解决任务在集群节点的分配问题:
- 第一个节点:固定选择集群中第一个在线节点(适合单节点执行的任务)。
- 轮询:按顺序依次分配给在线节点(均衡负载)。
- 分片广播:所有在线节点同时执行,每个节点处理不同分片(适合大规模任务)。
分片示例:100 万条数据需批量处理,分为 5 个分片,集群 3 个节点:
@xxljob("shardingtask") public returnt<string> shardinghandler(string param) { // 获取分片参数(由调度中心分配) shardingutil.shardingvo shardingvo = shardingutil.getshardingvo(); int shardindex = shardingvo.getindex(); // 当前分片索引(0-4) int shardtotal = shardingvo.gettotal(); // 总分片数(5) // 按分片处理数据(如按id取模:id % shardtotal == shardindex) list<data> datalist = dataservice.querybysharding(shardindex, shardtotal); for (data data : datalist) { processdata(data); } return returnt.success; }
三、高可用设计(避免单点故障)
1. 调度中心集群化
- 部署方式:多实例部署(如 2 个节点),通过 nginx 负载均衡对外提供服务。
- 数据一致性:依赖 mysql 主从同步(调度中心数据存储在 mysql),确保多实例数据一致。
2. 执行器故障转移
- 心跳检测:执行器定期向调度中心发送心跳(默认 30s 一次),超过 90s 未心跳则标记为离线。
- 任务转移:若执行器离线,调度中心会将其负责的任务分配给其他在线节点(需任务支持重执行)。
四、监控与告警体系
1. 核心监控指标
- 任务维度:执行次数、成功率、平均耗时、最大耗时。
- 节点维度:cpu 使用率、内存占用、任务并发数。
2. 集成 prometheus 监控
// 自定义任务执行指标(使用micrometer) @component public class taskmetrics { private final meterregistry meterregistry; public taskmetrics(meterregistry meterregistry) { this.meterregistry = meterregistry; } // 记录任务执行耗时 public void recordtaskduration(string taskname, long durationms) { timer.builder("task.execution.duration") .tag("task", taskname) .register(meterregistry) .record(durationms, timeunit.milliseconds); } // 记录任务失败次数 public void incrementfailcount(string taskname) { counter.builder("task.execution.fail") .tag("task", taskname) .register(meterregistry) .increment(); } }
在任务执行中埋点:
@xxljob("ordertimeouttask") public returnt<string> ordertimeouthandler(string param) { long start = system.currenttimemillis(); try { // 任务逻辑... metrics.recordtaskduration("ordertimeouttask", system.currenttimemillis() - start); return returnt.success; } catch (exception e) { metrics.incrementfailcount("ordertimeouttask"); return returnt.fail; } }
3. 告警配置
通过 grafana 设置告警规则(如任务失败率 > 5% 时触发告警),并集成钉钉 / 企业微信机器人:
// 钉钉告警示例 public class dingtalkalarm { private final string webhook; public void sendalarm(string message) { httpheaders headers = new httpheaders(); headers.setcontenttype(mediatype.application_json); map<string, object> body = new hashmap<>(); body.put("msgtype", "text"); body.put("text", map.of("content", "定时任务告警:" + message)); new resttemplate().postforobject(webhook, new httpentity<>(body, headers), string.class); } }
五、自定义轻量级方案(无框架依赖)
若场景简单(如无动态配置需求),可基于 redis + 线程池实现极简方案:
@component public class redisscheduledtask { @autowired private redistemplate<string, string> redistemplate; @autowired private taskservice taskservice; // 初始化定时任务(每分钟执行一次) @postconstruct public void init() { scheduledexecutorservice executor = executors.newsinglethreadscheduledexecutor(); executor.scheduleatfixedrate(this::executetask, 0, 1, timeunit.minutes); } // 执行任务(加分布式锁) private void executetask() { string lockkey = "task:order:timeout"; string requestid = uuid.randomuuid().tostring(); redisdistributedlock lock = new redisdistributedlock(redistemplate, lockkey, requestid, 60000); try { if (lock.trylock()) { // 执行核心逻辑 taskservice.processtimeoutorders(); } else { log.info("任务被其他节点执行,当前节点跳过"); } } finally { lock.unlock(); // 释放锁 } } }
六、避坑指南
- 锁过期时间设置:需大于任务最大执行时间(可通过压测评估),避免任务未执行完锁已释放。
- 任务幂等性:即使加了锁,仍需保证任务可重复执行(如使用
update orders set status=1 where id=? and status=0
)。 - 线程池隔离:核心任务与非核心任务使用独立线程池(如
executors.newscheduledthreadpool(5)
),避免相互阻塞。 - 日志追踪:任务执行日志需包含唯一 id(如订单号),便于问题排查。
通过以上细节设计,可构建既高效又可靠的分布式定时任务系统,兼顾性能、可用性和可运维性。实际项目中,建议优先选用 xxl-job 等成熟框架,减少重复开发;特殊场景下再考虑自定义方案。
到此这篇关于java分布式定时任务实现细节的文章就介绍到这了,更多相关java分布式定时任务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论