1. 基础设施层
分布式锁服务
// idistributedlockservice.cs public interface idistributedlockservice { valuetask<iasyncdisposable?> acquirelockasync(string resourcekey, timespan expirytime); } // redisdistributedlockservice.cs public class redisdistributedlockservice : idistributedlockservice { private readonly iconnectionmultiplexer _redis; private readonly ilogger<redisdistributedlockservice> _logger; public redisdistributedlockservice( iconnectionmultiplexer redis, ilogger<redisdistributedlockservice> logger) { _redis = redis; _logger = logger; } public async valuetask<iasyncdisposable?> acquirelockasync(string resourcekey, timespan expirytime) { var db = _redis.getdatabase(); var locktoken = guid.newguid().tostring(); var lockkey = $"distributed-lock:{resourcekey}"; try { var acquired = await db.locktakeasync(lockkey, locktoken, expirytime); if (acquired) { _logger.logdebug("成功获取分布式锁 {lockkey}", lockkey); return new redislockhandle(db, lockkey, locktoken, _logger); } _logger.logdebug("无法获取分布式锁 {lockkey}", lockkey); return null; } catch (exception ex) { _logger.logerror(ex, "获取分布式锁 {lockkey} 时发生错误", lockkey); throw; } } private sealed class redislockhandle : iasyncdisposable { private readonly idatabase _db; private readonly string _lockkey; private readonly string _locktoken; private readonly ilogger _logger; private bool _isdisposed; public redislockhandle( idatabase db, string lockkey, string locktoken, ilogger logger) { _db = db; _lockkey = lockkey; _locktoken = locktoken; _logger = logger; } public async valuetask disposeasync() { if (_isdisposed) return; try { var released = await _db.lockreleaseasync(_lockkey, _locktoken); if (!released) { _logger.logwarning("释放分布式锁 {lockkey} 失败", _lockkey); } else { _logger.logdebug("成功释放分布式锁 {lockkey}", _lockkey); } } catch (exception ex) { _logger.logerror(ex, "释放分布式锁 {lockkey} 时发生错误", _lockkey); } finally { _isdisposed = true; } } } }
2. 任务服务层
定时任务服务
// ipollingservice.cs public interface ipollingservice { task executepollingtasksasync(); task executedailytaskasync(int hour); } // pollingservice.cs public class pollingservice : ipollingservice { private readonly idistributedlockservice _lockservice; private readonly ilogger<pollingservice> _logger; public pollingservice( idistributedlockservice lockservice, ilogger<pollingservice> logger) { _lockservice = lockservice; _logger = logger; } [disableconcurrentexecution(timeoutinseconds: 60 * 30)] // 30分钟防并发 public async task executepollingtasksasync() { await using var lockhandle = await _lockservice.acquirelockasync( "polling-tasks-lock", timespan.fromminutes(25)); // 锁有效期25分钟 if (lockhandle is null) { _logger.loginformation("其他节点正在执行轮询任务,跳过本次执行"); return; } try { _logger.loginformation("开始执行轮询任务 - 节点: {nodeid}", environment.machinename); // 执行所有轮询任务 await task.whenall( pollingtaskasync(), pollingexpiretaskasync(), pollingexpiredelcharacttaskasync() ); // 触发后台任务 _ = backgroundtask.run(() => pollingdelcharacttaskasync(), _logger); _ = backgroundtask.run(() => autocheckapiasync(), _logger); _ = backgroundtask.run(() => dellogsasync(), _logger); } catch (exception ex) { _logger.logerror(ex, "执行轮询任务时发生错误"); throw; } } [disableconcurrentexecution(timeoutinseconds: 60 * 60)] // 1小时防并发 public async task executedailytaskasync(int hour) { var lockkey = $"daily-task-{hour}:{datetime.utcnow:yyyymmdd}"; await using var lockhandle = await _lockservice.acquirelockasync( lockkey, timespan.fromminutes(55)); // 锁有效期55分钟 if (lockhandle is null) { _logger.loginformation("其他节点已执行今日 {hour} 点任务", hour); return; } try { _logger.loginformation("开始执行 {hour} 点任务 - 节点: {nodeid}", hour, environment.machinename); if (hour == 21) { await executenightlymaintenanceasync(); } else if (hour == 4) { await executeearlymorningtasksasync(); } } catch (exception ex) { _logger.logerror(ex, "执行 {hour} 点任务时发生错误", hour); throw; } } // 具体任务实现方法 private async task pollingtaskasync() { // 实现游戏角色启动/关闭逻辑 } private async task executenightlymaintenanceasync() { // 21点特殊任务逻辑 } // 其他方法... } // backgroundtask.cs (安全运行后台任务) public static class backgroundtask { public static task run(func<task> task, ilogger logger) { return task.run(async () => { try { await task(); } catch (exception ex) { logger.logerror(ex, "后台任务执行失败"); } }); } }
3. 任务调度配置层
任务初始化器
// recurringjobinitializer.cs public class recurringjobinitializer : ihostedservice { private readonly irecurringjobmanager _jobmanager; private readonly iserviceprovider _services; private readonly ilogger<recurringjobinitializer> _logger; public recurringjobinitializer( irecurringjobmanager jobmanager, iserviceprovider services, ilogger<recurringjobinitializer> logger) { _jobmanager = jobmanager; _services = services; _logger = logger; } public task startasync(cancellationtoken cancellationtoken) { try { using var scope = _services.createscope(); var pollingservice = scope.serviceprovider.getrequiredservice<ipollingservice>(); // 每30分钟执行的任务 _jobmanager.addorupdate<ipollingservice>( "polling-tasks-30min", s => s.executepollingtasksasync(), "*/30 * * * *"); // 每天21:00执行的任务 _jobmanager.addorupdate<ipollingservice>( "daily-task-21:00", s => s.executedailytaskasync(21), "0 21 * * *"); // 每天04:00执行的任务 _jobmanager.addorupdate<ipollingservice>( "daily-task-04:00", s => s.executedailytaskasync(4), "0 4 * * *"); _logger.loginformation("周期性任务初始化完成"); } catch (exception ex) { _logger.logerror(ex, "初始化周期性任务失败"); throw; } return task.completedtask; } public task stopasync(cancellationtoken cancellationtoken) => task.completedtask; }
4. 应用启动配置
program.cs
var builder = webapplication.createbuilder(args); // 添加redis builder.services.addsingleton<iconnectionmultiplexer>(sp => connectionmultiplexer.connect(builder.configuration.getconnectionstring("redis"))); // 配置hangfire builder.services.addhangfire(config => { config.useredisstorage( builder.configuration.getconnectionstring("redis"), new redisstorageoptions { prefix = "hangfire:", db = 1 // 使用单独的redis数据库 }); config.usecolouredconsolelogprovider(); }); builder.services.addhangfireserver(options => { options.servername = $"{environment.machinename}:{guid.newguid():n}"; options.workercount = 1; options.queues = new[] { "default", "critical" }; }); // 注册服务 builder.services.addsingleton<idistributedlockservice, redisdistributedlockservice>(); builder.services.addscoped<ipollingservice, pollingservice>(); builder.services.addhostedservice<recurringjobinitializer>(); var app = builder.build(); // 配置hangfire仪表盘 app.usehangfiredashboard("/jobs", new dashboardoptions { dashboardtitle = "任务调度中心", authorization = new[] { new hangfiredashboardauthorizationfilter() }, statspollinginterval = 60_000 // 60秒刷新一次 }); app.run(); // hangfire仪表盘授权过滤器 public class hangfiredashboardauthorizationfilter : idashboardauthorizationfilter { public bool authorize(dashboardcontext context) { var httpcontext = context.gethttpcontext(); return httpcontext.user.identity?.isauthenticated == true; } }
5. appsettings.json 配置
{ "connectionstrings": { "redis": "localhost:6379,allowadmin=true", "hangfire": "server=(localdb)\\mssqllocaldb;database=hangfire;trusted_connection=true;" }, "hangfire": { "workercount": 1, "schedulepollinginterval": 5000 } }
关键设计说明
1.分布式锁:
- 使用redis redlock算法实现
- 自动处理锁的获取和释放
- 包含完善的错误处理和日志记录
2.任务隔离:
- 使用hangfire的[disableconcurrentexecution]防止同一任务重复执行
- 分布式锁确保跨节点唯一执行
3.错误处理:
- 所有关键操作都有try-catch和日志记录
- 后台任务使用安全包装器执行
4.可观测性:
- 详细的日志记录
- hangfire仪表盘监控
5.扩展性:
- 可以轻松添加新任务
- 支持动态调整调度策略
这个实现方案完全符合.net 6的最佳实践,支持分布式部署,确保任务在集群环境中安全可靠地执行。
到此这篇关于.net6实现分布式定时任务的完整方案的文章就介绍到这了,更多相关.net6分布式定时任务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论