您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
任务调度之时间轮实现
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
任务调度之时间轮实现
jd****
2023-07-14
IP归属:北京
202浏览
# 任务调度之时间轮实现 ## 前言 在生活中太阳的东升西落,鸟类的南飞北归,四级的轮换,每天的上下班,海水的潮汐,每月的房租车贷等等,如果用程序员的视角看,这就是一个个的定时任务,在日常的开发工作中也有很多的定时任务场景: 1. 数仓系统凌晨进行的数据同步 2. 订单12小时未支付的状态校验 3. rpc调用超时时间的校验 4. 缓存数据失效时间的延长 5. 定时开启的促销活动 6. …… 假如现在有一个任务需要3s后执行,你会如何实现? 简单点,直接一个线程的休眠,thread.sleep(3000),一行代码就能达到目的,但是性能嘛……,由于每个任务都需要一个单独的线程,当系统中存在大量任务, ## 任务调度 假如,现在有一个任务需要3s后执行,你会如何实现呢? 简单点,直接一个休眠,让线程sleep 3s,不就达到目的了吗?但是性能嘛……,由于每个任务都需要一个单独的线程,在系统中存在大量任务时,这种方案的消耗是极其巨大的,那么如何实现高效的调度呢?大佬们低头看了一眼手表,一个算法出现了 ## 时间轮的数据结构 ![](https://s3.cn-north-1.jdcloud-oss.com/shendengbucket1/2023-06-27-20-18APiGaAEkHjqSFEB.jpg) 如图所示,这就是时间轮的一个基础结构,一个存储了定时任务的环形队列,可以理解为一个时间钟,队列的每个节点称为时间槽,每个槽位又使用列表存储着需要执行的定时任务。和生活中的钟表运行机制一样,每隔固定的单位时间,就会从一个槽位跳到下一个槽位,就像秒针跳动了一次,再取出当前槽位的任务进行执行。假如固定单位时间为1S,当前槽位位2,如果需要插入一个3S后的任务,就会在槽位5的的列表里加上当前任务。等指针运行到第五个槽位时,取出任务执行就可以了。 时间轮的最大优势是在时间复杂度上的优势,一个任务简单的生命周期: 1. 创建任务,插入到数据结构中。 2. 查询任务,找到满足条件的任务 3. 执行任务。 4. 任务归档,从任务调度的列表中删出。 其中第三步的执行时间是固定的,所以1,2,4这三部就的时间复杂度就决定了整个任务调度流程的复杂度,而时间轮是链式存储结构,所以在增删和查询时,时间复杂度都是0(1),其他常见的任务调度算法例如最小堆和红黑树以及跳表。 最小堆是一颗完全二叉树而且子节点的值总是大于等于父节点的值,所以在插入时候需要判断父节点的关系,它的时间添加操作时间复杂度是O(logn),在任务执行时,只需要判断最顶节点就行,所以它的查询时间复杂度时哦O(1)。 根据红黑树的特性已经被归纳法证明它的增加的时间复杂度是O(logn),查找最小节点的时间复杂度位O(h)。 跳表的的本质是实现二分查找法的有序链表,但是他有多个层级,和红黑树的高度值相似,它的时间复杂度也是O(logn) ## 高级时间轮 如上图所示,如果一个刻度代表1S,那么一个周期就是1分钟,但是如果我一个任务是在3分钟后执行呢,如果是在一个12小时后执行呢? 当然如果是单纯的增加环形链表的长度也是可以的,直接扩大到3600*24,一天一个周期,直接放进来。但是还有更好的办法。 ### 带轮次标记的任务 任务执行轮次的计算公式:((任务执行时间-当前时间)/固定单位时间)%槽位数量 根据槽位计算公式可以算出当前任务需要插入执行的轮次,我在任务上面加一个字段round,当每次执行到该槽位时,就遍历该槽位的任务列表,每个任务的round-1,取出来round=0的任务执行就行。 ```java for(Task task:taskList){ int round= task.getRound(); round=(round-1); task.setRound(round); if(round==0){ doTask(task); } } ``` 如果任务间隔不是很大,看起来也是不错的一种解决方式。 但是工作中有很多任务,延迟执行的时间是很久以后的,例如延保履约服务成功之后会有一个7天自动完成的定时任务,甚至有一些几年后才会执行的任务,如果都用round来处理的话,那这个round将会变的非常大的一个数字,也会在任务列表中插入很多当前不需要执行的任务,如果每次都执行上面的逻辑,显然会浪费大量的资源。 ### 多层时间轮 ![](https://s3.cn-north-1.jdcloud-oss.com/shendengbucket1/2023-06-27-20-19hMQkSqPYKpZMtmY.jpg) 多层时间轮的核心思想是: 就上上图的水表,有很多小的表盘,但是每个表盘的刻度其实是不一样,又或者手表里的时分秒或者日历上的年月日。 针对时间复杂度的问题:不做遍历计算round,只要到了当前槽位,就把任务列表的所有任务拿出来执行。 针对空间复杂度的问题:分层,每个层级的时间轮刻度不一样,多个时间轮协调工作。 ![](https://s3.cn-north-1.jdcloud-oss.com/shendengbucket1/2023-06-27-20-21uN37NP216nR0MMrek.jpg) 如上图所示,第一次时间轮,每个刻度是1ms,一轮是20ms,第二个层时间轮的刻度是20ms,一轮就是400ms,第三层的刻度是400ms,一轮就是8000ms,每层的周期就等于 20ms *2的n次方。这要使用多层级时间轮就可以很容易把任务区分开来。每当高层次时间轮到达当前节点,就把任务降级到低层级的时间轮上。对于400ms的时间轮来说,小于1ms和小于399ms的任务都是过期任务,只要不大于400ms,都认为是过期任务。 代码实现的话,往上也有很多,最近比较火热的POWER-JOB的分布式调度框架就是才有的时间轮算法,粘贴下核心代码大家看下: 1.首先定义了一个任务接口 ``` public interface TimerTask extends Runnable { } ``` 2.调度中的任务对象 ```java public interface TimerFuture { /** * 获取实际要执行的任务 * @return */ TimerTask getTask(); /** * 取消任务 * @return */ boolean cancel(); /** * 任务是否取消 * @return */ boolean isCancelled(); /** * 任务是否完成 * @return */ boolean isDone(); } ``` 3.调度器接口 ```java public interface Timer { /** * 调度定时任务 */ TimerFuture schedule(TimerTask task, long delay, TimeUnit unit); /** * 停止所有调度任务 */ Set<TimerTask> stop(); } ``` 4.时间轮的实现 ```java public class HashedWheelTimer implements Timer { private final long tickDuration; private final HashedWheelBucket[] wheel; private final int mask; private final Indicator indicator; private final long startTime; private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue(); private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue(); private final ExecutorService taskProcessPool; public HashedWheelTimer(long tickDuration, int ticksPerWheel) { this(tickDuration, ticksPerWheel, 0); } /** * 新建时间轮定时器 * @param tickDuration 时间间隔,单位毫秒(ms) * @param ticksPerWheel 轮盘个数 * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池) */ public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) { this.tickDuration = tickDuration; // 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余 int ticksNum = CommonUtils.formatSize(ticksPerWheel); wheel = new HashedWheelBucket[ticksNum]; for (int i = 0; i < ticksNum; i++) { wheel[i] = new HashedWheelBucket(); } mask = wheel.length - 1; // 初始化执行线程池 if (processThreadNum <= 0) { taskProcessPool = null; }else { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build(); // 这里需要调整一下队列大小 BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192); int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum); // 基本都是 io 密集型任务 taskProcessPool = new ThreadPoolExecutor(core, 2 * core, 60, TimeUnit.SECONDS, queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool")); } startTime = System.currentTimeMillis(); // 启动后台线程 indicator = new Indicator(); new Thread(indicator, "HashedWheelTimer-Indicator").start(); } @Override public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) { long targetTime = System.currentTimeMillis() + unit.toMillis(delay); HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime); // 直接运行到期、过期任务 if (delay <= 0) { runTask(timerFuture); return timerFuture; } // 写入阻塞队列,保证并发安全(性能进一步优化可以考虑 Netty 的 Multi-Producer-Single-Consumer队列) waitingTasks.add(timerFuture); return timerFuture; } @Override public Set<TimerTask> stop() { indicator.stop.set(true); taskProcessPool.shutdown(); while (!taskProcessPool.isTerminated()) { try { Thread.sleep(100); }catch (Exception ignore) { } } return indicator.getUnprocessedTasks(); } /** * 包装 TimerTask,维护预期执行时间、总圈数等数据 */ private final class HashedWheelTimerFuture implements TimerFuture { // 预期执行时间 private final long targetTime; private final TimerTask timerTask; // 所属的时间格,用于快速删除该任务 private HashedWheelBucket bucket; // 总圈数 private long totalTicks; // 当前状态 0 - 初始化等待中,1 - 运行中,2 - 完成,3 - 已取消 private int status; // 状态枚举值 private static final int WAITING = 0; private static final int RUNNING = 1; private static final int FINISHED = 2; private static final int CANCELED = 3; public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) { this.targetTime = targetTime; this.timerTask = timerTask; this.status = WAITING; } @Override public TimerTask getTask() { return timerTask; } @Override public boolean cancel() { if (status == WAITING) { status = CANCELED; canceledTasks.add(this); return true; } return false; } @Override public boolean isCancelled() { return status == CANCELED; } @Override public boolean isDone() { return status == FINISHED; } } /** * 时间格(本质就是链表,维护了这个时刻可能需要执行的所有任务) */ private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> { public void expireTimerTasks(long currentTick) { removeIf(timerFuture -> { // processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况 if (timerFuture.status == HashedWheelTimerFuture.CANCELED) { return true; } if (timerFuture.status != HashedWheelTimerFuture.WAITING) { log.warn("[HashedWheelTimer] impossible, please fix the bug"); return true; } // 本轮直接调度 if (timerFuture.totalTicks <= currentTick) { if (timerFuture.totalTicks < currentTick) { log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug"); } try { // 提交执行 runTask(timerFuture); }catch (Exception ignore) { } finally { timerFuture.status = HashedWheelTimerFuture.FINISHED; } return true; } return false; }); } } private void runTask(HashedWheelTimerFuture timerFuture) { timerFuture.status = HashedWheelTimerFuture.RUNNING; if (taskProcessPool == null) { timerFuture.timerTask.run(); }else { taskProcessPool.submit(timerFuture.timerTask); } } /** * 模拟时针转动的线程 */ private class Indicator implements Runnable { private long tick = 0; private final AtomicBoolean stop = new AtomicBoolean(false); private final CountDownLatch latch = new CountDownLatch(1); @Override public void run() { while (!stop.get()) { // 1. 将任务从队列推入时间轮 pushTaskToBucket(); // 2. 处理取消的任务 processCanceledTasks(); // 3. 等待指针跳向下一刻 tickTack(); // 4. 执行定时任务 int currentIndex = (int) (tick & mask); HashedWheelBucket bucket = wheel[currentIndex]; bucket.expireTimerTasks(tick); tick ++; } latch.countDown(); } /** * 模拟指针转动,当返回时指针已经转到了下一个刻度 */ private void tickTack() { // 下一次调度的绝对时间 long nextTime = startTime + (tick + 1) * tickDuration; long sleepTime = nextTime - System.currentTimeMillis(); if (sleepTime > 0) { try { Thread.sleep(sleepTime); }catch (Exception ignore) { } } } /** * 处理被取消的任务 */ private void processCanceledTasks() { while (true) { HashedWheelTimerFuture canceledTask = canceledTasks.poll(); if (canceledTask == null) { return; } // 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理) if (canceledTask.bucket != null) { canceledTask.bucket.remove(canceledTask); } } } /** * 将队列中的任务推入时间轮中 */ private void pushTaskToBucket() { while (true) { HashedWheelTimerFuture timerTask = waitingTasks.poll(); if (timerTask == null) { return; } // 总共的偏移量 long offset = timerTask.targetTime - startTime; // 总共需要走的指针步数 timerTask.totalTicks = offset / tickDuration; // 取余计算 bucket index int index = (int) (timerTask.totalTicks & mask); HashedWheelBucket bucket = wheel[index]; // TimerTask 维护 Bucket 引用,用于删除该任务 timerTask.bucket = bucket; if (timerTask.status == HashedWheelTimerFuture.WAITING) { bucket.add(timerTask); } } } public Set<TimerTask> getUnprocessedTasks() { try { latch.await(); }catch (Exception ignore) { } Set<TimerTask> tasks = Sets.newHashSet(); Consumer<HashedWheelTimerFuture> consumer = timerFuture -> { if (timerFuture.status == HashedWheelTimerFuture.WAITING) { tasks.add(timerFuture.timerTask); } }; waitingTasks.forEach(consumer); for (HashedWheelBucket bucket : wheel) { bucket.forEach(consumer); } return tasks; } } } ```
上一篇:Ui2Code+ChatGPT助力低代码搭建
下一篇:从TL、ITL到TTL
jd****
文章数
1
阅读量
202
作者其他文章
01
任务调度之时间轮实现
任务调度之时间轮实现前言在生活中太阳的东升西落,鸟类的南飞北归,四级的轮换,每天的上下班,海水的潮汐,每月的房租车贷等等,如果用程序员的视角看,这就是一个个的定时任务,在日常的开发工作中也有很多的定时任务场景:数仓系统凌晨进行的数据同步订单12小时未支付的状态校验rpc调用超时时间的校验缓存数据失效时间的延长定时开启的促销活动……假如现在有一个任务需要3s后执行,你会如何实现?简单点,直接一个线程
jd****
文章数
1
阅读量
202
作者其他文章
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号