文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

怎么使用Java多线程Future获取异步任务

2023-07-05 23:49

关注

本篇内容主要讲解“怎么使用Java多线程Future获取异步任务”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么使用Java多线程Future获取异步任务”吧!

Runnable的局限性

在前文中我们谈到,通过编码实现Runnable接口,将获得具有边界性的 "任务",在指定的线程(或者线程池)中运行。

重新观察该接口,不难发现它并没有方法返回值:

public interface Runnable {    void run();}

在JDK1.5之前,想利用任务的执行结果,需要小心的操作线程访问临界区资源。使用 回调 进行解耦是非常不错的选择。

练手小Demo -- 回顾既往文章知识

注意,为了减少篇幅使用了lambda,但jdk1.5之前并不支持lambda

将计算任务分离到其他线程执行,再回到主线程消费结果

我们将计算、IO等耗时任务丢到其他线程,让主线程专注于自身业务,假想它在接受用户输入以及处理反馈,但我们略去这一部分

我们可以设计出类似下面的代码:

虽然它还有很多不合理之处值得优化,但也足以用于演示

class Demo {    static final Object queueLock = new Object();    static List<Runnable> mainQueue = new ArrayList<>();    static boolean running = true;    static final Runnable FINISH = () -> running = false;    public static void main(String[] args) {        synchronized (queueLock) {            mainQueue.add(Demo::onStart);        }        while (running) {            Runnable runnable = null;            synchronized (queueLock) {                if (!mainQueue.isEmpty())                    runnable = mainQueue.remove(0);            }            if (runnable != null) {                runnable.run();            }            Thread.yield();        }    }    public static void onStart() {        //...    }    public static void finish() {        synchronized (queueLock) {            mainQueue.clear();            mainQueue.add(FINISH);        }    }}

再模拟一个计算的线程和任务回调:

interface Callback {    void onResultCalculated(int result);}class CalcThread extends Thread {    private final Callback callback;    private final int a;    private final int b;    public CalcThread(Callback callback, int a, int b) {        this.callback = callback;        this.a = a;        this.b = b;    }    @Override    public void run() {        super.run();        try {            Thread.sleep(10);        } catch (InterruptedException e) {            e.printStackTrace();        }        final int result = a + b;        System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());        synchronized (queueLock) {            mainQueue.add(() -> callback.onResultCalculated(result));        }    }}

填充一下onStart业务:

class Demo {    public static void onStart() {        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());        new CalcThread(result -> {            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());            finish();        }, 200, 300).start();    }}

复习:优化为使用Runnable

在前文我们提到,如果业务仅关注任务的执行,并不过于关心线程本身,则可以利用Runnable:

class Demo {    static class CalcRunnable implements Runnable {        private final Callback callback;        private final int a;        private final int b;        public CalcRunnable(Callback callback, int a, int b) {            this.callback = callback;            this.a = a;            this.b = b;        }        @Override        public void run() {            try {                Thread.sleep(10);            } catch (InterruptedException e) {                e.printStackTrace();            }            final int result = a + b;            System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());            synchronized (queueLock) {                mainQueue.add(() -> callback.onResultCalculated(result));            }        }    }    public static void onStart() {        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());        new Thread(new CalcRunnable(result -> {            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());            finish();        }, 200, 300)).start();    }}

不难想象出:我们非常需要

至此,我们可以体会到:JDK1.5之前,因为JDK的功能不足,Java程序对于线程的使用 较为粗糙

为异步而生的Future

终于在JDK1.5中,迎来了新特性: Future 以及先前文章中提到的线程池, 时光荏苒,一晃将近20年了。

public interface Future<V> {    boolean cancel(boolean mayInterruptIfRunning);    boolean isCancelled();    boolean isDone();    V get() throws InterruptedException, ExecutionException;    V get(long timeout, TimeUnit unit)            throws InterruptedException, ExecutionException, TimeoutException;}

尽管已经移除了API注释,但仍然能够理解每个API的含义,不多做赘述。

显而易见,为了增加返回值,没有必要用如此复杂的 接口来替代 Runnable。简单思考后可以对返回值的情况进行归纳:

从业务层上看,仅需要如下接口即可,它增加了返回值、并可以更友好地让使用者处理异常:

作者按:抛开底层实现,仅看业务方编码需要

public interface Callable<V> {        V call() throws Exception;}

显然,JDK需要提供后向兼容能力:

所以一并提供了适配器,让使用者进行简单的局部重构即可用上新特性

static final class RunnableAdapter<T> implements Callable<T> {    final Runnable task;    final T result;    RunnableAdapter(Runnable task, T result) {        this.task = task;        this.result = result;    }    public T call() {        task.run();        return result;    }}

而Future恰如其名,它代表了在 "未来" 的一个结果和状态,为了更方便地处理异步而生。

并且内置了 FutureTask,在 FutureTask详解 章节中再行展开。

类图

在JDK1.8的基础上,看一下精简的类图结构:

怎么使用Java多线程Future获取异步任务

FutureTask详解

构造函数

public class FutureTask {    public FutureTask(Callable<V> callable) {        if (callable == null)            throw new NullPointerException();        this.callable = callable;        this.state = NEW;       // ensure visibility of callable    }    public FutureTask(Runnable runnable, V result) {        this.callable = Executors.callable(runnable, result);        this.state = NEW;       // ensure visibility of callable    }}

生命周期

public class FutureTask {    //新建    private static final int NEW = 0;    //处理中    private static final int COMPLETING = 1;    //正常    private static final int NORMAL = 2;    //异常    private static final int EXCEPTIONAL = 3;    //已取消    private static final int CANCELLED = 4;    //中断中    private static final int INTERRUPTING = 5;    //已中断    private static final int INTERRUPTED = 6;}

可能的生命周期转换如下:

JDK中原汁原味的解释如下:

The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified.

核心方法

本节从以下三块入手阅读源码

状态判断API的实现非常简单

public class FutureTask {    public boolean isCancelled() {        return state >= CANCELLED;    }    public boolean isDone() {        return state != NEW;    }}

取消:

public class FutureTask {    public boolean cancel(boolean mayInterruptIfRunning) {        if (!(state == NEW &amp;&amp;                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) {            return false;        }        try {    // in case call to interrupt throws exception            if (mayInterruptIfRunning) {                try {                    Thread t = runner;                    if (t != null)                        t.interrupt();                } finally { // final state                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);                }            }        } finally {            finishCompletion();        }        return true;    }    private void finishCompletion() {        // assert state &gt; COMPLETING;        for (WaitNode q; (q = waiters) != null; ) {            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {                for (; ; ) {                    Thread t = q.thread;                    if (t != null) {                        q.thread = null;                        LockSupport.unpark(t);                    }                    WaitNode next = q.next;                    if (next == null)                        break;                    q.next = null; // unlink to help gc                    q = next;                }                break;            }        }        done();        callable = null;        // to reduce footprint    }}

获取结果: 先判断状态,如果未进入到 COMPLETING(即为NEW状态),则阻塞等待状态改变,返回结果或抛出异常

public class FutureTask {    public V get() throws InterruptedException, ExecutionException {        int s = state;        if (s &lt;= COMPLETING)            s = awaitDone(false, 0L);        return report(s);    }    public V get(long timeout, TimeUnit unit)            throws InterruptedException, ExecutionException, TimeoutException {        if (unit == null)            throw new NullPointerException();        int s = state;        if (s &lt;= COMPLETING &amp;&amp;                (s = awaitDone(true, unit.toNanos(timeout))) &lt;= COMPLETING)            throw new TimeoutException();        return report(s);    }    private V report(int s) throws ExecutionException {        Object x = outcome;        if (s == NORMAL)            return (V) x;        if (s &gt;= CANCELLED)            throw new CancellationException();        throw new ExecutionException((Throwable) x);    }}

如何使用

而使用则非常简单,也非常的朴素。

我们以文中的的例子进行改造:

class Demo {   static class CalcResult {      public int result;   }   public static void onStart() {      System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());      final CalcResult calcResult = new CalcResult();      Future&lt;CalcResult&gt; resultFuture = Executors.newSingleThreadExecutor().submit(() -&gt; {         try {            Thread.sleep(10);         } catch (InterruptedException e) {            e.printStackTrace();         }         final int result = 200 + 300;         System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());         calcResult.result = result;      }, calcResult);      System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis());      if (resultFuture.isDone()) {         try {            final int ret = resultFuture.get().result;            System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());         } catch (InterruptedException | ExecutionException e) {            e.printStackTrace();         }      }      finish();   }}

如果直接使用新特性Callback,则如下:

直接返回结果,当然也可以直接返回Integer,不再包裹一层

class Demo {   public static void onStart() {      System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());      ExecutorService executor = Executors.newSingleThreadExecutor();      Future<CalcResult> resultFuture = executor.submit(() -> {         try {            Thread.sleep(10);         } catch (InterruptedException e) {            e.printStackTrace();         }         final int result = 200 + 300;         System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());         final CalcResult calcResult = new CalcResult();         calcResult.result = result;         return calcResult;      });      System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis());      if (resultFuture.isDone()) {         try {            final int ret = resultFuture.get().result;            System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());         } catch (InterruptedException | ExecutionException e) {            e.printStackTrace();         }      }      executor.shutdown();      finish();   }}

相信读者诸君会有这样的疑惑:

为何使用Future比原先的回调看起来粗糙?

首先要明确一点:文中前段的回调Demo,虽然达成了既定目标,但效率并不高!!在当时计算很昂贵的背景下,并不会如此莽撞地使用!

而在JDK1.5开始,提供了大量内容支持多线程开发。考虑到篇幅,会在系列文章中逐步展开。

另外,FutureTask中的CAS与Happens-Before本篇中亦不做展开。

接下来,再做一些引申,简单看一看多线程业务模式。

引申,多线程业务模式

常用的多线程设计模式包括:

Future模式

文中对于Future的使用方式遵循了Future模式。

业务方在使用时,已经明确了任务被分离到其他线程执行时有等待期,在此期间,可以干点别的事情,不必浪费系统资源。

Master-Worker模式

在程序系统中设计两类线程,并相互协作:

Master线程负责接受任务、分配任务、接收(必要时进一步组合)结果并返回;

Worker线程负责处理子任务,当子任务处理完成后,向Master线程返回结果;

作者按:此时可再次回想一下文章开头的Demo

Guarded Suspension模式

不变模式

在并行开发过程中,为确保数据的一致性和正确性,有必要对对象进行同步,而同步操作会对程序系统的性能产生相当的损耗。

因此,使用状态不可改变的对象,依靠其不变性来确保 并行操作没有同步机制 的情况下,保持一致性和正确性。

生产者-消费

设计两类线程:若干个生产者线程和若干个消费者线程。

生产者线程负责提交用户请求,消费者线程负责处理用户请求。生产者和消费者之间通过共享内存缓冲区进行通信。

内存缓冲区的意义:

这几种模式从不同角度出发解决特定问题,但亦有一定的相似之处,不再展开。

到此,相信大家对“怎么使用Java多线程Future获取异步任务”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯