文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

阿里架构师教你JUC-Future与FutureTask原理详解

2024-12-03 18:12

关注

[[350087]]

1 Future

 

Future 表示一个任务的生命周期,是一个可取消的异步运算。提供了相应的方法来判断任务状态(完成或取消),以及获取任务的结果和取消任务等。适合具有可取消性和执行时间较长的异步任务。

并发包中许多异步任务类都继承自Future,其中最典型的就是 FutureTask

1.1 介绍

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用get方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future 形式类型、并返回 null 作为底层任务的结果。

也就是说Future具有这样的特性

 

2 FutureTask

 

FutureTask 为 Future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。

FutureTask 常用来封装 Callable 和 Runnable,也可作为一个任务提交到线程池中执行。除了作为一个独立的类,此类也提供创建自定义 task 类使用。FutureTask 的线程安全由CAS保证。

 

FutureTask 内部维护了一个由volatile修饰的int型变量—state,代表当前任务的运行状态

 

在这七种状态中,有四种任务终止状态:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各种状态的转化如下:

数据结构及核心参数

  1. //内部持有的callable任务,运行完毕后置空 
  2. private Callable callable; 
  3.  
  4. //从get()中返回的结果或抛出的异常 
  5. private Object outcome; // non-volatile, protected by state reads/writes 
  6.  
  7. //运行callable的线程,在 run 时进行 CAS 操作 
  8. private volatile Thread runner; 
  9.  
  10. //使用Treiber栈保存等待线程 
  11. private volatile WaitNode waiters; 

FutureTask 继承了Runnale和Future,本身也作为一个线程运行,可以提交给线程池执行。维护了一个内部类WaitNode,使用简单的Treiber栈(无锁并发栈)实现,用于存储等待线程。FutureTask 只有一个自定义的同步器 Sync 的属性,所有的方法都是委派给此同步器来实现。这也是JUC里使用AQS的通用模式。

源码解析

FutureTask 的同步器 由于Future在任务完成后,可以多次自由获取结果,因此,用于控制同步的AQS使用共享模式。

 

FutureTask 底层任务的执行状态保存在AQS的状态里。AQS是否允许线程获取(是否阻塞)是取决于任务是否执行完成,而不是具体的状态值。

  1. private final class Sync extends AbstractQueuedSynchronizer { 
  2.     // 定义表示任务执行状态的常量。由于使用了位运算进行判断,所以状态值分别是2的幂。 
  3.  
  4.     // 表示任务已经准备好了,可以执行 
  5.     private static final int READY     = 0; 
  6.  
  7.     // 表示任务正在执行中 
  8.     private static final int RUNNING   = 1; 
  9.  
  10.     // 表示任务已执行完成 
  11.     private static final int RAN       = 2; 
  12.  
  13.     // 表示任务已取消 
  14.     private static final int CANCELLED = 4; 
  15.  
  16.  
  17.     // 底层的表示任务的可执行对象 
  18.     private final Callable callable; 
  19.  
  20.     // 表示任务执行结果,用于get方法返回。 
  21.     private V result; 
  22.  
  23.     // 表示任务执行中的异常,用于get方法调用时抛出。 
  24.     private Throwable exception; 
  25.  
  26.       
  27.     private volatile Thread runner; 
  28.  
  29.  
  30.       
  31.     protected int tryAcquireShared( int ignore) { 
  32.         return innerIsDone() ? 1 : -1; 
  33.     } 
  34.  
  35.      
  36.     protected boolean tryReleaseShared( int ignore) { 
  37.         runner = null
  38.         return true
  39.     } 
  40.  
  41.  
  42.      // 执行任务的方法 
  43.     void innerRun() { 
  44.         // 用于确保任务不会重复执行 
  45.         if (!compareAndSetState(READY, RUNNING)) 
  46.             return
  47.  
  48.         // 由于Future一般是异步执行,所以runner一般是线程池里的线程。 
  49.         runner = Thread.currentThread(); 
  50.  
  51.         // 设置执行线程后再次检查,在执行前检查是否被异步取消 
  52.         // 由于前面的CAS已把状态设置RUNNING, 
  53.         if (getState() == RUNNING) { // recheck after setting thread 
  54.             V result; 
  55.             // 
  56.             try { 
  57.                 result = callable.call(); 
  58.             } catch (Throwable ex) { 
  59.                 // 捕获任务执行过程中抛出的所有异常 
  60.                 setException(ex); 
  61.                 return
  62.             } 
  63.             set(result); 
  64.         } else { 
  65.       // 释放等待的线程 
  66.             releaseShared(0); // cancel 
  67.         } 
  68.     } 
  69.  
  70.     // 设置结果 
  71.     void innerSet(V v) { 
  72.         // 放在循环里进行是为了失败后重试。 
  73.         for (;;) { 
  74.             // AQS初始化时,状态值默认是 0,对应这里也就是 READY 状态。 
  75.             int s = getState(); 
  76.  
  77.             // 已完成任务不能设置结果 
  78.             if (s == RAN) 
  79.                 return
  80.  
  81.             // 已取消 的任务不能设置结果 
  82.             if (s == CANCELLED) { 
  83.                 // releaseShared 会设置runner为空, 
  84.                 // 这是考虑到与其他的取消请求线程 竞争中断 runner 
  85.                 releaseShared(0); 
  86.                 return
  87.             } 
  88.  
  89.             // 先设置已完成,免得多次设置 
  90.             if (compareAndSetState(s, RAN)) { 
  91.                 result = v; 
  92.                 releaseShared(0); // 此方法会更新 runner,保证result的可见性 
  93.                 done(); 
  94.                 return
  95.             } 
  96.         } 
  97.     } 
  98.  
  99.     // 获取异步计算的结果 
  100.     V innerGet() throws InterruptedException, ExecutionException { 
  101.         acquireSharedInterruptibly(0);// 获取共享,如果没有完成则会阻塞。 
  102.  
  103.         // 检查是否被取消 
  104.         if (getState() == CANCELLED) 
  105.             throw new CancellationException(); 
  106.  
  107.         // 异步计算过程中出现异常 
  108.         if (exception != null
  109.             throw new ExecutionException(exception); 
  110.  
  111.         return result; 
  112.     } 
  113.  
  114.     // 取消执行任务 
  115.     boolean innerCancel( boolean mayInterruptIfRunning) { 
  116.         for (;;) { 
  117.             int s = getState(); 
  118.  
  119.             // 已完成或已取消的任务不能再次取消 
  120.             if (ranOrCancelled(s)) 
  121.                 return false
  122.  
  123.             // 任务处于 READY 或 RUNNING 
  124.             if (compareAndSetState(s, CANCELLED)) 
  125.                 break; 
  126.         } 
  127.         // 任务取消后,中断执行线程 
  128.         if (mayInterruptIfRunning) { 
  129.             Thread r = runner; 
  130.             if (r != null
  131.                 r.interrupt(); 
  132.         } 
  133.         releaseShared(0); // 释放等待的访问结果的线程 
  134.         done(); 
  135.         return true
  136.     } 
  137.  
  138.      
  139.     private boolean ranOrCancelled( int state) { 
  140.         return (state & (RAN | CANCELLED)) != 0; 
  141.     } 
  142.  
  143.      // 其他方法省略 

从 innerCancel 方法可知,取消操作只是改变了任务对象的状态并可能会中断执行线程。如果任务的逻辑代码没有响应中断,则会一直异步执行直到完成,只是最终的执行结果不会被通过get方法返回,计算资源的开销仍然是存在的。

总的来说,Future 是线程间协调的一种工具。

AbstractExecutorService.submit(Callable task)

 

FutureTask 内部实现方法都很简单,先从线程池的submit分析。submit方法默认实现在AbstractExecutorService,几种实现源码如下:

  1. public Future submit(Runnable task) { 
  2.     if (task == null) throw new NullPointerException(); 
  3.     RunnableFuture ftask = newTaskFor(task, null); 
  4.     execute(ftask); 
  5.     return ftask; 
  6. public  Future submit(Runnable task, T result) { 
  7.     if (task == null) throw new NullPointerException(); 
  8.     RunnableFuture ftask = newTaskFor(task, result); 
  9.     execute(ftask); 
  10.     return ftask; 
  11. public  Future submit(Callable task) { 
  12.     if (task == null) throw new NullPointerException(); 
  13.     RunnableFuture ftask = newTaskFor(task); 
  14.     execute(ftask); 
  15.     return ftask; 
  16. protected  RunnableFuture newTaskFor(Runnable runnable, T value) { 
  17.     return new FutureTask(runnable, value); 
  18. public FutureTask(Runnable runnable, V result) { 
  19.     this.callable = Executors.callable(runnable, result); 
  20.     this.state = NEW;       // ensure visibility of callable 

首先调用newTaskFor方法构造FutureTask,然后调用execute把任务放进线程池中,返回FutureTask

FutureTask.run()

  1. public void run() { 
  2.     //新建任务,CAS替换runner为当前线程 
  3.     if (state != NEW || 
  4.         !UNSAFE.compareAndSwapObject(this, runnerOffset, 
  5.                                      null, Thread.currentThread())) 
  6.         return
  7.     try { 
  8.         Callable c = callable; 
  9.         if (c != null && state == NEW) { 
  10.             V result; 
  11.             boolean ran; 
  12.             try { 
  13.                 result = c.call(); 
  14.                 ran = true
  15.             } catch (Throwable ex) { 
  16.                 result = null
  17.                 ran = false
  18.                 setException(ex); 
  19.             } 
  20.             if (ran) 
  21.                 set(result);//设置执行结果 
  22.         } 
  23.     } finally { 
  24.         // runner must be non-null until state is settled to 
  25.         // prevent concurrent calls to run() 
  26.         runner = null
  27.         // state must be re-read after nulling runner to prevent 
  28.         // leaked interrupts 
  29.         int s = state; 
  30.         if (s >= INTERRUPTING) 
  31.             handlePossibleCancellationInterrupt(s);//处理中断逻辑 
  32.     } 

运行任务,如果任务状态为NEW状态,则利用CAS修改为当前线程。执行完毕调用set(result)方法设置执行结果。 set(result)源码如下

首先利用cas修改state状态为

设置返回结果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式设置state状态为

结果设置完毕后,调用finishCompletion()唤醒等待线程

  1. private void finishCompletion() { 
  2.     for (WaitNode q; (q = waiters) != null;) { 
  3.         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待线程 
  4.             for (;;) {//自旋遍历等待线程 
  5.                 Thread t = q.thread; 
  6.                 if (t != null) { 
  7.                     q.thread = null
  8.                     LockSupport.unpark(t);//唤醒等待线程 
  9.                 } 
  10.                 WaitNode next = q.next
  11.                 if (next == null
  12.                     break; 
  13.                 q.next = null; // unlink to help gc 
  14.                 q = next
  15.             } 
  16.             break; 
  17.         } 
  18.     } 
  19.     //任务完成后调用函数,自定义扩展 
  20.     done(); 
  21.     callable = null;        // to reduce footprint 

回到run方法,如果在 run 期间被中断,此时需要调用handlePossibleCancellationInterrupt处理中断逻辑,确保任何中断(例如cancel(true))只停留在当前run或runAndReset的任务中

  1. private void handlePossibleCancellationInterrupt(int s) { 
  2.     //在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待 
  3.     if (s == INTERRUPTING) 
  4.         while (state == INTERRUPTING) 
  5.             Thread.yield(); // wait out pending interrupt 

FutureTask.runAndReset()

runAndReset是 FutureTask另外一个任务执行的方法,它不会返回执行结果,而且在任务执行完之后会重置stat的状态为NEW,使任务可以多次执行。 runAndReset的典型应用是在 ScheduledThreadPoolExecutor 中,周期性的执行任务。

 

FutureTask.get()

FutureTask 通过get()获取任务执行结果。如果任务处于未完成的状态(state <= COMPLETING),就调用awaitDone等待任务完成。任务完成后,通过report获取执行结果或抛出执行期间的异常。

awaitDone(boolean timed, long nanos)

  1. private int awaitDone(boolean timed, long nanos) 
  2.     throws InterruptedException { 
  3.     final long deadline = timed ? System.nanoTime() + nanos : 0L; 
  4.     WaitNode q = null
  5.     boolean queued = false
  6.     for (;;) {//自旋 
  7.         if (Thread.interrupted()) {//获取并清除中断状态 
  8.             removeWaiter(q);//移除等待WaitNode 
  9.             throw new InterruptedException(); 
  10.         } 
  11.  
  12.         int s = state; 
  13.         if (s > COMPLETING) { 
  14.             if (q != null
  15.                 q.thread = null;//置空等待节点的线程 
  16.             return s; 
  17.         } 
  18.         else if (s == COMPLETING) // cannot time out yet 
  19.             Thread.yield(); 
  20.         else if (q == null
  21.             q = new WaitNode(); 
  22.         else if (!queued) 
  23.             //CAS修改waiter 
  24.             queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 
  25.                                                  q.next = waiters, q); 
  26.         else if (timed) { 
  27.             nanos = deadline - System.nanoTime(); 
  28.             if (nanos <= 0L) { 
  29.                 removeWaiter(q);//超时,移除等待节点 
  30.                 return state; 
  31.             } 
  32.             LockSupport.parkNanos(this, nanos);//阻塞当前线程 
  33.         } 
  34.         else 
  35.             LockSupport.park(this);//阻塞当前线程 
  36.     } 

awaitDone用于等待任务完成,或任务因为中断或超时而终止。返回任务的完成状态。

如果线程被中断,首先清除中断状态,调用removeWaiter移除等待节点,然后抛InterruptedException。removeWaiter源码如下:

  1. private void removeWaiter(WaitNode node) { 
  2.     if (node != null) { 
  3.         node.thread = null;//首先置空线程 
  4.         retry: 
  5.         for (;;) {          // restart on removeWaiter race 
  6.             //依次遍历查找 
  7.             for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 
  8.                 s = q.next
  9.                 if (q.thread != null
  10.                     pred = q; 
  11.                 else if (pred != null) { 
  12.                     pred.next = s; 
  13.                     if (pred.thread == null) // check for race 
  14.                         continue retry; 
  15.                 } 
  16.                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替换 
  17.                     continue retry; 
  18.             } 
  19.             break; 
  20.         } 
  21.     } 

如果当前为结束态(state>COMPLETING),则根据需要置空等待节点的线程,并返回 Future 状态

如果当前为正在完成(COMPLETING),说明此时 Future 还不能做出超时动作,为任务让出CPU执行时间片

如果state为NEW,先新建一个WaitNode,然后CAS修改当前waiters

如果等待超时,则调用removeWaiter移除等待节点,返回任务状态;如果设置了超时时间但是尚未超时,则park阻塞当前线程

其他情况直接阻塞当前线程

 

FutureTask.cancel(boolean mayInterruptIfRunning)

  1. public boolean cancel(boolean mayInterruptIfRunning) { 
  2.     //如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED 
  3.     if (!(state == NEW && 
  4.           UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 
  5.               mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 
  6.         return false
  7.     try {    // in case call to interrupt throws exception 
  8.         if (mayInterruptIfRunning) {//可以在运行时中断 
  9.             try { 
  10.                 Thread t = runner; 
  11.                 if (t != null
  12.                     t.interrupt(); 
  13.             } finally { // final state 
  14.                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 
  15.             } 
  16.         } 
  17.     } finally { 
  18.         finishCompletion();//移除并唤醒所有等待线程 
  19.     } 
  20.     return true

说明:尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED。如果当前状态不为NEW,则根据参数mayInterruptIfRunning决定是否在任务运行中也可以中断。中断操作完成后,调用finishCompletion移除并唤醒所有等待线程。

 

示例

小结
本章重点:FutureTask 结果返回机制,以及内部运行状态的转变

 

 

来源:JavaEdge内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯