前言
线程池的具体实现有两种,分别是ThreadPoolExecutor 默认线程池和ScheduledThreadPoolExecutor 定时线程池,上一篇已经分析过ThreadPoolExecutor原理与使用了,本篇我们来重点分析下ScheduledThreadPoolExecutor的原理与使用。
《并发编程之Executor线程池原理与源码解读》
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 与 ThreadPoolExecutor 线程池的概念有些区别,它是一个支持任务周期性调度的线程池。
ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor,同时通过实现 ScheduledExecutorSerivce 来扩展基础线程池的功能,使其拥有了调度能力。其整个调度的核心在于内部类 DelayedWorkQueue ,一个有序的延时队列。
定时线程池类的类结构图如下:
ScheduledThreadPoolExecutor 的出现,很好的弥补了传统 Timer 的不足,具体对比看下表:
TimerScheduledThreadPoolExecutor线程单线程多线程多任务任务之间相互影响任务之间不影响调度时间绝对时间相对时间异常单任务异常,后续任务受影响无影响
工作原理
它用来处理延时任务或定时任务
它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三种提交任务的方式:
- schedule,特定时间延时后执行一次任务
- scheduledAtFixedRate,固定周期执行任务(与任务执行时间无关,周期是固定的)
- scheduledWithFixedDelay,固定延时执行任务(与任务执行时间有关,延时从上一次任务完成后开始)
它采用 DelayedWorkQueue 存储等待的任务
- DelayedWorkQueue 内部封装了一个 PriorityQueue ,它会根据 time 的先后时间排序,若 time 相同则根据 sequenceNumber 排序;
- DelayedWorkQueue 也是一个无界队列;
因为前面讲阻塞队列实现的时候,已经对DelayedWorkQueue进行了说明,更多内容请查看《阻塞队列 — DelayedWorkQueue源码分析》
工作线程的执行过程:
- 工作线程会从DelayedWorkerQueue取已经到期的任务去执行;
- 执行结束后重新设置任务的到期时间,再次放回DelayedWorkerQueue。
take方法是什么时候调用的呢? 在ThreadPoolExecutor中,getTask方法,工作线程会循环地从workQueue中取任务。但定时任务却不同,因为如果一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行的时间,所以在take方法中,要保证只有在到指定的执行时间的时候任务才可以被取走。
PS:对于以上原理的理解,可以通过下面的源码分析加深印象。
源码分析
构造方法
ScheduledThreadPoolExecutor有四个构造形式:
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }
-
- public ScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), threadFactory);
- }
-
- public ScheduledThreadPoolExecutor(int corePoolSize,
- RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), handler);
- }
-
- public ScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), threadFactory, handler);
- }
当然我们也可以使用工具类Executors的newScheduledThreadPool的方法,快速创建。注意这里使用的DelayedWorkQueue。
ScheduledThreadPoolExecutor没有提供带有最大线程数的构造函数的,默认是Integer.MAX_VALUE,说明其可以无限制的开启任意线程执行任务,在大量任务系统,应注意这一点,避免内存溢出。
核心方法
核心方法主要介绍ScheduledThreadPoolExecutor的调度方法,其他方法与 ThreadPoolExecutor 一致。调度方法均由 ScheduledExecutorService 接口定义:
- public interface ScheduledExecutorService extends ExecutorService {
- // 特定时间延时后执行一次Runnable
- public ScheduledFuture> schedule(Runnable command,
- long delay, TimeUnit unit);
- // 特定时间延时后执行一次Callable
- public
ScheduledFuture schedule(Callable callable, - long delay, TimeUnit unit);
- // 固定周期执行任务(与任务执行时间无关,周期是固定的)
- public ScheduledFuture> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit);
- // 固定延时执行任务(与任务执行时间有关,延时从上一次任务完成后开始)
- public ScheduledFuture> scheduleWithFixedDelay(Runnable command,
- long initialDelay,
- long delay,
- TimeUnit unit);
- }
我们再来看一下接口的实现,具体是怎么来实现线程池任务的提交。因为最终都回调用 delayedExecute 提交任务。所以,我们这里只分析schedule方法,该方法是指任务在指定延迟时间到达后触发,只会执行一次。源代码如下:
- public ScheduledFuture> schedule(Runnable command,
- long delay,
- TimeUnit unit) {
- //参数校验
- if (command == null || unit == null)
- throw new NullPointerException();
- //这里是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
- //然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法
- RunnableScheduledFuture> t = decorateTask(command,
- new ScheduledFutureTask
(command, null, - triggerTime(delay, unit)));
- //包装好任务以后,就进行提交了
- delayedExecute(t);
- return t;
- }
delayedExecute 任务提交方法:
- private void delayedExecute(RunnableScheduledFuture> task) {
- //如果线程池已经关闭,则使用拒绝策略把提交任务拒绝掉
- if (isShutdown())
- reject(task);
- else {
- //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
- super.getQueue().add(task);//使用用的DelayedWorkQueue
- //如果当前状态无法执行任务,则取消
- if (isShutdown() &&
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- task.cancel(false);
- else
- //这里是增加一个worker线程,避免提交的任务没有worker去执行
- //原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
- ensurePrestart();
- }
- }
我们可以看到提交到线程池的任务都包装成了 ScheduledFutureTask,继续往下我们再来研究下。
ScheduledFutureTask
从ScheduledFutureTask类的定义可以看出,ScheduledFutureTask类是ScheduledThreadPoolExecutor类的私有内部类,继承了FutureTask类,并实现了RunnableScheduledFuture接口。也就是说,ScheduledFutureTask具有FutureTask类的所有功能,并实现了RunnableScheduledFuture接口的所有方法。ScheduledFutureTask类的定义如下所示:
- private class ScheduledFutureTask
extends FutureTask implements RunnableScheduledFuture
ScheduledFutureTask类继承图如下:
成员变量
SchduledFutureTask接收的参数(成员变量):
- // 任务开始的时间
-
- private long time;
-
- // 任务添加到ScheduledThreadPoolExecutor中被分配的唯一序列号
-
- private final long sequenceNumber;
-
- // 任务执行的时间间隔
-
- private final long period;
-
- //ScheduledFutureTask对象,实际指向当前对象本身
-
- RunnableScheduledFuture outerTask = this;
-
- //当前任务在延迟队列中的索引,能够更加方便的取消当前任务
-
- int heapIndex;
解析:
- sequenceNumber:任务添加到ScheduledThreadPoolExecutor中被分配的唯一序列号,可以根据这个序列号确定唯一的一个任务,如果在定时任务中,如果一个任务是周期性执行的,但是它们的sequenceNumber的值相同,则被视为是同一个任务。
- time:下一次执行任务的时间。
- period:任务的执行周期。
- outerTask:ScheduledFutureTask对象,实际指向当前对象本身。此对象的引用会被传入到周期性执行任务的ScheduledThreadPoolExecutor类的reExecutePeriodic方法中。
- heapIndex:当前任务在延迟队列中的索引,这个索引能够更加方便的取消当前任务。
构造方法
ScheduledFutureTask类继承了FutureTask类,并实现了RunnableScheduledFuture接口。在ScheduledFutureTask类中提供了如下构造方法。
- ScheduledFutureTask(Runnable r, V result, long ns) {
- super(r, result);
- this.time = ns;
- this.period = 0;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
-
- ScheduledFutureTask(Runnable r, V result, long ns, long period) {
- super(r, result);
- this.time = ns;
- this.period = period;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
-
- ScheduledFutureTask(Callable
callable, long ns) { - super(callable);
- this.time = ns;
- this.period = 0;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
FutureTask的构造方法如下:
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
通过源码可以看到,在ScheduledFutureTask类的构造方法中,首先会调用FutureTask类的构造方法为FutureTask类的callable和state成员变量赋值,接下来为ScheduledFutureTask类的time、period和sequenceNumber成员变量赋值。理解起来比较简单。
getDelay方法
我们先来看getDelay方法的源码,如下所示:
- //获取下次执行任务的时间距离当前时间的纳秒数
- public long getDelay(TimeUnit unit) {
- return unit.convert(time - now(), NANOSECONDS);
- }
getDelay方法比较简单,主要用来获取下次执行任务的时间距离当前系统时间的纳秒数。
compareTo方法
ScheduledFutureTask类在类的结构上实现了Comparable接口,compareTo方法主要是对Comparable接口定义的compareTo方法的实现。源码如下所示:
- public int compareTo(Delayed other) {
- if (other == this)
- return 0;
- if (other instanceof ScheduledFutureTask) {
- ScheduledFutureTask> x = (ScheduledFutureTask>)other;
- long diff = time - x.time;
- if (diff < 0)
- return -1;
- else if (diff > 0)
- return 1;
- else if (sequenceNumber < x.sequenceNumber)
- return -1;
- else
- return 1;
- }
- long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
- return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
- }
这段代码看上去好像是对各种数值类型数据的比较,本质上是对延迟队列中的任务进行排序。排序规则为:
- 首先比较延迟队列中每个任务下次执行的时间,下次执行时间距离当前时间短的任务会排在前面;
- 如果下次执行任务的时间相同,则会比较任务的sequenceNumber值,sequenceNumber值小的任务会排在前面。
isPeriodic方法
isPeriodic方法的源代码如下所示:
- //判断是否是周期性任务
- public boolean isPeriodic() {
- return period != 0;
- }
这个方法主要是用来判断当前任务是否是周期性任务。这里只要判断运行任务的执行周期不等于0就能确定为周期性任务了。因为无论period的值是大于0还是小于0,当前任务都是周期性任务。
setNextRunTime方法
setNextRunTime方法的作用主要是设置当前任务下次执行的时间,源码如下所示:
- private void setNextRunTime() {
- long p = period;
- //固定频率,上次执行任务的时间加上任务的执行周期
- if (p > 0)
- time += p;
- //相对固定的延迟执行,当前系统时间加上任务的执行周期
- else
- time = triggerTime(-p);
- }
这里再一次证明了使用isPeriodic方法判断当前任务是否为周期性任务时,只要判断period的值是否不等于0就可以了。
- 因为如果当前任务时固定频率执行的周期性任务,会将周期period当作正数来处理;
- 如果是相对固定的延迟执行当前任务,则会将周期period当作负数来处理。
这里我们看到在setNextRunTime方法中,调用了ScheduledThreadPoolExecutor类的triggerTime方法。接下来我们看下triggerTime方法的源码。
ScheduledThreadPoolExecutor类的triggerTime方法
triggerTime方法用于获取延迟队列中的任务下一次执行的具体时间。源码如下所示。
- private long triggerTime(long delay, TimeUnit unit) {
- return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
- }
-
- long triggerTime(long delay) {
- return now() +
- ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
- }
这两个triggerTime方法的代码比较简单,就是获取下一次执行任务的具体时间。有一点需要注意的是:delay < (Long.MAX_VALUE >> 1判断delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,则直接返回delay,否则需要处理溢出的情况。
我们看到在triggerTime方法中处理防止溢出的逻辑使用了ScheduledThreadPoolExecutor类的overflowFree方法,接下来,我们就看看ScheduledThreadPoolExecutor类的overflowFree方法的实现。
ScheduledThreadPoolExecutor类的overflowFree方法
overflowFree方法的源代码如下所示:
- private long overflowFree(long delay) {
- //获取队列中的节点
- Delayed head = (Delayed) super.getQueue().peek();
- //获取的节点不为空,则进行后续处理
- if (head != null) {
- //从队列节点中获取延迟时间
- long headDelay = head.getDelay(NANOSECONDS);
- //如果从队列中获取的延迟时间小于0,并且传递的delay
- //值减去从队列节点中获取延迟时间小于0
- if (headDelay < 0 && (delay - headDelay < 0))
- //将delay的值设置为Long.MAX_VALUE + headDelay
- delay = Long.MAX_VALUE + headDelay;
- }
- //返回延迟时间
- return delay;
- }
通过对overflowFree方法的源码分析,可以看出overflowFree方法本质上就是为了限制队列中的所有节点的延迟时间在Long.MAX_VALUE值之内,防止在compareTo方法中溢出。
cancel方法
cancel方法的作用主要是取消当前任务的执行,源码如下所示:
- public boolean cancel(boolean mayInterruptIfRunning) {
- //取消任务,返回任务是否取消的标识
- boolean cancelled = super.cancel(mayInterruptIfRunning);
- //如果任务已经取消
- //并且需要将任务从延迟队列中删除
- //并且任务在延迟队列中的索引大于或者等于0
- if (cancelled && removeOnCancel && heapIndex >= 0)
- //将当前任务从延迟队列中删除
- remove(this);
- //返回是否成功取消任务的标识
- return cancelled;
- }
这段代码理解起来相对比较简单,首先调用取消任务的方法,并返回任务是否已经取消的标识。如果任务已经取消,并且需要移除任务,同时,任务在延迟队列中的索引大于或者等于0,则将当前任务从延迟队列中移除。最后返回任务是否成功取消的标识。
run方法
run方法可以说是ScheduledFutureTask类的核心方法,是对Runnable接口的实现,源码如下所示:
- public void run() {
- //当前任务是否是周期性任务
- boolean periodic = isPeriodic();
- //线程池当前运行状态下不能执行周期性任务
- if (!canRunInCurrentRunState(periodic))
- //取消任务的执行
- cancel(false);
- //如果不是周期性任务
- else if (!periodic)
- //则直接调用FutureTask类的run方法执行任务
- ScheduledFutureTask.super.run();
- //如果是周期性任务,则调用FutureTask类的runAndReset方法执行任务
- //如果任务执行成功
- else if (ScheduledFutureTask.super.runAndReset()) {
- //设置下次执行任务的时间
- setNextRunTime();
- //重复执行任务
- reExecutePeriodic(outerTask);
- }
- }
整理一下方法的逻辑:
- 首先判断当前任务是否是周期性任务。如果线程池当前运行状态下不能执行周期性任务,则取消任务的执行,否则执行步骤2;
- 如果当前任务不是周期性任务,则直接调用FutureTask类的run方法执行任务,会设置执行结果,然后直接返回,否则执行步骤3;
- 如果当前任务是周期性任务,则调用FutureTask类的runAndReset方法执行任务,不会设置执行结果,然后直接返回,否则执行步骤4;
- 如果任务执行成功,则设置下次执行任务的时间,同时,将任务设置为重复执行。
这里,调用了FutureTask类的run方法和runAndReset方法,并且调用了ScheduledThreadPoolExecutor类的reExecutePeriodic方法。接下来,我们分别看下这些方法的实现。
FutureTask类的run方法
FutureTask类的run方法源码如下所示:
- public void run() {
- //状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
- //状态如果是NEW,则尝试把当前执行线程保存在runner字段中
- //如果赋值失败则直接返回
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
- return;
- try {
- Callable
c = callable; - if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- //执行任务
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- result = null;
- ran = false;
- //任务异常
- setException(ex);
- }
- if (ran)
- //任务正常执行完毕
- set(result);
- }
- } finally {
-
- runner = null;
- int s = state;
- //如果任务被中断,执行中断处理
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- }
代码的整体逻辑为:
- 判断当前任务的state是否等于NEW,如果不为NEW则说明任务或者已经执行过,或者已经被取消,直接返回;
- 如果状态为NEW则接着会通过unsafe类把任务执行线程引用CAS的保存在runner字段中,如果保存失败,则直接返回;
- 执行任务;如果任务执行发生异常,则调用setException()方法保存异常信息。
FutureTask类的runAndReset方法
方法的源码如下所示:
- protected boolean runAndReset() {
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return false;
- boolean ran = false;
- int s = state;
- try {
- Callable
c = callable; - if (c != null && s == NEW) {
- try {
- c.call(); // don't set result
- ran = true;
- } catch (Throwable ex) {
- setException(ex);
- }
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- s = state;
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- return ran && s == NEW;
- }
FutureTask类的runAndReset方法与run方法的逻辑基本相同,只是runAndReset方法会重置当前任务的执行状态。
ScheduledThreadPoolExecutor类的reExecutePeriodic方法
reExecutePeriodic重复执行任务方法,源代码如下所示:
- void reExecutePeriodic(RunnableScheduledFuture> task) {
- //线程池当前状态下能够执行任务
- if (canRunInCurrentRunState(true)) {
- //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
- super.getQueue().add(task);//使用用的DelayedWorkQueue
- //线程池当前状态下不能执行任务,并且成功移除任务
- if (!canRunInCurrentRunState(true) && remove(task))
- //取消任务
- task.cancel(false);
- else
- //这里是增加一个worker线程,避免提交的任务没有worker去执行
- //原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
- ensurePrestart();
- }
- }
总体来说reExecutePeriodic方法的逻辑比较简单,需要注意的是:调用reExecutePeriodic方法的时候已经执行过一次任务,所以,并不会触发线程池的拒绝策略;传入reExecutePeriodic方法的任务一定是周期性的任务。
DelayedWorkQueue
ScheduledThreadPoolExecutor之所以要自己实现阻塞的工作队列,是因为 ScheduleThreadPoolExecutor 要求的工作队列有些特殊。
DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面(注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的)。
堆结构如下图:
可见,DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组:
在这种结构中,可以发现有如下特性: 假设“第一个元素” 在数组中的索引为 0 的话,则父结点和子结点的位置关系如下:
- 索引为 的左孩子的索引是 (2∗i+1);
- 索引为 的右孩子的索引是 (2∗i+2);
- 索引为 的父结点的索引是 floor((i−1)/2);
为什么要使用DelayedWorkQueue呢?
- 定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
- DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。
因为前面讲阻塞队列实现的时候,已经对DelayedWorkQueue进行了说明,更多内容请查看《阻塞队列 — DelayedWorkQueue源码分析》
总结
- 与Timer执行定时任务比较,相比Timer,ScheduledThreadPoolExecutor有说明优点?(文章前面分析过)
- ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以它也是一个线程池,也有 coorPoolSize 和 workQueue,但是 ScheduledThreadPoolExecutor特殊的地方在于,自己实现了优先工作队列 DelayedWorkQueue ;
- ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService,所以就有了任务调度的方法,如 schedule 、 scheduleAtFixedRate 、 scheduleWithFixedDelay ,同时注意他们之间的区别;
- 内部类 ScheduledFutureTask 继承者FutureTask,实现了任务的异步执行并且可以获取返回结果。同时实现了Delayed接口,可以通过getDelay方法获取将要执行的时间间隔;
- 周期任务的执行其实是调用了FutureTask的 runAndReset 方法,每次执行完不设置结果和状态。
- DelayedWorkQueue的数据结构,它是一个基于最小堆结构的优先队列,并且每次出队时能够保证取出的任务是当前队列中下次执行时间最小的任务。同时注意一下优先队列中堆的顺序,堆中的顺序并不是绝对的,但要保证子节点的值要比父节点的值要大,这样就不会影响出队的顺序。
总体来说,ScheduedThreadPoolExecutor的重点是要理解下次执行时间的计算,以及优先队列的出队、入队和删除的过程,这两个是理解ScheduedThreadPoolExecutor的关键。
PS:以上代码提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git