前言
时间轮(timingwheel)是一种高效利用线程资源进行批量化调度的算法,广泛应用于各种操作系统的定时任务调度中,如linux的crontab,以及java开发中常用的dubbo、netty、kafka等框架。时间轮的核心思想是将时间划分为若干个时间间隔(bucket),每个时间间隔代表一个时间段,并通过一个循环的数据结构(类似于时钟)来管理这些时间间隔。
文章基于3.1.0版本进行分析
<dependency> <groupid>org.apache.dubbo</groupid> <artifactid>dubbo</artifactid> <version>3.1.0</version> </dependency>
一、参数说明
- tickduration:表示一个槽所代表的时间范围 默认100ms
- ticksperwheel:表示该时间轮有多少个槽 默认512
- starttime:表示该时间轮的开始时间
- interval:时间轮所能表示的时间跨度,也就是 tickduration * ticksperwheel
- currenttime:表示当前时间,也就是时间轮指针指向的时间
- wheel:表示timertasklist的数组,即各个槽,每个bucket都是一个 hashedwheelbucket 。
- hashedwheelbucket:存储timertaskentry的双向链表
- hashedwheeltimeout:延迟任务,有两个值 deadline 和 remainingrounds
- deadline:timertask 最后的执行时间
- remainingrounds:剩余圈数
- timeouts:用来保存新增的hashedwheeltimeout,每次执行会拿出10w个放入hashedwheelbucket
二、具体实现
1、hashedwheeltimer
时间轮实现类
public hashedwheeltimer( threadfactory threadfactory, long tickduration, timeunit unit, int ticksperwheel, long maxpendingtimeouts) { // 检查参数 if (threadfactory == null) { throw new nullpointerexception("threadfactory"); } if (unit == null) { throw new nullpointerexception("unit"); } if (tickduration <= 0) { throw new illegalargumentexception("tickduration must be greater than 0: " + tickduration); } if (ticksperwheel <= 0) { throw new illegalargumentexception("ticksperwheel must be greater than 0: " + ticksperwheel); } // normalize ticksperwheel to power of two and initialize the wheel. // 创建时间轮 wheel = createwheel(ticksperwheel); // 位运算标识 // 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline& mast == deadline % wheel.length // deadline = system.nanotime() + unit.tonanos(delay) - starttime; todo mask = wheel.length - 1; // convert tickduration to nanos. // 时间轮的基本时间跨度,转成最小时间单位nanos this.tickduration = unit.tonanos(tickduration); // prevent overflow. // 时间跨度限制不能太大,计算会有问题 if (this.tickduration >= long.max_value / wheel.length) { throw new illegalargumentexception(string.format( "tickduration: %d (expected: 0 < tickduration in nanos < %d", tickduration, long.max_value / wheel.length)); } // 创建时间轮工作线程 workerthread = threadfactory.newthread(worker); this.maxpendingtimeouts = maxpendingtimeouts; // 延迟任务太多的时间,警告日志 if (instance_counter.incrementandget() > instance_count_limit && warned_too_many_instances.compareandset(false, true)) { reporttoomanyinstances(); } }
参数说明:
- threadfactory
线程工厂,创建时间轮线程 - tickduration
每一tick的时间 - timeunit
tickduration的时间单位 - ticksperwheel
就是轮子一共有多个格子,即要多少个tick才能走完这个wheel一圈。
2、createwheel
创建时间轮
private static hashedwheelbucket[] createwheel(int ticksperwheel) { if (ticksperwheel <= 0) { throw new illegalargumentexception( "ticksperwheel must be greater than 0: " + ticksperwheel); } if (ticksperwheel > 1073741824) { throw new illegalargumentexception( "ticksperwheel may not be greater than 2^30: " + ticksperwheel); } // 如果不是2^n 则调整为2^n ticksperwheel = normalizeticksperwheel(ticksperwheel); // 初始化时间轮槽 hashedwheelbucket[] wheel = new hashedwheelbucket[ticksperwheel]; for (int i = 0; i < wheel.length; i++) { wheel[i] = new hashedwheelbucket(); } return wheel; }
3、newtimeout
添加新任务
public timeout newtimeout(timertask task, long delay, timeunit unit) { if (task == null) { throw new nullpointerexception("task"); } if (unit == null) { throw new nullpointerexception("unit"); } long pendingtimeoutscount = pendingtimeouts.incrementandget(); // 如果任务大于最大量,则不允许继续添加 if (maxpendingtimeouts > 0 && pendingtimeoutscount > maxpendingtimeouts) { pendingtimeouts.decrementandget(); throw new rejectedexecutionexception("number of pending timeouts (" + pendingtimeoutscount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxpendingtimeouts + ")"); } // 启动时间轮工作线程 start(); // add the timeout to the timeout queue which will be processed on the next tick. // during processing all the queued hashedwheeltimeouts will be added to the correct hashedwheelbucket. long deadline = system.nanotime() + unit.tonanos(delay) - starttime; // guard against overflow. // 如果为负数,那么说明超过了long的最大值 if (delay > 0 && deadline < 0) { deadline = long.max_value; } hashedwheeltimeout timeout = new hashedwheeltimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
4、start
启动时间轮
public void start() { switch (worker_state_updater.get(this)) { // 如果是初始化 启动实践论 case worker_state_init: // 保证只启动一次,原子性 if (worker_state_updater.compareandset(this, worker_state_init, worker_state_started)) { workerthread.start(); } break; // 已经启动过了 case worker_state_started: break; // 时间轮停止了 case worker_state_shutdown: throw new illegalstateexception("cannot be started once stopped"); default: throw new error("invalid workerstate"); } // wait until the starttime is initialized by the worker. // 线程启动执行任务是异步的,这里是等待workerthread.start(),线程已经启动了 while (starttime == 0) { try { starttimeinitialized.await(); } catch (interruptedexception ignore) { // ignore - it will be ready very soon. } } }
5、run
workerthread.start()启动后,会执行worker的run方法
public void run() { // initialize the starttime. starttime = system.nanotime(); if (starttime == 0) { // we use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. starttime = 1; } // notify the other threads waiting for the initialization at start(). // 唤醒被阻塞的start()方法。 starttimeinitialized.countdown(); do { // 等下一个槽的到达时间,开始执行上一个槽的任务 todo 不明白这里的设计,哪位大佬知道可以指点一下 final long deadline = waitfornexttick(); if (deadline > 0) { // 计算时间轮的槽位 int idx = (int) (tick & mask); // 移除取消的了task processcancelledtasks(); hashedwheelbucket bucket = wheel[idx]; // 将newtimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中 transfertimeoutstobuckets(); // 运行目前指针指向的槽中的bucket链表中的任务,执行到期的延时任务 bucket.expiretimeouts(deadline); tick++; } } // 如果worker_state一只是started状态,就一直循环 while (worker_state_updater.get(hashedwheeltimer.this) == worker_state_started); // fill the unprocessedtimeouts so we can return them from stop() method. for (hashedwheelbucket bucket : wheel) { // 清除时间轮中超时未处理的任务 bucket.cleartimeouts(unprocessedtimeouts); } for (; ; ) { // 遍历任务队列,发现如果有任务被取消,则添加到unprocessedtimeouts,也就是不需要处理的队列中。 hashedwheeltimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.iscancelled()) { unprocessedtimeouts.add(timeout); } } // 再次移除取消的了task processcancelledtasks(); }
6、waitfornexttick
一个钟表上的间隔是代表一个单位时间的间隔,那么waitfornexttick就是根据当前时间计算出跳动到下个时间的时间间隔,然后进行sleep,结束后进入下一个时间间隔,下一个间隔到来的时候返回。
/** * 根据starttime和当前槽位计算目标nanotime, * 等待时间到达 * * @return long.min_value if received a shutdown request, * current time otherwise (with long.min_value changed by +1) */ private long waitfornexttick() { // tick槽位,tickduration表示每个时间格的跨度,所以deadline返回的是下一次时间轮指针跳动的时间 long deadline = tickduration * (tick + 1); for (; ; ) { // 计算当前时间距离启动时间的时间间隔,期间休眠 final long currenttime = system.nanotime() - starttime; // 计算sleeptimems先加999999,应该是不足1ms的,补足1ms long sleeptimems = (deadline - currenttime + 999999) / 1000000; // sleeptimems小于零表示走到了下一个时间槽位置 if (sleeptimems <= 0) { if (currenttime == long.min_value) { return -long.max_value; } else { return currenttime; } } // windows 时间换算 if (iswindows()) { sleeptimems = sleeptimems / 10 * 10; } try { // 当前时间距离下一次tick时间还有一段距离,需要sleep thread.sleep(sleeptimems); } catch (interruptedexception ignored) { if (worker_state_updater.get(hashedwheeltimer.this) == worker_state_shutdown) { return long.min_value; } } } }
7、transfertimeoutstobuckets
转移任务到时间轮的具体槽中
// 将延时任务队列中等待添加到时间轮中的延时任务转移到时间轮的指定位置 private void transfertimeoutstobuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerthread when it just // adds new timeouts in a loop. // 循环100000次,也就是每次转移10w个任务 // 为了防止这个操作销毁太多时间,导致更多的任务时间不准,因此一次最多操作10w个 for (int i = 0; i < 100000; i++) { // 从阻塞队列中获得具体的任务 hashedwheeltimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == hashedwheeltimeout.st_cancelled) { // was cancelled in the meantime. continue; } // 计算tick次数,deadline表示当前任务的延迟时间,tickduration表示时间槽的间隔,两者相除就可以计算当前任务需要tick几次才能被执行 long calculated = timeout.deadline / tickduration; // 计算剩余的轮数, 只有 timer 走够轮数, 并且到达了 task 所在的 slot, task 才会过期.(被执行) timeout.remainingrounds = (calculated - tick) / wheel.length; // ensure we don't schedule for past. // 如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行 final long ticks = math.max(calculated, tick); // 算出任务应该插入的 wheel 的 slot, stopindex = tick 次数 & mask, mask = wheel.length - 1 int stopindex = (int) (ticks & mask); // 把timeout任务插入到指定的bucket链中。 hashedwheelbucket bucket = wheel[stopindex]; bucket.addtimeout(timeout); } }
8、expiretimeouts
当指针跳动到某一个时间槽中时,会就触发这个槽中的任务的执行。该功能是通过expiretimeouts来实现
void expiretimeouts(long deadline) { // 双向链表 hashedwheeltimeout timeout = head; // process all timeouts // 遍历当前时间槽中的所有任务 while (timeout != null) { hashedwheeltimeout next = timeout.next; // 如果当前任务要被执行,那么remainingrounds应该小于或者等于0 if (timeout.remainingrounds <= 0) { // 从bucket链表中移除当前timeout,并返回链表中下一个timeout next = remove(timeout); // 如果timeout的时间小于当前的时间,那么就调用expire执行task if (timeout.deadline <= deadline) { timeout.expire(); } else { // the timeout was placed into a wrong slot. this should never happen. // 不可能发生的情况,就是说round已经为0了,deadline却 > 当前槽的deadline throw new illegalstateexception(string.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.iscancelled()) { next = remove(timeout); } else { // 因为当前的槽位已经过了,说明已经走了一圈了,把轮数减一 timeout.remainingrounds--; } // 把指针放置到下一个timeout timeout = next; } }
总结
时间轮(timingwheel)在计算机科学中,特别是在任务调度和时间管理方面,具有重要的意义,我们可以结合业务进行使用
- 节省cpu资源
- 易于实现和维护
- 批量化调度模型
- 高效处理大量定时任务
- 灵活适应不同应用场景
到此这篇关于springboot+dubbo实现时间轮算法的文章就介绍到这了,更多相关springboot dubbo时间轮内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论