Java 8 CompletableFuture动机与应用

动机

试想如下计算场景:

1
2
3
4
int a = A();
int b = B(a);
int c = C(a);
int d = D(b,c);

即:先计算A,然后,拿A的结果计算B/C。最后,再拿B/C的结果计算D。

再假设,如果各个计算节点都是”耗时任务”,我们该如何优化上述的计算任务?

站在上帝视角,我们知道B/C是可以并行处理的,但这种情况对编译器通常是“无能为力”的,
这时就需要开发人员来介入优化流程, 使用传统的Future优化代码如下:

1
2
3
4
int a = A();
Future<Integer> bf = ForkJoinPool.commonPool().submit(() -> B(a));
Future<Integer> cf = ForkJoinPool.commonPool().submit(() -> C(a));
int d = D(bf.get(), cf.get());
  • 首先,手动创建线程池,将计算封装成任务提交,这带来了一定的模板代码(虽然这里使用了一个已有线程池)和“噪声”。
    改进办法是把对应的模板代码都移到方法内部,即

    1
    2
    3
    int B(int i);
    // 签名改为
    Future<Integer> B(int i);
  • 其次,异步后异常处理一个重要的问题,异步异常通常难以排查和定位。
    改进办法是给异步方法传递一个异常回调,即

    1
    Future<Integer> B(int i, ExceptionCallBack cb);

    当然传递多个参数,最好的情况当然是传递一个封装对象。
    这样封装后,那么整个代码的维护将会更加复杂,已有系统改造也更加困难,而且异步代码有一定的“传染性”,造成更多的代码改造。

不考虑异常情况,演化的最简代码示例如下:

1
2
3
4
int a = A();
Future<Integer> bf = B(a);
Future<Integer> cf = C(a);
int d = D(bf.get(), cf.get());

那么,这是最优“表达”吗?如果遇到更复杂的异步任务编排,那么对应的多线程和编排代码就会异常复杂。

打破枷锁,试想我们是不是可以简单的用下面的代码来表达我们想要的计算“编排”?

1
a.all(b,c).apply(d);

是不是非常优雅?这种方式被称为声明式编程。
CompletableFuture就是为越来越流行的异步服务编排、service-mesh等场景设计的。
使用CompletableFuture就可以以声明式的代码编排异步服务,同时屏蔽了线程、锁等并发概念。

看看上面的演化过程,是不是和JDK 8的stream API引入的核心理念有异曲同工之妙!
即:把复杂的线程调度处理模板代码内置,以达到特定使用场景的最优化(声明式编程)。

上面只是一个简单的计算编排的示例,当多个任务编排时,那么逻辑也会复杂多样。
CompletableFuture提供了完备的创建和编排逻辑,大大简化了利用多核处理器计算性能的代码复杂性。
下面咱们一起看一下CompletableFuture的Java docs API文档,一起领略一下。

CompletableFuture API概述

CompletableFuture实现了两个接口CompletionStage, Future,下面是两个接口API说明:

Future: A Future represents the result of an asynchronous computation.(代表一个异步计算的结果)。

CompletionStage: A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes.(一个可能的异步计算步骤,执行一个动作或者当另外一个步骤完成时计算一个值)。

那么柔和上面两个接口的功能定义,CompletableFuture是一个可完成的、异步计算步骤的计算结果。
纵观CompletableFuture的docs文档,CompletableFuture主要对外提供了一些静态的构造方法,以及主要用于各种编排的实例方法。

静态构造方法

  • completedFuture(U value): 创建一个已经完成的包含指定值的CompletableFuture对象。
  • runAsync(Runnable runnable): 创建一个在ForkJoinPool.commonPool()中异步完成的CompletableFuture。
  • supplyAsync(Supplier supplier): 创建一个在ForkJoinPool.commonPool()中异步完成的CompletableFuture,有返回值。
  • allOf(CompletableFuture<?>… cfs): 则是创建一个在所有入参CompletableFuture完成后完成的CompletableFutures。

    另外,runAsync、supplyAsync和allOf还分别有一个有 Executor 参数的重载版本,用于提供自己的线程池实现。

实例方法

主要用于各种逻辑编排,这部分内容比较多,建议直接参考官方文档。这里仅仅罗列几个:

  • handle*(BiFunction<? super T,Throwable,? extends U> fn): 当当前CompletionStage完成或者异常时,返回一个CompletionStage。
  • runAfterBoth(CompletionStage<?> other, Runnable action): 当当前CompletionStage和入参CompletionStage都完成时,返回一个CompletionStage。
  • acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action): 当当前或入参的CompletionStage任何一个完成时调用Consumer动作,返回一个CompletionStage。
  • thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action): 当当前或入参的CompletionStage都完成时,将入参传入对应BiConsumer,返回一个CompletionStage。
  • 等等上述方法可能有多个重载方法。

总之,CompletableFuture的很多实例方法,主要是满足多种编排逻辑和适配多种处理方法等,没必要都熟记于心,需要时查询即可(手册不离手^_^!)。

总结

本文主要是从应用的角度来探讨CompletableFuture的演化动机,了解这些大家也就明白CompletableFuture的主要应用的场景。紧接着第二部分,梳理了CompletableFuture的API文档,说明了创建和使用的方法,并引出了多种编排逻辑。

随着微服务趋势的不断应用,以及service mesh的兴起,相信大家会越来越多的面临上面的计算场景,CompletableFuture异步编排一定会让你爱不释手。

参考资料

Java docs

,
© 2023 PLAYAROUND All Rights Reserved. 本站访客数人次 本站总访问量
Theme by hiero