文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

获取双异步返回值时,如何保证主线程不阻塞?

2024-11-30 01:53

关注

一、前情提要

在上一篇文章中,使用双异步后,如何保证数据一致性?,通过Future获取异步返回值,轮询判断Future状态,如果执行完毕或已取消,则通过get()获取返回值,get()是阻塞的方法,因此会阻塞当前线程,如果通过new Runnable()执行get()方法,那么还是需要返回AsyncResult,然后再通过主线程去get()获取异步线程返回结果。

写法很繁琐,还会阻塞主线程。

下面是FutureTask异步执行流程图:

二、JDK8的CompletableFuture

1、ForkJoinPool

Java8中引入了CompletableFuture,它实现了对Future的全面升级,可以通过回调的方式,获取异步线程返回值。

CompletableFuture的异步执行通过ForkJoinPool实现, 它使用守护线程去执行任务。

ForkJoinPool在于可以充分利用多核CPU的优势,把一个任务拆分成多个小任务,把多个小任务放到多个CPU上并行执行,当多个小任务执行完毕后,再将其执行结果合并起来。

Future的异步执行是通过ThreadPoolExecutor实现的。

2、从ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的区别

因此,在多线程任务分配不均时,ForkJoinPool的执行效率更高。但是,如果任务分配均匀,ThreadPoolExecutor的执行效率更高,因为ForkJoinPool会创建大量子任务,并对其进行大量的GC,比较耗时。

三、通过CompletableFuture优化 “通过Future获取异步返回值”

1、通过Future获取异步返回值关键代码

(1)将异步方法的返回值改为Future,将返回值放到new AsyncResult<>();中

@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
     // 此代码为简化关键性代码
        List> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入数据异常:",e);
    }
}
@Async("async-executor")
public Future readXlsCacheAsync() {
    try {
        // 此代码为简化关键性代码
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}

(2)通过Future.get()获取返回值

public static boolean getFutureResult(List> futureList, int excelRow) {
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("获取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在执行---获取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("获取Future返回值异常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag);
    return insertFlag;
}

2、通过CompletableFuture获取异步返回值关键代码

(1)将异步方法的返回值改为 int

@Async("async-executor")
public void readXls(String filePath, String filename) {
 List> completableFutureList = new ArrayList<>();
    for (int time = 0; time < times; time++) {
     // 此代码为简化关键性代码
        CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
         @Override
         public Integer get() {
             return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
         }
     }).thenApply((result) -> {// 回调方法
         return thenApplyTest2(result);// supplyAsync返回值 * 1
     }).thenApply((result) -> {
         return thenApplyTest5(result);// thenApply返回值 * 1
     }).exceptionally((e) -> { // 如果执行异常:
         logger.error("CompletableFuture.supplyAsync----异常:", e);
         return null;
     });
 
     completableFutureList.add(completableFuture);
    }
}
@Async("async-executor")
public int readXlsCacheAsync() {
    try {
        // 此代码为简化关键性代码
        return sum;
    }catch (Exception e){
        return -1;
    }
}

(2)通过completableFuture.get()获取返回值

public static boolean getCompletableFutureResult(List> list, int excelRow){
    logger.info("通过completableFuture.get()获取每个异步线程的插入结果----开始");

    int sum = 0;
    for (int i = 0; i < list.size(); i++) {
        Integer result = list.get(i).get();
        sum += result;
    }

    boolean insertFlag = excelRow == sum;
    logger.info("全部执行完毕,excelRow={},入库={}, 数据是否一致={}",excelRow,sum,insertFlag);
    return insertFlag;
}

3、效率对比

(1)测试环境

(2)统计四种情况下10万数据入库时间

备注:因为CompletableFuture不阻塞主线程,主线程执行时间只有2秒,表格中统计的是异步线程全部执行完成的时间。

(3)设置核心线程数

将核心线程数CorePoolSize设置成CPU的处理器数量,是不是效率最高的?

// 获取CPU的处理器数量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 测试电脑是24

因为在接口被调用后,开启异步线程,执行入库任务,因为测试机最多同时开启24线程处理任务,故将10万条数据拆分成等量的24份,也就是10万/24 = 4166,那么我设置成4200,是不是效率最佳呢?

测试的过程中发现,好像真的是这样的。

自定义ForkJoinPool线程池
@Autowired
@Qualifier("asyncTaskExecutor")
private Executor asyncTaskExecutor;

@Override
public void readXls(String filePath, String filename) {
  List> completableFutureList = new ArrayList<>();
    for (int time = 0; time < times; time++) {
  CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
         @Override
         public Integer get() {
             try {
                 return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);
             } catch (Exception e) {
                 logger.error("CompletableFuture----readXlsCacheAsync---异常:", e);
                 return -1;
             }
         };
     },asyncTaskExecutor);
 
     completableFutureList.add(completableFuture);
 }

 // 不会阻塞主线程
    CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
        try {
            int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
        } catch (Exception ex) {
            return;
        }
    });
}
自定义线程池

@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //设置线程名称
    executor.setThreadNamePrefix("asyncTask-Executor");
    //设置最大线程数
    executor.setMaxPoolSize(200);
    //设置核心线程数
    executor.setCorePoolSize(24);
    //设置线程空闲时间,默认60
    executor.setKeepAliveSeconds(200);
    //设置队列容量
    executor.setQueueCapacity(50);
    
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

(4)统计分析

效率对比:

③通过CompletableFuture获取异步返回值(12线程) <  ②通过Future获取异步返回值 <  ④通过CompletableFuture获取异步返回值(24线程) <  ①不获取异步返回值

不获取异步返回值时性能最优,这不废话嘛~

核心线程数相同的情况下,CompletableFuture的入库效率要优于Future的入库效率,10万条数据大概要快4秒钟,这还是相当惊人的,优化的价值就在于此。

四、通过CompletableFuture.allOf解决阻塞主线程问题

1、语法

CompletableFuture.allOf(CompletableFuture的可变数组).whenComplete((r,e) -> {})。

2、代码实例

getCompletableFutureResult方法在 “3.2.2 通过completableFuture.get()获取返回值”。

// 不会阻塞主线程
CompletableFuture.allOf(completableFutureList.toArray(new   CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
    logger.info("全部执行完毕,解决主线程阻塞问题~");
    try {
        int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
    } catch (Exception ex) {
        logger.error("全部执行完毕,解决主线程阻塞问题,异常:", ex);
        return;
    }
});

// 会阻塞主线程
//getCompletableFutureResult(completableFutureList, excelRow);

logger.info("CompletableFuture----会阻塞主线程吗?");

五、CompletableFuture中花俏的语法糖

1、runAsync

runAsync 方法不支持返回值。

可以通过runAsync执行没有返回值的异步方法。

不会阻塞主线程。

// 分批异步读取Excel内容并入库
int finalEnd = end;
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();

2、supplyAsync

supplyAsync也可以异步处理任务,传入的对象实现了Supplier接口。将Supplier作为参数并返回CompletableFuture结果值,这意味着它不接受任何输入参数,而是将result作为输出返回。

会阻塞主线程。

supplyAsync()方法关键代码:

int finalEnd = end;
CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
});
@Override
public int readXlsCacheAsyncMybatis() {
    // 不为人知的操作
    // 返回异步方法执行结果即可
 return 100;
}

六、顺序执行异步任务

1、thenRun

thenRun()不接受参数,也没有返回值,与runAsync()配套使用,恰到好处。

// JDK8的CompletableFuture
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis())
.thenRun(() -> logger.info("CompletableFuture----.thenRun()方法测试"));

2、thenAccept

thenAccept()接受参数,没有返回值。

supplyAsync + thenAccept

CompletableFuture.supplyAsync(new Supplier() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenAccept(x -> logger.info(".thenAccept()方法测试:" + x));

但是,此时无法通过completableFuture.get()获取supplyAsync的返回值了。

3、thenApply

thenApply在thenAccept的基础上,可以再次通过completableFuture.get()获取返回值。

supplyAsync + thenApply,典型的链式编程。

CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() {
 @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenApply((result) -> {
    return thenApplyTest2(result);// supplyAsync返回值 * 2
}).thenApply((result) -> {
    return thenApplyTest5(result);// thenApply返回值 * 5
});

logger.info("readXlsCacheAsyncMybatis插入数据 * 2 * 5 = " + completableFuture.get());

七、CompletableFuture合并任务

CompletableFuture合并任务的代码实例,这里就不多赘述了,一些语法糖而已,大家切记陷入低水平勤奋的怪圈。

八、CompletableFuture VS Future总结

本文中以下几个方面对比了CompletableFuture和Future的差异:

Future提供了异步执行的能力,但Future.get()会通过轮询的方式获取异步返回值,get()方法还会阻塞主线程。

轮询的方式非常消耗CPU资源,阻塞的方式显然与我们的异步初衷背道而驰。

JDK8提供的CompletableFuture实现了Future接口,添加了很多Future不具备的功能,比如链式编程、异常处理回调函数、获取异步结果不阻塞不轮询、合并异步任务等。

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。

异步多线程情况下如何实现事务?

有的小伙伴可能会说:

这还不简单?添加@Transactional注解,如果发生异常或入库数据量不符,直接回滚就可以了~

那么,真的是这样吗?我们下期见~

来源:哪吒编程内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯