您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
深入探究JDK中Timer的源码解析
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
深入探究JDK中Timer的源码解析
自猿其说Tech
2021-03-29
IP归属:未知
692800浏览
计算机编程
## 导言 在项目开发过程中,经常会遇到需要使用定时执行或延时执行任务的场景。比如我们在活动结束后自动汇总生成效果数据、导出Excel表并将文件通过邮件推送到用户手上,再比如微信运动每天都会在十点后向你个位数的步数(在?把摄像头从我家拆掉!)。 本文将会从源码角度对`java.util.Timer`进行解析。 ## 源码解析 ### TimerTask --- `TimerTask`类是一个抽象类,实现了`Runnable`接口 ```Java public abstract class TimerTask implements Runnable ``` #### `TimerTask`对象的成员 首先来看`TimerTask`类的成员部分 ```Java final Object lock = new Object(); int state = VIRGIN; static final int VIRGIN = 0; static final int SCHEDULED = 1; static final int EXECUTED = 2; static final int CANCELLED = 3; long nextExecutionTime; long period = 0; ``` 对象`lock`是对外用来控制`TimerTask`对象修改的锁对象,它控制了锁的粒度——只会影响类属性的变更,而不会影响整个类的方法调用。接下来是`state`属性表示`TimerTask`对象的状态。`nextExecutionTime`属性表示`TimerTask`对象的下一次执行时间,当`TimerTask`对象被添加到任务队列后,将会使用这个属性来按照从小到大的顺序排序。`period`属性表示`TimerTask`对象的执行周期,`period`属性的值有三种情况 1. 如果是0,那么表示任务不会重复执行 2. 如果是正数,那么就表示任务按照相同的执行间隔来重复执行 3. 如果是负数,那么就表示任务按照相同的执行速率来重复执行 #### `TimerTask`对象的构造方法 `Timer`对象的构造方法很简单,就是`protected`限定的默认构造方法,不再赘述 ```Java protected TimerTask() { } ``` #### `TimerTask`对象的成员方法 接下来我们看下`TimerTask`对象的成员方法 ```Java public abstract void run(); public boolean cancel() { synchronized(lock) { boolean result = (state == SCHEDULED); state = CANCELLED; return result; } } public long scheduledExecutionTime() { synchronized(lock) { return (period < 0 ? nextExecutionTime + period : nextExecutionTime - period); } } ``` 首先是`run()`方法实现自`Runnable()`接口,为抽象方法,所有的任务都需要实现此方法。接下来是`cancel()`方法,这个方法会将任务的状态标记为`CANCELLED`,如果在结束前任务处于被调度状态,那么就返回`true`,否则返回`false`。至于`scheduledExecutionTime()`只是用来计算重复执行的下一次执行时间,在`Timer`中并没有被使用过,不再赘述。 ### TimerQueue --- `TimerQueue`是`Timer`维护任务调度顺序的最小优先队列,使用的是最小二叉堆实现,如上文所述,排序用的Key是`TimerTask`的`nextExecutionTime`属性。 在介绍`TimerQueue`之前,我们先补充下数据结构的基础知识 #### 二叉堆(Binary heap) **二叉堆是一颗除了最底层的元素外,所有层都被填满,最底层的元素从左向右填充的完全二叉树(complete binary tree)**。完全二叉树可以用数组表示,假设元素从1开始编号,下标为`i`的元素,它的左孩子的下标为`2*i`,它的右孩子的下标为`2*i+1`。 **二叉堆的任意非叶节点满足堆序性**:假设我们定义的是最小优先队列,那么我们使用的是小根堆,任意节点的元素值都小于它的左孩子和右孩子(如果有的话)的元素值。 **二叉堆的定义满足递归定义法**,即二叉堆的任意子树都是二叉堆,单个节点本身就是二叉堆。 根据堆序性和递归定义法,**二叉堆的根节点一定是整个二叉堆中元素值最小的节点**。 与堆结构有关的操作,除了`add`, `getMin`和`removeMin`之外,还有`fixUp`、`fixDown`和`heapify`三个关键操作,而`add`、`getMin`和`removeMin`也是通过这三个操作来完成的,下面来简单介绍下这三个操作 1. `fixUp`: 当我们向二叉堆中添加元素时,我们可以简单地将它添加到二叉树的末尾,此时从这个节点到根的完整路径上不满足堆序性。之后将它不断向上浮,直到遇到比它小的元素,此时整个二叉树的所有节点都满足堆序性。当我们减少了二叉堆中元素的值的时候也可以通过这个方法来维护二叉堆。 2. `fixDown`: 当我们从二叉堆中删除元素时,我们可以简单地将二叉树末尾的元素移动到根,此时不一定满足堆序性,之后将它不断下沉,直到遇到比它大的元素,此时整个二叉树的所有节点都满足堆序性。当我们增加了二叉堆中元素的值的时候也可以通过这个方法来维护二叉堆。 3. `heapify`: 当我们拿到无序的数组的时候,也可以假设我们拿到了一棵不满足堆序性的二叉树,此时我们将所有的非叶节点向下沉,直到整个二叉树的所有节点都满足堆序性,此时我们得到了完整的二叉堆。这个操作是原地操作,不需要额外的空间复杂度,而时间复杂度是O(N)。 关于二叉堆的详细内容将会在后续的文章中展开详解,这里只做简单的介绍,了解这些我们就可以开始看`TimerQueue`的源码。 #### `TimerQueue`的完整代码 我们直接来看`TaskQueue`的完整代码 ```Java class TaskQueue { private TimerTask[] queue = new TimerTask[128]; private int size = 0; int size() { return size; } void add(TimerTask task) { // Grow backing store if necessary if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); queue[++size] = task; fixUp(size); } TimerTask getMin() { return queue[1]; } TimerTask get(int i) { return queue[i]; } void removeMin() { queue[1] = queue[size]; queue[size--] = null; // Drop extra reference to prevent memory leak fixDown(1); } void quickRemove(int i) { assert i <= size; queue[i] = queue[size]; queue[size--] = null; // Drop extra ref to prevent memory leak } void rescheduleMin(long newTime) { queue[1].nextExecutionTime = newTime; fixDown(1); } boolean isEmpty() { return size==0; } void clear() { // Null out task references to prevent memory leak for (int i=1; i<=size; i++) queue[i] = null; size = 0; } private void fixUp(int k) { while (k > 1) { int j = k >> 1; if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break; TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } private void fixDown(int k) { int j; while ((j = k << 1) <= size && j > 0) { if (j < size && queue[j].nextExecutionTime > queue[j+1].nextExecutionTime) j++; // j indexes smallest kid if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) break; TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } void heapify() { for (int i = size/2; i >= 1; i--) fixDown(i); } } ``` 按照我们之前介绍的二叉堆的相关知识,我们可以看到`TimerQueue`维护了`TimerTask`的数组`queue`,初始大小`size`为0。 `add`操作首先判断了数组是否满了,如果数组已经满了,那么先执行扩容操作,再进行添加操作。如上所述,`add`操作先将元素放到二叉树末尾的元素(`queue[++size]`),之后对这个元素进行上浮来维护堆序性。 `getMin`直接返回二叉树的树根(`queue[1]`),`get`方法直接返回数组的第`i`个元素。`removeMin`方法会将二叉树末尾的元素(`queue[size]`)移动到树根(`queue[1]`),并将原本二叉树末尾的元素设置成`null`,来让垃圾回收器回收这个`TimerTask`,之后执行`fixDown`来维护堆序性,`quickRemove`也是相同的过程,只不过它在移动元素后没有执行下沉操作,当连续执行多次`quickRemove`后统一执行`heapify`来维护堆序性。 `rescheduleMin`会将树根元素的元素值设置成`newTime`,并将它下沉到合适的位置。 `fixUp`、`fixDown`和`heapify`操作就如上文所述,用来维护二叉堆的读序性。不过这里面实现的`fixUp`和`fixDown`并不优雅,基于交换临位元素的实现需要使用T(3log(N))的时间,而实际上有T(log(N))的实现方法。后续的文章中会详细介绍优先队列与二叉堆的实现方式。 ### TimerThread --- 我们直接来看TimerThread的代码 ```Java class TimerThread extends Thread { boolean newTasksMayBeScheduled = true; private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // Eliminate obsolete references } } } private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } } } ``` 首先是控制变量`newTasksMayBeScheduled`,表示当前工作线程是否应该继续执行任务,当它为`false`的时候它将不会再从任务队列中取任务执行,表示当前工作线程已结束。接下来的`queue`变量是通过构造方法传进来的任务队列,工作线程的任务队列与`Timer`共享,实现生产消费者模型。 进入到`run()`方法,`run()`方法会调用`mainLoop()`方法来执行主循环,而`finally`代码块会在主循环结束后清空任务队列实现优雅退出。 在`mainLoop()`方法中执行了死循环来拉取执行任务,在死循环中首先获取`queue`的锁来实现线程同步,接下来判断任务队列是否为且工作线程是否停止,如果任务队列为空且工作线程未停止,那么就使用`queue.wait()`来等待`Timer`添加任务后唤醒该线程,`Object#wait()`方法会释放当前线程所持有的该对象的锁,关于wait/notisfy的内容可以去看[Java API](https://docs.oracle.com/en/java/javase/15/docs/api/java.base/java/lang/Object.html#wait(long,int))相关介绍。如果`queue`退出等待后依旧为空,则表示`newTasksMayBeScheduled`为`false`,工作线程已停止,退出主循环,否则会从任务队列中取出需要最近执行的任务(并不会删除任务)。 取到需要最近执行的任务后,获取该任务的锁,并判断该任务是否已经停止,如果该任务已经停止,那么就把它从任务队列中移除,并什么都不做继续执行主循环。接下来判断当前时间是否小于等于任务的下一次执行时间,如果满足条件则将`taskFired`设置成`true`,判断当前任务是否需要重复执行。如果不需要重复执行就将它从任务队列中移除,并将任务状态设置成`EXECUTED`,如果需要重复执行就根据`period`设置它的下一次执行时间并重新调整任务队列。 完成这些操作后,如果`taskFired`为`false`,就让`queue`对象进入有限等待状态,很容易得到我们需要的最大等待时间为`executionTime - currentTime`。如果`taskFired`为`true`,那么就释放锁并执行被取出的任务。 ### Timer --- #### `Timer`对象的成员 首先来看`Timer`的成员部分 ```Java private final TaskQueue queue = new TaskQueue(); private final TimerThread thread = new TimerThread(queue); private final Object threadReaper = new Object() { @SuppressWarnings("deprecation") protected void finalize() throws Throwable { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.notify(); // In case queue is empty. } } }; private static final AtomicInteger nextSerialNumber = new AtomicInteger(0); ``` 其中`queue`对象是如前面所说,为了任务调度的最小优先队列。接下来是`TimerThread`,它是`Timer`的工作线程,在`Timer`创建时就已经被分配,并与`Timer`共享任务队列。 `threadReaper`是一个只复写了`finalize`方法的对象,它的作用是当`Timer`对象没有存活的引用后,终止任务线程,并等待任务队列中的所有任务执行结束后退出工作线程,实现优雅退出。 `nextSerialNumber`用来记录工作线程的序列号,全局唯一,避免生成的线程名称冲突。 #### `Timer`对象的构造方法 接下来我们看下`Timer`的所有构造方法 ```Java public Timer() { this("Timer-" + serialNumber()); } public Timer(boolean isDaemon) { this("Timer-" + serialNumber(), isDaemon); } public Timer(String name) { thread.setName(name); thread.start(); } public Timer(String name, boolean isDaemon) { thread.setName(name); thread.setDaemon(isDaemon); thread.start(); } ``` 可以看到,所有的构造构造方法所做的事都相同:设置工作线程属性,并启动工作线程。 #### 成员函数 接下来我们可以看下`Timer`的成员函数,我们首先不考虑`cancel()`和`purge()`方法,直接看`schedule`系列方法 ```Java public void schedule(TimerTask task, long delay) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); } public void schedule(TimerTask task, Date time) { sched(task, time.getTime(), 0); } public void schedule(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, -period); } public void schedule(TimerTask task, Date firstTime, long period) { if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), -period); } public void scheduleAtFixedRate(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, period); } public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) { if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), period); } ``` 可以看到,所有的`schedule`方法除了做参数教研外,都将延迟时间和计划执行时间转化为时间戳委托给`sched`方法来执行。`schedule`和`scheduleAtFixedRate`传递的参数都相同,不过在传递`period`参数时使用符号来区分周期执行的方式。 接下来我们可以看下这位神秘嘉宾——`sched`方法到底做了哪些事 ```Java private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } } ``` `sched`方法首先做了一些参数校验,保证期待执行时间不小于0,且执行周期不至于太大。接下来获取任务队列`queue`对象的`monitor`(监视器锁),如果`Timer`的工作线程已经被停止了,那么就会抛出`IllegalStateException`来禁止继续添加任务,`newTasksMayBeScheduled`这个变量将会在稍后介绍。之后`sched`方法会尝试获取`task.lock`对象的锁,判断`task`的状态避免重复添加,并设置`task`的下一次执行时间、`task`的执行周期和状态。之后将`task`添加到任务队列中,如果当前任务就是执行时间最近的任务,那么就会唤起等待`queue`对象的线程(其实就是`thread`工作线程)继续执行。 ## 总结 本文从源码角度对`java.util.Timer`进行了深入解析,通过本文,读者可以具备自行实现Timer的能力。 ------------ ###### 自猿其说Tech-JDL京东物流技术发展部 ###### 作者:中台技术部 纪卓志
原创文章,需联系作者,授权转载
上一篇:一体化智能安全防御,京东云星盾安全加速正式发布
下一篇:Widget开发实践
相关文章
Taro小程序跨端开发入门实战
Flutter For Web实践
配运基础数据缓存瘦身实践
自猿其说Tech
文章数
426
阅读量
2149964
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
最新回复
丨
点赞排行
共0条评论
自猿其说Tech
文章数
426
阅读量
2149964
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号