您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
定时任务原理方案综述
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
定时任务原理方案综述
自猿其说Tech
2022-07-07
IP归属:未知
20320浏览
计算机编程
本文主要介绍目前存在的定时任务处理解决方案。业务系统中存在众多的任务需要定时或定期的去执行,并且针对不同的系统架构也需要提供不同的解决方案。京东内部也提供了众多定时任务中间件来支持,总结当前各种定时任务原理,从定时任务基础原理、单机定时任务(单线程、多线程)、分布式定时任务介绍目前主流的定时任务的基本原理组成、优缺点等。希望能帮助自己和大家对定时任务能够深入的理解具体的算法和实现方案。 ### 1 背景概述 1. 定时任务,顾名思义,就是指定时间点进行执行相应的任务。业务场景中包括: 2. 每天晚上12点,将当日的销售数据发送给各个VP; 3. 订单下单十分钟未付款将自动取消订单;用户下单后发短信 4. 定时的清理系统中失效的数据 5. 心跳检测、session、请求是否timeout ### 2 定时任务基础原理 #### 2.1 小顶堆算法: 每个节点是对应的定时任务,定时任务的执行顺序通过利用堆化进行排序,循环判断每秒是否堆顶的任务是否应该执行,每次插入任务、删除任务需要重新堆化; ![](//img1.jcloudcs.com/developer.jdcloud.com/6a47f2b7-86b6-4550-a8ce-ccbdd28a528b20220707144245.png) 为什么用优先队列(小顶堆)而不是有序的数组或者链表? 因为优先队列只需要确保局部有序,它的插入、删除操作的复杂度都是O(log n);而有序数组的插入和删除复杂度为O(n);链表的插入复杂度为O(n),删除复杂度为O(1)。总体而言优先队列性能最好。 #### 2.2 时间轮算法: 链表或者数组实现时间轮: ![](//img1.jcloudcs.com/developer.jdcloud.com/89e16ae0-c9e6-4854-b172-cf6f2d7583ee20220707144303.png) round时间轮:时间轮其实就是一种环型的数据结构,可以把它想象成一个时钟,分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务。 ![](//img1.jcloudcs.com/developer.jdcloud.com/9112dd2f-d8a1-4512-8f80-d80a013b9aec20220707144313.png) #### 2.3 分层时间轮: 就是将月、周、天分成不同的时间轮层级,各自的时间轮进行定义 ![](//img1.jcloudcs.com/developer.jdcloud.com/48d38b4b-fb66-4cea-924d-c57b0ad90b1b20220707144330.png) ### 3 单机定时任务 #### 3.1 单线程任务调度 ##### 3.1.1 无限循环 创建thread,在while中一直执行,通过sleep来达到定时任务的效果 ##### 3.1.2 JDK提供了Timer Timer位于java.util包下,其内部包含且仅包含一个后台线程(TimeThread)对多个业务任务(TimeTask)进行定时定频率的调度。 ![](//img1.jcloudcs.com/developer.jdcloud.com/9616bf19-c6fd-4e4c-89b6-ceaccfd2d08c20220707144401.png) 每个Timer中包含一个TaskQueue对象,这个队列存储了所有将被调度的task, 该队列是一个根据task下一次运行时间排序形成的最小优先队列,该最小优先队列的是一个二叉堆,所以可以在log(n)的时间内完成增加task,删除task等操作,并且可以在常数时间内获得下次运行时间最小的task对象。 原理:TimerTask是按nextExecutionTime进行堆排序的。每次取堆中nextExecutionTime和当前系统时间进行比较,如果当前时间大于nextExecutionTime则执行,如果是单次任务,会将任务从最小堆,移除。否则,更新nextExecutionTime的值 ![](//img1.jcloudcs.com/developer.jdcloud.com/ac7b53c2-854d-4161-be1d-d4569547716020220707144417.png) **任务追赶特性:** schedule在执行的时候,如果Date过了,也就是Date是小于现在时间,那么会立即执行一次,然后从这个执行时间开始每隔间隔时间执行一次, scheduleAtFixedRate在执行的时候,如果Date过了。还会执行,然后才是每隔一段时间执行。 **Timer问题:** 任务执行时间长影响其他任务:如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为。Timer线程并不捕获异常,所以 TimerTask抛出的未检查的异常会终止timer线程。此时,已经被安排但尚未执行的TimerTask永远不会再执行了,新的任务也不能被调度了。 任务异常影响其他任务:Timer里面的任务如果执行时间太长,会独占Timer对象,使得后面的任务无法几时的执行 ,ScheduledExecutorService不会出现Timer的问题(除非你只搞一个单线程池的任务区)。 ##### 3.1.3 DelayQueue DelayQueue 是一个支持延时获取元素的无界阻塞队列,DelayQueue 其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素 - delayed是一个具有过期时间的元素 - PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列(核心还是基于小顶堆) 队列中的元素必须实现 Delayed 接口,并重写 getDelay(TimeUnit) 和 compareTo(Delayed) 方法 - CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。 - getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。 队列入队出队方法 - offer():入队的逻辑综合了PriorityBlockingQueue的平衡二叉堆冒泡插入以及DelayQueue的消费线程唤醒与leader领导权剥夺 - take():出队的逻辑一样综合了PriorityBlockingQueue的平衡二叉堆向下降级以及DelayQueue的Leader-Follower线程等待唤醒模式 在ScheduledExecutorService中推出了DelayedWorkQueue,DelayQueue队列元素必须是实现了Delayed接口的实例,而DelayedWorkQueue存放的是线程运行时代码RunnableScheduledFuture,该延时队列灵活的加入定时任务特有的方法调用。 ![](//img1.jcloudcs.com/developer.jdcloud.com/85c5661b-bcfc-4904-9f56-f05b45c4385320220707144524.png) leader follower模式 所有线程会有三种身份中的一种:leader和follower,以及一个工作中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。 ##### 3.1.4 Netty 实现延迟任务-HashedWheel 可以使用 Netty 提供的工具类 HashedWheelTimer 来实现延迟任务 该工具类采用的是时间轮的原理来实现的,HashedWheelTimer是一个基于hash的环形数组 ![](//img1.jcloudcs.com/developer.jdcloud.com/0ad7d9f7-61a7-4e1b-b935-7e45af7716a720220707144550.png) - 优点:能高效的处理大批定时任务,适用于对时效性不高的,可快速执行的,大量这样的“小”任务,能够做到高性能,低消耗。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager),触发(trigger)以及运行(runnable)。能够高效的管理各种延时任务,周期任务,通知任务等等。 - 缺点:对内存要求较高,占用较高的内存。时间精度要求不高:时间轮调度器的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合。因为时间轮算法的精度取决于,时间段“指针”单元的最小粒度大小,比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度。 ##### 3.1.5 MQ 实现延迟任务 1. 订单在十分钟之内未支付则自动取消。 2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。 3. 账单在一周内未支付,则自动结算。 4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。 5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。 6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。 以上这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务 RabbitMQ 实现延迟队列的方式有两种: - 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能; - 使用 rabbitmq-delayed-message-exchange 插件实现延迟功能。 同样我们也可以利用京东自研jmq的延时消费来做到以上的场景 #### 3.2 多线程定时任务 上述方案都是基于单线程的任务调度,如何引入多线程提高延时任务的并发处理能力? ##### 3.2.1 ScheduledExecutorService JDK1.5之后 推出了线程池(ScheduledExecutorService),现阶段定时任务与 JUC 包中的周期性线程池密不可分。JUC 包中的 Executor 架构带来了线程的创建与执行的分离。Executor 的继承者 ExecutorService 下面衍生出了两个重要的实现类,他们分别是 - ThreadPoolExecutor 线程池 - ScheduledThreadPoolExecutor 支持周期性任务的线程池 ![](//img1.jcloudcs.com/developer.jdcloud.com/488fc4aa-c1e2-495f-9b3b-4bda5d96c3a220220707144727.png) 通过 ThreadPoolExecutor 可以实现各式各样的自定义线程池,而 ScheduledThreadPoolExecutor 类则在自定义线程池的基础上增加了周期性执行任务的功能。 1. 最大线程数为Integer.MAX_VALUE;表明线程池内线程数不受限制:即这是因为延迟队列内用数组存放任务,数组初始长度为16,但数组长度会随着任务数的增加而动态扩容,直到数组长度为Integer.MAX_VALUE; 既然队列能存放Integer.MAX_VALUE个任务,又因为任务是延迟任务,因此保证任务不被抛弃,最多需要Integer.MAX_VALUE个线程。 2. 空闲线程的等待时间都为0纳秒,表明池内不存在空闲线程,除了核心线程:采用leader-follwer,这里等待的线程都为空闲线程,为了避免过多的线程浪费资源,所以ScheduledThreadPool线程池内更多的存活的是核心线程. 3. 任务等待队列为DelayedWorkQueue ![](//img1.jcloudcs.com/developer.jdcloud.com/9760c03f-30c7-4476-a2b2-53b119b09d6f20220707144746.png) 总结:ScheduledThreadPoolExecutor中定义内部类ScheduledFutureTask、DelayedWorkQueue;ScheduledFutureTask记录任务定时信息,DelayedWorkQueue来排序任务定时执行。ScheduledExecutorService自定义了阻塞队列DelayedWorkQueue给线程池使用,它可以根据ScheduledFutureTask的下次执行时间来阻塞take方法,并且新进来的ScheduledFutureTask会根据这个时间来进行排序,最小的最前面。 - DelayedWorkQueue:其中DelayedWorkQueue是定义的延时队列,可以看做是一个用延时时间长短作为排序的优先级队列,来实现加入任务,DelayedWorkQueue原理见3.1.3; - ScheduledFutureTask是用作实现Run方法,使得任务能够延迟执行,甚至周期执行,并且记录每个任务进入延时队列的序列号sequenceNumber。任务类ScheduledFutureTask继承FutureTask并扩展了一些属性来记录任务下次执行时间和每次执行间隔。同时重写了run方法重新计算任务下次执行时间,并把任务放到线程池队列中。 run()在处理任务时,会根据任务是否是周期任务走不通的流程: - 非周期任务,则采用futureTask类的run()方法,不存储优先队列; - 周期任务,首先确定任务的延迟时间,然后把延迟任务插入优先队列; ScheduledFutureTask的reExecutePeriodic(outerTask)方法:把周期任务插入优先队列的过程. ##### 3.2.2 实现SchedulingConfigurer接口 Spring Boot 提供了一个 SchedulingConfigurer 配置接口。我们通过 ScheduleConfig 配置文件实现 ScheduleConfiguration 接口,并重写 configureTasks() 方法,向 ScheduledTaskRegistrar 注册一个 ThreadPoolTaskScheduler 任务线程对象即可。 ![](//img1.jcloudcs.com/developer.jdcloud.com/680aed18-d917-4db7-8d8c-9eafc689f48820220707144851.png) ##### 3.2.3 Java任务调度框架Quartz ![](//img1.jcloudcs.com/developer.jdcloud.com/9766143c-58be-4075-8dc5-c0bd7e749b2a20220707144909.png) - Job:定义需要执行的任务,该类是一个接口,只定义了一个方法execute(JobExecutionContext context),在实现类的execute方法中编写所需要定时执行的Job(任务),Job运行时的信息保存在JobDataMap实例中。 - Trigger:负责设置调度策略。该类是一个接口,描述触发job执行的时间触发规则。主要有SimpleTrigger和CronTrigger这两个子类。当且仅当需调度一次或者以固定时间间隔周期执行调度,SimpleTrigger 是最适合的选择;而CronTrigger则可以通过Cron表达式定义出各种复杂时间规则的调度方案:如在周一到周五的15:00 ~ 16:00 执行调度等。 - Scheduler:调度器就相当于一个容器,装载着任务和触发器。该类是一个接口。代表一个Quartz的独立运行容器。Trigger和JobDetail可以注册到Scheduler中,两者在Scheduler中拥有各自的组及名称,组及名称是Scheduler查找定位容器中某一对象的依据。 - JobDetail:描述Job的实现类及其它相关的静态信息,如:Job名字、描述、关联监听器等信息。Quartz每次调度Job时,都重新创建一个Job实例,它接受一个Job实现类,以便运行时通过newInstance()的反射机制实例化Job。 - ThreadPool Scheduler使用一个线程池作为任务运行的基础设施,任务通过共享线程池中的线程提高运行效率。 - Listener:Quartz拥有完善的事件和监听体系,大部分组件都拥有事件,如:JobListener监听任务执行前事件、任务执行后事件;TriggerListener监听触发前事件,出发后事件;TriggerListener监听调度开始事件,关闭事件等等,可以注册响应的监听器处理感兴趣的事件。 针对Quartz 重复调度问题 在通常的情况下,乐观锁能保证不发生重复调度,但是难免发生ABA问题 配置文件加上org.quartz.jobStore.acquireTriggersWithinLock=true ##### 3.2.4 使用 Spring-Task 如果使用的是 Spring 或 Spring Boot 框架,Spring 作为一站式框架,为开发者提供了异步执行和任务调度的抽象接口TaskExecutor 和TaskScheduler。 - Spring TaskExecutor:主要用来创建线程池用来管理异步定时任务开启的线程。(防止建立线程过多导致资源浪费) - Spring TaskScheduler:创建定时任务 其中Spring自带的定时任务工具,spring task,可以将它比作一个轻量级的Quartz,而且使用起来很简单,除spring相关的包外不需要额外的包,而且支持注解和配置文件两种: 使用方法 - 声明开启 Scheduled:通过注解@EnableScheduling或者配置文件 - 任务方法添加@Scheduled注解 - 将任务的类交结 Spring 管理 (例如使用 @Component) ![](//img1.jcloudcs.com/developer.jdcloud.com/0c73ff34-8e13-4464-8778-c78a6c63fd8920220707145037.png) ![](//img1.jcloudcs.com/developer.jdcloud.com/e7e32ce8-a833-4d1c-aaef-ecbb09e6b4a620220707145049.png) 类图简要介绍: - 实现感知接口:EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware - 在spring启动完成单例bean注入,利用接口MergedBeanDefinitionPostProcessor完成扫描,利用BeanPostProcessor接口中的postProcessAfterInitialization扫描被@Scheduled注解表示的方法 - 利用ScheduledTaskRegistrar作为注册中心,监听到所有bean注入完成之后,然后开始注册全部任务 - 自定义任务调度器TaskScheduler,默认使用接口ScheduledExecutorService的实现类ScheduledThreadPoolExecutor定义单线程的线程池 @Scheduler注解源码 ![](//img1.jcloudcs.com/developer.jdcloud.com/050ae82f-72d9-4e94-a937-def19f05b8d320220707145116.png) 项目启动时,在初始化 bean 后,带 @Scheduled 注解的方法会被拦截,然后依次:构建执行线程,解析参数,加入线程池。其中作为拦截注解的类就是ScheduledAnnotationBeanPostProcessor ![](//img1.jcloudcs.com/developer.jdcloud.com/687f2914-eabf-4d7c-b836-1dbd004f0e7420220707145130.png) ![](//img1.jcloudcs.com/developer.jdcloud.com/414ffe16-650e-412f-a6d2-672dec4f40ad20220707145153.png) 返回所有的任务,该注册类实现了ScheduledTaskHolder的方法 ![](//img1.jcloudcs.com/developer.jdcloud.com/dfc13d12-519a-469a-a5ce-c1064d87509c20220707145206.png) **ScheduledTaskRegistrar** ScheduledTask注册中心,ScheduledTaskHolder接口的一个重要的实现类,维护了程序中所有配置的ScheduledTask。指定TaskScheduler或者ScheduledExecutorService都是ok的,ConcurrentTaskScheduler也是一个TaskScheduler的实现类。它是ScheduledAnnotationBeanPostProcessor的一个重要角色。 ![](//img1.jcloudcs.com/developer.jdcloud.com/4e407b36-795d-423a-84df-9d62c52275ef20220707145228.png) 重要的一步:如果我们没有指定taskScheduler ,这里面会new一个newSingleThreadScheduledExecutor,但它并不是一个合理的线程池,所以所有的任务还需要One by One顺序执行,其中默认为:Executors.newSingleThreadScheduledExecutor(),所以肯定单线程串行执行。 ![](//img1.jcloudcs.com/developer.jdcloud.com/d2c914b5-d1d7-48e4-8708-cf62482b961f20220707145241.png) ### 4 分布式定时任务 上面的方法都是关于单机定时任务的实现,如果是分布式环境可以使用 Redis 来实现定时任务。 使用 Redis 实现延迟任务的方法大体可分为两类:通过 ZSet 的方式和键空间通知的方式。 #### 4.1 通过 ZSet 的方式、Redis 的键空间通知 - 通过 ZSet 实现定时任务的思路是,将定时任务存放到 ZSet 集合中,并且将过期时间存储到 ZSet 的 Score 字段中,然后通过一个无线循环来判断当前时间内是否有需要执行的定时任务,如果有则进行执行 - 我们可以通过 Redis 的键空间通知来实现定时任务,它的实现思路是给所有的定时任务设置一个过期时间,等到了过期之后,我们通过订阅过期消息就能感知到定时任务需要被执行了,此时我们执行定时任务即可。 - 默认情况下 Redis 是不开启键空间通知的,需要我们通过 config set notify-keyspace-events Ex 的命令手动开启,开启之后定时任务的代码如下: #### 4.2 Elastic-job、xxl-job - elastic-job:是由当当网基于quartz 二次开发之后的分布式调度解决方案 , 由两个相对独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成 。elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。 - XXL-JOB:是一个轻量级分布式任务调度框架,它的核心设计理念是把任务调度分为两个核心部分:调度中心(xxl-admin),和执行器。隔离成两个部分。这是一种中心化的设计,由调度中心来统一管理和调度各个接入的业务模块(也叫执行器),接入的业务模块(执行器)只需要接收调度信号,然后去执行具体的业务逻辑,两者可以各自的进行扩容。 ------------ ###### 自猿其说Tech-JDL京东物流技术与数据智能部 ###### 作者:肖明睿
原创文章,需联系作者,授权转载
上一篇:当你对 redis 说你中意的女孩是 Mia
下一篇:浅谈分布式事务及解决方案
相关文章
Taro小程序跨端开发入门实战
Flutter For Web实践
配运基础数据缓存瘦身实践
自猿其说Tech
文章数
426
阅读量
2149963
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2149963
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号