工作项列表及其进展,参见 issue 6。
如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。
并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单。
其中CompletableFuture (CF)有其优点:
Java标准库内置- 无需额外依赖,几乎总是可用
- 相信有极高的实现质量
- 广为人知广泛使用,有一流的群众基础
CompletableFuture在2014年发布的Java 8提供,有~10年了CompletableFuture的父接口Future早在2004年发布的Java 5中提供,有~20年了- 虽然
Future接口不支持 执行结果的异步获取与并发执行逻辑的编排,但也让广大Java开发者熟悉了Future这个典型的概念与工具
- 功能强大、但不会非常庞大复杂
- 高层抽象
- 或说 以业务流程的形式表达技术的并发流程
- 可以不使用繁琐易错的基础并发协调工具,如
CountDownLatch、锁(Lock)、信号量(Semaphore)
和其它并发工具、框架一样,CompletableFuture 用于
- 并发执行业务逻辑,或说编排并发的处理流程/处理任务
- 利用多核并行处理
- 提升业务响应性
值得更深入了解和应用。 💕
- 作为文档库(即
CompletableFutureGuide):- 完备说明
CompletableFuture的使用方式 - 给出 最佳实践建议 与 使用陷阱注意
- 期望在业务中,更有效安全地使用
CompletableFuture
- 完备说明
- 作为代码库(即
cffu库):- 补齐在业务使用中
CompletableFuture所缺失的功能 - 期望在业务中,更方便自然地使用
CompletableFuture
- 补齐在业务使用中
为了阅读的简洁方便,后文
CompletableFuture会简写成CF。
基本概念与术语:
- 任务(
Task)/ 计算(Computation)- 任务逻辑(
Task Logic)/ 业务逻辑(Biz Logic) - 执行(
Execute)任务
- 任务逻辑(
- 状态(
State)- 运行中(
Running)〚1〛 - 取消(
Cancelled)〚2〛 - 完成(
Completed/Done)- 成功(
Success/Successful)/ 正常完成(Completed Normally)/ 成功完成(Completed Successfully) - 失败(
Failed/Fail)/ 异常完成(Completed Exceptionally)
- 成功(
- 运行中(
- 状态转变(
Transition)- 事件(
Event)、触发(Trigger)
- 事件(
- 业务流程(
Biz Flow)、CF链(Chain)- 流程图(
Flow Graph)、有向无环图 /DAG- 为什么构建的
CF链一定是DAG?
- 为什么构建的
- 流程编排(
Flow Choreography)
- 流程图(
- 前驱(
Predecessor)/ 后继(Successor)- 上游任务 / 前驱任务 /
Dependency Task(我依赖的任务) - 下游任务 / 后继任务 /
Dependent Task(依赖我的任务)
- 上游任务 / 前驱任务 /
注:上面用
/隔开的多个词是,在表述CF同一个概念时,会碰到的多个术语;在不影响理解的情况下,后文会尽量统一用第一个词来表达。
更多说明:
- 〚1〛 任务状态有且有只有 运行中(
Running)、取消(Cancelled)、完成(Completed)这3种状态。- 对于「完成」状态,进一步可以分成 成功(
Success)、失败(Failed)2种状态。
- 对于「完成」状态,进一步可以分成 成功(
- 所以也可以说,任务状态有且只有 运行中、取消、成功、失败 这4种状态。
- 右图是任务的状态及其转变图。
- 在概念上
CF的状态转变只能是单次单向的,这很简单可靠、也容易理解并和使用直觉一致。 -
注:虽然下文提到的
obtrudeValue()/obtrudeException方法可以突破CF概念上的约定,但这2个后门方法在正常设计实现中不应该会用到,尤其在业务使用应该完全忽略;带来的问题也由使用者自己了解清楚并注意。
- 〚2〛 关于「取消」状态:
- 对于
CompletableFuture,取消的实现方式是设置CancellationException异常。
- 对于
- 对于「取消」状态,或说设置了「
CancellationException」失败异常的CompletableFuture cf,相比其它异常失败 / 设置了其它失败异常 的情况,不一样的地方:- 调用
cf.get()/cf.get(timeout, unit)方法- 会抛出
CancellationException异常 - 其它异常失败时,这2个方法抛出的是包了一层的
ExecutionException,cause是实际的失败异常
- 会抛出
- 调用
cf.join()/cf.getNow(valueIfAbsent)方法- 会抛出
CancellationException异常 - 其它异常失败时,这2个方法抛出的是包了一层的
CompletionException,cause是实际的失败异常
- 会抛出
- 调用
cf.exceptionNow()方法- 会抛出
IllegalStateException,而不是返回cf所设置的CancellationException异常 - 其它异常失败时,
exceptionNow()返回设置的异常
- 会抛出
- 调用
cf.isCancelled()方法- 返回
true - 其它异常失败时,
isCancelled()返回false
- 返回
- 调用
- 其它地方,
CancellationException异常与其它异常是一样处理的。比如:- 调用
cf.resultNow()方法
都是抛出IllegalStateException异常 - 调用
cf.isDone()、cf.isCompletedExceptionally()
都是返回true CompletionStage接口方法对异常的处理,如
cf.exceptionally()的方法参数Function<Throwable, T>所处理的都是直接设置的异常对象没有包装过
- 调用
CF任务执行/流程编排,即执行提交的代码逻辑/计算/任务,涉及下面4个方面:
- 任务的输入输出
- 即
CF所关联任务的输入参数/返回结果(及其数据类型)
- 即
- 任务的调度,即在哪个线程来执行任务。可以是
- 在触发的线程中就地连续执行任务
- 在指定
Executor(的线程)中执行任务
- 任务的错误处理(任务运行出错)
- 任务的超时控制
- 超时控制是并发的基础关注方面之一
- 到了
Java 9提供了内置支持,新增了completeOnTimeout(...)/orTimeout(...)方法
本节「并发关注方面」,会举例上一些
CF方法名,以说明CF方法的命名模式;
可以先不用关心方法的具体功能,在「CF的功能介绍」中会分类展开说明CF方法及其功能。
对应下面4种情况:
- 无输入无返回(00)
- 对应
Runnable接口(包含单个run方法)
- 对应
- 无输入有返回(01)
- 对应
Supplier<O>接口(包含单个supply方法)
- 对应
- 有输入无返回(10)
- 对应
Consumer<I>接口(包含单个accept方法)
- 对应
- 有输入有返回(11)
- 对应
Function<I, O>接口(包含单个apply方法)
- 对应
注:
- 对于有输入或返回的接口(即除了
Runnable接口)- 都是泛型的,所以可以支持不同的具体数据类型
- 都是处理单个输入数据
- 如果要处理两个输入数据,即有两个上游
CF的返回,会涉及下面的变体接口
- 对于有输入接口,有两个输入参数的变体接口:
Consumer接口的两参数变体接口:BiConsumer<I1, I2>Function接口的两参数变体接口:BiFunction<I1, I2, O>
CF通过其方法名中包含的用词来体现:
run:无输入无返回(00)- 即是
Runnable接口包含的run方法名 - 相应的
CF方法名的一些例子:runAsync(Runnable runnable)thenRun(Runnable action)runAfterBoth(CompletionStage<?> other, Runnable action)runAfterEitherAsync(CompletionStage<?> other, Runnable action)
- 即是
supply:无输入有返回(01)- 即是
Supplier接口包含的supply方法名 - 相应的
CF方法名的一些例子:supplyAsync(Supplier<U> supplier)supplyAsync(Supplier<U> supplier, Executor executor)
- 即是
accept:有输入无返回(10)- 即是
Consumer接口包含的accept方法名 - 相应的
CF方法名的一些例子:thenAccept(Consumer<T> action)thenAcceptAsync(Consumer<T> action)thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action)acceptEitherAsync(CompletionStage<T> other, Consumer<T> action)
- 即是
apply:有输入有返回(11)- 即是
Function接口包含的apply方法名。CF的方法如 - 相应的
CF方法名的一些例子:thenApply(Function<T, U> fn)thenApplyAsync(Function<T, U> fn)applyToEither(CompletionStage<T> other, Function<T, U> fn)
- 即是
任务调度是指,任务在哪个线程执行。有2种方式:
- 在触发的线程中就地连续执行任务
- 在指定
Executor(的线程)中执行任务
CF通过方法名后缀Async来体现调度方式:
- 有方法名后缀
Async:- 在触发
CF后,任务在指定Executor执行- 如果不指定
executor参数,缺省是ForkJoinPool.commonPool()
- 如果不指定
- 相应的
CF方法名的一些例子:runAsync(Runnable runnable)thenAcceptAsync(Consumer<T> action, Executor executor)runAfterBothAsync(CompletionStage<?> other, Runnable action)
- 在触发
- 无方法名后缀
Async:- 任务在触发线程就地连续执行
- 相应的
CF方法名的一些例子:thenAccept(Consumer<T> action)thenApply(Function<T, U> fn)applyToEither(CompletionStage<T> other, Function<T, U> fn)
提交给CF的任务可以运行出错(抛出异常),即状态是失败(Failed)或取消(Cancelled)。
对于直接读取结果的方法:
- 读取 成功结果的方法,如
cf.get()、cf.join()会抛出异常(包装的异常)来反馈 - 读取 失败结果的方法,如
cf.exceptionNow()会返回结果异常或是抛出异常来反馈
对于CompletionStage接口中编排执行的方法,会根据方法的功能 是只处理成功结果或失败结果一者,或是同时处理成功失败结果二者。如
exceptionally(...)只处理 失败结果whenComplete(...)/handle(...)同时处理 成功与失败结果;- 这2个方法的参数
lamdba(BiConsumer/BiFunction)同时输入成功失败结果2个参数:value与exception
- 这2个方法的参数
- 其它多数的方法只处理 成功结果
- 对于不处理的结果,效果上就好像
没有调用这个CompletionStage方法一样,即短路bypass了 👏
超时控制是并发的基础关注方面之一。
到了Java 9提供了内置支持,新增了completeOnTimeout(...)/orTimeout(...)方法。
CF的超时控制,在实现上其实可以看成是CF的使用方式,并不是CF要实现基础能力;即可以通过其它已有的CF功能,在CF外围实现。
见子文档页 cf-functions-intro.md
CF的方法个数比较多,所以介绍内容有些多,内容继续完善中… 💪 💕
见子文档页 cf-design-patterns.md
还没有什么内容,收集思考展开中… 💪 💕
- 支持设置缺省的业务线程池
CompletableFuture的缺省线程池是ForkJoinPool.commonPool(),这个线程池差不多CPU个线程,合适执行CPU密集的任务。- 对于业务逻辑往往有很多等待操作(如网络
IO、阻塞等待),并不是CPU密集的;使用这个缺省线程池ForkJoinPool.commonPool()很危险❗️
所以每次调用CompletableFuture的*async方法时,都传入业务线程池,很繁琐易错 🤯 Cffu支持设置缺省的业务线程池,规避上面的繁琐与危险
- 一等公民支持
Kotlin🍩 cffuAllOf方法- 运行多个
CompletableFuture并返回结果的allOf方法
- 运行多个
cffuAnyOf方法- 返回具体类型的
anyOf方法
- 返回具体类型的
cffuCombine(...)方法- 运行多个(2 ~ 5个)不同类型的
CompletableFuture,返回结果元组
- 运行多个(2 ~ 5个)不同类型的
cffuJoin(timeout, unit)方法- 支持超时的
join的方法;就像cf.get(timeout, unit)之于cf.get() CompletableFuture缺少这个功能,cf.join()会「不超时永远等待」很危险❗️
- 支持超时的
BackportJava 9+高版本的所有CompletableFuture新功能,在Java 8可以直接使用。
其中重要的Backport功能有:
- 超时控制:
orTimeout(...)/completeOnTimeout(...)方法 - 延迟执行:
delayedExecutor(...)方法 - 工厂方法:
failedFuture(...)/completedStage(...)/failedStage(...)
- 运行多个
CompletableFuture并返回结果的allOf方法:resultAllOf方法,运行多个相同结果类型的CompletableFutureCompletableFuture<List<T>> resultAllOf(CompletableFuture<T>... cfs)CompletableFuture<List<T>> resultAllOf(List<? extends CompletableFuture<T>> cfs)
resultOf方法,运行多个不同结果类型的CompletableFutureCompletableFuture<Pair<T1, T2>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2)CompletableFuture<Triple<T1, T2, T3>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2, CompletableFuture<T3> cf3)
- 具体类型的
anyOf方法:- 提供的方法:
CompletableFuture<T> anyOf(CompletableFuture<T>... cfs)CompletableFuture<T> anyOf(List<? extends CompletableFuture<T>> cfs)
CF返回的类型是Object,丢失具体类型:CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- 提供的方法:
实现所在的类:
public class CffuDemo {
private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();
// Create a CffuFactory with configuration of the customized thread pool
private static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizThreadPool).build();
public static void main(String[] args) throws Exception {
final Cffu<Integer> cf42 = cffuFactory
.supplyAsync(() -> 21) // Run in myBizThreadPool
.thenApply(n -> n * 2);
// Below tasks all run in myBizThreadPool
final Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
sleep(1001);
return n / 2;
});
final Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
sleep(1002);
return n / 2;
});
final Cffu<Integer> longTaskC = cf42.thenApplyAsync(n -> {
sleep(100);
return n * 2;
});
final Cffu<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
sleep(1_000);
throw new RuntimeException("Bang!");
});
final Cffu<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum)
.orTimeout(1500, TimeUnit.MILLISECONDS);
System.out.println("combined result: " + combined.get());
final Cffu<Integer> anyOfSuccess = cffuFactory.cffuAnyOfSuccess(longTaskC, longFailedTask);
System.out.println("anyOfSuccess result: " + anyOfSuccess.get());
////////////////////////////////////////
// cleanup
////////////////////////////////////////
myBizThreadPool.shutdown();
}
}# 完整可运行的Demo代码参见CffuDemo.java。
private val myBizThreadPool: ExecutorService = Executors.newCachedThreadPool()
// Create a CffuFactory with configuration of the customized thread pool
private val cffuFactory: CffuFactory = newCffuFactoryBuilder(myBizThreadPool).build()
fun main() {
val cf42 = cffuFactory
.supplyAsync { 21 } // Run in myBizThreadPool
.thenApply { it * 2 }
// Below tasks all run in myBizThreadPool
val longTaskA = cf42.thenApplyAsync { n: Int ->
sleep(1001)
n / 2
}
val longTaskB = cf42.thenApplyAsync { n: Int ->
sleep(1002)
n / 2
}
val longTaskC = cf42.thenApplyAsync { n: Int ->
sleep(100)
n * 2
}
val longFailedTask = cf42.thenApplyAsync<Int> { _ ->
sleep(1000)
throw RuntimeException("Bang!")
}
val combined = longTaskA.thenCombine(longTaskB, Integer::sum)
.orTimeout(1500, TimeUnit.MILLISECONDS)
println("combined result: ${combined.get()}")
val anyOfSuccess: Cffu<Int> = listOf(longTaskC, longFailedTask).anyOfSuccessCffu()
println("anyOfSuccess result: ${anyOfSuccess.get()}")
////////////////////////////////////////
// cleanup
////////////////////////////////////////
myBizThreadPool.shutdown()
}# 完整可运行的Demo代码参见CffuDemo.kt。
- 当前版本的
Java API文档: https://foldright.io/cffu/apidocs/ - 当前版本的
Kotlin API文档: https://foldright.io/cffu/dokka/
可以在 central.sonatype.com 查看最新版本与可用版本列表。
cffu库(包含Java CompletableFuture的增强CompletableFutureUtils):-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu</artifactId> <version>0.9.7</version> </dependency>
-
For
Gradleprojects:// Gradle Kotlin DSL implementation("io.foldright:cffu:0.9.7")
// Gradle Groovy DSL implementation 'io.foldright:cffu:0.9.7'
-
cffu Kotlin支持库:-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-kotlin</artifactId> <version>0.9.7</version> </dependency>
-
For
Gradleprojects:// Gradle Kotlin DSL implementation("io.foldright:cffu-kotlin:0.9.7")
// Gradle Groovy DSL implementation 'io.foldright:cffu-kotlin:0.9.7'
-
cffu bom:-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-bom</artifactId> <version>0.9.7</version> <type>pom</type> <scope>import</scope> </dependency>
-
For
Gradleprojects:// Gradle Kotlin DSL implementation(platform("io.foldright:cffu-bom:0.9.7"))
// Gradle Groovy DSL implementation platform('io.foldright:cffu-bom:0.9.7')
-
- 📌
TransmittableThreadLocal(TTL)的cffu executor wrapper SPI实现:-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-ttl-executor-wrapper</artifactId> <version>0.9.7</version> <scope>runtime</scope> </dependency>
-
For
Gradleprojects:// Gradle Kotlin DSL runtimeOnly("io.foldright:cffu-ttl-executor-wrapper:0.9.7")
// Gradle Groovy DSL runtimeOnly 'io.foldright:cffu-ttl-executor-wrapper:0.9.7'
-
cffu 是 CompletableFuture-Fu的缩写;读作C Fu,谐音Shifu/师傅。
嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝