文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java异步任务计算FutureTask源码分析

2023-06-30 11:13

关注

本文小编为大家详细介绍“Java异步任务计算FutureTask源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Java异步任务计算FutureTask源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

了解一下什么是FutureTask?

FutureTask 是一个可取消的异步计算。

FutureTask提供了对Future的基本实现,可以调用方法去开始和取消一个计算,可以查询计算是否完成,并且获取计算结果。

FutureTask只能在计算完成后获取到计算结果,一旦计算完成,将不能重启或者取消,除非调用runAndReset方法。

FutureTask除了实现了Future接口以外,还实现了Runnable接口,因此FutureTask是可以交由线程池的Executor执行,也可以直接使用一个异步线程调用执行(futureTask.run())。

FutureTask 是如何实现的呢?

首先,我们看一下FutureTask类的继承结构,如下图,它实现的是RunnableFuture接口,而RunnableFuture继承自Future和函数式接口Runnable,所以说FutureTask本质就是一个可运行的Future。

Java异步任务计算FutureTask源码分析

Future 接口约定了一些异步计算类必须要实现的功能,源码如下:

package java.util.concurrent;public interface Future<V> {        boolean cancel(boolean mayInterruptIfRunning);        boolean isCancelled();        boolean isDone();        V get() throws InterruptedException, ExecutionException;        V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException;}

Runnable 接口我们都很熟悉,他就是一个函数式接口,我们常用其创建一个线程。

package java.lang;?@FunctionalInterfacepublic interface Runnable { ? ? ? ?public abstract void run();}

FutureTask就是一个将要被执行的任务,它包含了以上接口具体的实现,FutureTask内部定义了任务的状态state和一些状态的常量,它的内部核心是一个Callable callable,我们通过构造函数可以传入callable或者是runnable,最后都会内部转为callable,因为我们需要获取异步任务的执行结果,只有通过Callable创建的线程才会返回结果。

我们可以通过此时的状态判断Future中isCancelled()isDone()的返回结果。

以下为FutureTask源码,内含核心源码分析注释

package java.util.concurrent;import java.util.concurrent.locks.LockSupport;public class FutureTask<V> implements RunnableFuture<V> {        private volatile int state;    private static final int NEW          = 0; // 新建    private static final int COMPLETING   = 1; // 完成    private static final int NORMAL       = 2; // 正常    private static final int EXCEPTIONAL  = 3; // 异常    private static final int CANCELLED    = 4; // 取消    private static final int INTERRUPTING = 5; // 中断中    private static final int INTERRUPTED  = 6; // 中断的    private Callable<V> callable;        private Object outcome;     private volatile Thread runner;    private volatile WaitNode waiters;    ...    public FutureTask(Callable<V> callable) {        if (callable == null)            throw new NullPointerException();        this.callable = callable;        this.state = NEW;           }    public FutureTask(Runnable runnable, V result) {        this.callable = Executors.callable(runnable, result);        this.state = NEW;           }    public boolean isCancelled() {        return state >= CANCELLED;    }    public boolean isDone() {        return state != NEW;    }        public boolean cancel(boolean mayInterruptIfRunning) {        if (!(state == NEW &&              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))            return false;        try {    // in case call to interrupt throws exception            if (mayInterruptIfRunning) {                try {                    Thread t = runner;                    if (t != null)                        t.interrupt();                } finally { // final state                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);                }            }        } finally {            finishCompletion();        }        return true;    }            public V get() throws InterruptedException, ExecutionException {        int s = state;        if (s <= COMPLETING)            s = awaitDone(false, 0L);        return report(s);    }      public V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException {        if (unit == null)            throw new NullPointerException();        int s = state;        if (s <= COMPLETING &&            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)            throw new TimeoutException();        return report(s);    }        private V report(int s) throws ExecutionException {        Object x = outcome;        if (s == NORMAL)            return (V)x;        if (s >= CANCELLED)            throw new CancellationException();        throw new ExecutionException((Throwable)x);    }    protected void done() { }        protected void set(V v) {        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            outcome = v;            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state            finishCompletion();        }    }        protected void setException(Throwable t) {        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            outcome = t;            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state            finishCompletion();        }    }        public void run() {        if (state != NEW ||            !UNSAFE.compareAndSwapObject(this, runnerOffset,                                         null, Thread.currentThread()))            return;        try {            Callable<V> c = callable;            if (c != null && state == NEW) {                V result;                boolean ran;                try {                    result = c.call();                    ran = true;                } catch (Throwable ex) {                    result = null;                    ran = false;                    setException(ex);                }                if (ran)                    set(result);            }        } finally {            runner = null;            int s = state;            if (s >= INTERRUPTING)                handlePossibleCancellationInterrupt(s);        }    }        protected boolean runAndReset() {        if (state != NEW ||            !UNSAFE.compareAndSwapObject(this, runnerOffset,                                         null, Thread.currentThread()))            return false;        boolean ran = false;        int s = state;        try {            Callable<V> c = callable;            if (c != null && s == NEW) {                try {                    c.call(); // don't set result                    ran = true;                } catch (Throwable ex) {                    setException(ex);                }            }        } finally {            runner = null;            s = state;            if (s >= INTERRUPTING)                handlePossibleCancellationInterrupt(s);        }        return ran && s == NEW;    }        private void handlePossibleCancellationInterrupt(int s) {        if (s == INTERRUPTING)            while (state == INTERRUPTING)                Thread.yield();     }    static final class WaitNode {        volatile Thread thread;        volatile WaitNode next;        WaitNode() { thread = Thread.currentThread(); }    }        private void finishCompletion() {        // assert state > COMPLETING;        for (WaitNode q; (q = waiters) != null;) {            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {                for (;;) {                    Thread t = q.thread;                    if (t != null) {                        q.thread = null;                        LockSupport.unpark(t);                    }                    WaitNode next = q.next;                    if (next == null)                        break;                    q.next = null; // unlink to help gc                    q = next;                }                break;            }        }        done();        callable = null;        // to reduce footprint    }        private int awaitDone(boolean timed, long nanos)        throws InterruptedException {        final long deadline = timed ? System.nanoTime() + nanos : 0L;        WaitNode q = null;        boolean queued = false;        for (;;) {            if (Thread.interrupted()) {                removeWaiter(q);                throw new InterruptedException();            }            int s = state;            if (s > COMPLETING) {                if (q != null)                    q.thread = null;                return s;            }            else if (s == COMPLETING) // cannot time out yet                Thread.yield();            else if (q == null)                q = new WaitNode();            else if (!queued)                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                     q.next = waiters, q);            else if (timed) {                nanos = deadline - System.nanoTime();                if (nanos <= 0L) {                    removeWaiter(q);                    return state;                }                LockSupport.parkNanos(this, nanos);            }            else                LockSupport.park(this);        }    }        private void removeWaiter(WaitNode node) {        if (node != null) {            node.thread = null;            retry:            for (;;) {          // restart on removeWaiter race                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {                    s = q.next;                    if (q.thread != null)                        pred = q;                    else if (pred != null) {                        pred.next = s;                        if (pred.thread == null) // check for race                            continue retry;                    }                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,                                                          q, s))                        continue retry;                }                break;            }        }    }    // Unsafe mechanics    private static final sun.misc.Unsafe UNSAFE;    private static final long stateOffset;    private static final long runnerOffset;    private static final long waitersOffset;    static {        try {            UNSAFE = sun.misc.Unsafe.getUnsafe();            Class<?> k = FutureTask.class;            stateOffset = UNSAFE.objectFieldOffset                (k.getDeclaredField("state"));            runnerOffset = UNSAFE.objectFieldOffset                (k.getDeclaredField("runner"));            waitersOffset = UNSAFE.objectFieldOffset                (k.getDeclaredField("waiters"));        } catch (Exception e) {            throw new Error(e);        }    }}

FutureTask 运行流程

一般来说,我们可以认为FutureTask具有以下三种状态:

未启动:新建的FutureTask,在run()没执行之前,FutureTask处于未启动状态。

private static final int NEW          = 0; // 新建

已启动FutureTask对象的run方法启动并执行的过程中,FutureTask处于已启动状态。

已完成:FutureTask正常执行结束,或者FutureTask执行被取消(FutureTask对象cancel方法),或者FutureTask对象run方法执行抛出异常而导致中断而结束,FutureTask都处于已完成状态。

private static final int COMPLETING   = 1; // 完成private static final int NORMAL       = 2; // 完成后正常设置结果private static final int EXCEPTIONAL  = 3; // 完成后异常设置异常private static final int CANCELLED    = 4; // 执行取消private static final int INTERRUPTING = 5; // 中断中private static final int INTERRUPTED  = 6; // 中断的

FutureTask 的使用

使用一(直接新建一个线程调用):

FutureTask<Integer> task = new FutureTask<>(new Callable() {@Override    public Integer call() throws Exception {    return sum();    }});new Thread(task).stat();Integer result = task.get();

使用二(结合线程池使用)

FutureTask<Integer> task = new FutureTask<>(new Callable() {@Override    public Integer call() throws Exception {    return sum();    }});Executors.newCachedThreadPool().submit(task);Integer result = task.get();

读到这里,这篇“Java异步任务计算FutureTask源码分析”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯