前言
本篇博客重点讲解ThreadPoolExecutor的三个基础设施类AbstractExecutorService、FutureTask和ExecutorCompletionService的实现细节,AbstractExecutorService实现了ExecutorService的大部分接口,子类只需实现excute方法和shutdown相关方法即可;FutureTask是RunnableFuture接口的主要实现,该接口是Runnable和Future的包装类接口,会执行Runnable对应的run方法,调用方可以通过Future接口获取任务的执行状态和结果;ExecutorCompletionService是帮助获取多个RunnableFuture任务的执行结果的工具类,基于FutureTask执行完成时的回调方法done实现的。
一、AbstractExecutorService
1、定义
ThreadPoolExecutor的类继承关系如下:
其中ExecutorService的子类如下:
右上角带S的表示内部类,我们重点关注ThreadPoolExecutor,ScheduledThreadPoolExecutor和ForkJoinPool三个类的实现,后面两个类会在后面的博客中逐一探讨。
Executor包含的方法如下:
ExecutorService包含的方法如下:
上述接口方法中涉及的Callable接口的定义如下:
该接口也是表示一个执行任务,跟常见的Runnable接口的区别在于call方法有返回值而run方法没有返回值。
Future表示某个任务的执行结果,其定义的方法如下:
其子类比较多,如下:
后面会将涉及的子类逐一探讨的。 AbstractExecutorService基于Executor接口的excute方法实现了大部分的ExecutorService的接口,子类只需要重点实现excute方法和shutdown相关方法即可,下面来分析其具体的实现。
2、submit
//Runnable接口方法没有返回值,但是可以通过Future判断任务是否执行完成
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
//因为Runnable的run方法没有返回值,所以如果run方法正常执行完成,其结果就是result
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
//都是返回FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
3、invokeAll
//执行完成tasks中所有的任务,如果有一个抛出异常,则取消掉剩余的任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
//遍历tasks中的任务将其转换成RunnableFuture,然后提交到线程池执行
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
//遍历Future列表
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) { //如果未执行完成
try {
//等待任务执行完成
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
//所有任务都执行完了
done = true;
return futures;
} finally {
if (!done)
//出现异常,将所有的任务都取消掉
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
//逻辑同上,不过加了等待时间限制,所有的任务的累计时间不能超过指定值,如果超时直接返回Future列表
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
//转换成纳秒
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
//转换成Future
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
//计算终止时间
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime(); //计算剩余时间
if (nanos <= 0L) //如果超时了则直接返回
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) { //任务未执行
if (nanos <= 0L)
return futures; //等待超时
try {
//等待任务执行完成
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done) //出现异常,取消掉剩余未执行的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
4、invokeAny
//多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
//多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉
//如果指定时间内没有执行成功的,则抛出TimeoutException 异常
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
//参数校验
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
//提交一个任务
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
//获取最新的已完成任务
Future<T> f = ecs.poll();
if (f == null) {
//没有执行完的
if (ntasks > 0) {
--ntasks;
//继续添加下一个任务
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0) //所有任务都执行失败了,没有执行成功的
break;
else if (timed) { //等待超时
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
//计算剩余等待时间
nanos = deadline - System.nanoTime();
}
else
//所有任务都提交了,阻塞等待某个任务执行完成
f = ecs.take();
}
if (f != null) {
--active;
try {
//某个任务已执行完成,如果抛出异常则执行下一个任务
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
} //for循环终止
//所有任务都执行失败了
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//返回前,将未执行完成的任务都取消掉
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
二、FutureTask
1、定义
FutureTask的类继承关系如下:
RunnableFuture接口没有新增方法,将Runnable的run方法由public改成包级访问了,如下:
该类包含的实例属性如下:
private Callable<V> callable;
private Object outcome; // non-volatile, protected by state reads/writes
private volatile Thread runner;
private volatile WaitNode waiters;
//状态
private volatile int state;
其中WaitNode是一个简单的内部类,其定义如下:
该类包含的静态属性都是字段偏移量,通过static代码块初始化,如下:
FutureTask定义了多个表示状态的常量,如下:
//初始状态
private static final int NEW = 0;
//是一个很短暂的中间状态,表示任务已执行完成,保存完执行结果后就流转成NORMAL或者EXCEPTIONAL
private static final int COMPLETING = 1;
//正常执行完成
private static final int NORMAL = 2;
//异常终止
private static final int EXCEPTIONAL = 3;
//任务被取消了
private static final int CANCELLED = 4;
//是一个很短暂的中间状态,调用interrupt方法后,会将状态流转成INTERRUPTED
private static final int INTERRUPTING = 5;
//任务执行已中断
private static final int INTERRUPTED = 6;
可能的状态流转如下图:
2、构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; //初始状态是NEW
}
public FutureTask(Runnable runnable, V result) {
//将Runnable适配成Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
//Executors方法
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
其中RunnableAdapter是Executors的一个静态内部类,其实现如下:
3、get
get方法用于阻塞当前线程直到任务执行完成,如果阻塞的过程中被中断则抛出异常InterruptedException,可以限制阻塞的时间,如果等待超时还是未完成则抛出异常TimeoutException。
//阻塞当前线程等待任务执行完成
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) //如果未完成
s = awaitDone(false, 0L);
return report(s);
}
//同上,可以限制等待的时间
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
//阻塞当前线程,如果返回值还是未完成说明是等待超时了,则抛出异常
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
//timed为true表示等待指定的时间,否则是无期限等待
//该方法返回退出此方法时的状态
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//计算等待的终止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { //如果当前线程被中断了,则从等待链表中移除,并抛出异常
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { //如果任务已执行完
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) //正在状态流转的过程中,让出当前CPU时间片
Thread.yield();
//未开始执行
else if (q == null)
q = new WaitNode();
else if (!queued)
//修改waiters属性,插入到链表头
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//已插入到链表中
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) { //等待超时,从链表中移除
removeWaiter(q);
return state;
}
//让当前线程休眠
LockSupport.parkNanos(this, nanos);
}
else
//让当前线程休眠
LockSupport.park(this);
}
}
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;//将thread置为null
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
//q.thread为null,需要被移除
else if (pred != null) {
pred.next = s; //将q从链表移除
if (pred.thread == null) //如果为null,则从头开始遍历
continue retry;
}
//q.thread为null,pred为null,之前没有有效节点,修改waiters,修改失败重试
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
//awaitDone正常返回后调用此方法,此时状态应该是COMPLETING之后了
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL) //如果是正常结束
return (V)x;
if (s >= CANCELLED) //如果被取消了
throw new CancellationException();
throw new ExecutionException((Throwable)x); //如果出现异常了
}
4、run / runAndReset
run方法是有线程池调用的,会执行Callable任务,保存执行的结果,如果出现异常则保存异常对象,并完成状态流转,最后将等待任务完成的阻塞中的线程唤醒。runAndReset和run类似,区别在于runAndReset正常执行完成后不会保存执行的结果,不会改变状态,状态还是NEW,如果是正常执行则返回true,该方法是子类使用的,其调用链如下:
这两方法的实现如下:
//由线程池中的某个线程调用此方法
public void run() {
//如果不等于NEW,说明其他某个线程正在执行任务
//如果等于NEW,则cas修改runner属性,修改失败说明其他某个线程也准备执行这个任务
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//cas成功表示这个任务由当前线程抢占成功
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
//出现异常
result = null;
ran = false;
setException(ex); //保存异常对象
}
if (ran)
//执行成功保存结果
set(result);
}
} finally {
//如果任务被cancel了,则上述setException和set方法因为状态不是NEW了会直接返回
runner = null;
int s = state;
if (s >= INTERRUPTING) //如果被中断,自旋等待中断完成
handlePossibleCancellationInterrupt(s);
}
}
//跟run方法相比区别就是正常执行完成不会保存结果,不会流转状态
protected boolean runAndReset() {
//如果state不是NEW或者cas修改runner失败
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
//执行任务,但是不保存结果,状态就不会从NEW流转成NORMAL
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex); //保存异常实例
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING) //任务被中断了,自旋等待中断完成
handlePossibleCancellationInterrupt(s);
}
//返回任务是否正常完成
return ran && s == NEW;
}
//保存异常对象并修改状态
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//只有原来的状态是NEW才进入下面的逻辑
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//任务执行完成,唤醒阻塞的线程
finishCompletion();
}
}
//保存执行结果并修改状态
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//只有原来的状态是NEW才进入下面的逻辑
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//任务执行完成,唤醒阻塞的线程
finishCompletion();
}
}
private void handlePossibleCancellationInterrupt(int s) {
//正在中断的过程中
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); //自旋等待中断完成
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//cas将waiters置为null
for (;;) {
Thread t = q.thread;
if (t != null) {
//唤醒阻塞的新线程
q.thread = null;
LockSupport.unpark(t);
}
//遍历下一个节点
WaitNode next = q.next;
if (next == null) //遍历结束,终止循环
break;
q.next = null; // unlink to help gc
q = next;
}
break; //终止外层循环
}
}
//执行完成的回调方法,默认是空实现,子类可改写此方法
done();
callable = null; // to reduce footprint
}
5、 cancel
cancel方法的参数为true,则会将当前状态由NEW改成INTERRUPTING,如果此任务已经开始执行了,则将正在执行任务的线程标记为已中断,如果该线程响应中断则可能抛出异常,如果不响应中断则继续执行,最后再将状态改成INTERRUPTED;如果方法的参数为false,则将当前状态由NEW改成CANCELLED,如果此任务已经开始执行了则会继续执行。上述两种情形下,状态流转完成后都会唤醒还在阻塞中的等待线程,如果任务已经开始执行并且继续执行,因为状态已经不是NEW了,直接结果不会保存下来。
//如果mayInterruptIfRunning为true,则会将正在执行任务的线程标记为已中断,线程有可能继续执行,也有可能响应中断抛出异常
//如果为false,则标记为CANCELLED,如果任务已经开始执行了则会继续执行
//如果未执行,则标记为CANCELLED或者INTERRUPTING都会让这任务不会被执行了
public boolean cancel(boolean mayInterruptIfRunning) {
//如果state不是NEW 或者cas修改失败,则返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt(); //将正在执行任务的线程标记为已中断
} finally {
//修改状态为已中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒等待的线程
finishCompletion();
}
return true;
}
三、ExecutorCompletionService
1、定义
ExecutorCompletionService是一个帮助获取多个Future执行结果的工具类,其类继承关系如下:
CompletionService包含的方法如下:
后面会讲解各方法的用途,该类包含的属性如下:
//执行任务的线程池实现
private final Executor executor;
//调用其newTaskFor方法
private final AbstractExecutorService aes;
//已执行完成的Future阻塞队列
private final BlockingQueue<Future<V>> completionQueue;
其构造方法实现如下:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
//如果executor继承自AbstractExecutorService,则aes为executor,否则为null
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
//没有指定队列,默认使用基于链表的无固定容量的LinkedBlockingQueue
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
2、submit
submit方法将Callable或者Runnable任务包装成一个RunnableFuture,然后提交到线程池中,返回RunnableFuture实例。
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
//将其包装成RunnableFuture实现类
RunnableFuture<V> f = newTaskFor(task);
//提交任务到线程池
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task); //默认使用FutureTask作为RunnableFuture的实现
else
return aes.newTaskFor(task);//如果aes不为null,则使用该类的特定实现
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
其中QueueingFuture是一个内部类,继承自FutureTask,其实现如下:
重点改写了done方法的实现,如果任务已经执行完成,则会将该Future实例添加到阻塞队列中。
3、take / poll
这三方法就是从已完成的Future阻塞队列中获取并移除Future实例,如果队列为空,take方法会无期限阻塞阻塞,不带时间参数的poll方法不会阻塞返回null,带时间参数的poll方法会阻塞指定的时间,如果超时则返回null,其实现都是直接调用阻塞队列的方法,如下:
总结
到此这篇关于Java8中AbstractExecutorService与FutureTask源码详解的文章就介绍到这了,更多相关Java8 AbstractExecutorService与FutureTask内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!