👉 cffu(CompletableFuture-Fu 🦝)是一个小小的CompletableFuture(CF)辅助增强库,提升CF使用体验并减少误用,在业务中更方便高效安全地使用CF。😋🚀🦺
欢迎 👏 💖
- 建议和提问,提交 Issue
- 贡献和改进,Fork 后提通过 Pull Request 贡献代码
- 🔧 功能
- 👥 User Guide
- 1.
cffu的使用方式 - 2.
cffu功能介绍- 2.1 支持返回多个输入
CF的整体运行结果 - 2.2 支持设置缺省的业务线程池并封装携带
- 2.3 高效灵活的并发执行策略(
AllFailFast/AnySuccess/AllSuccess/MostSuccess) - 2.4 支持直接运行多个
Action,而不是要先包装成CompletableFuture - 2.5 支持处理指定异常类型,而不是处理所有异常
Throwable - 2.6
Backport支持Java 8 - 2.7 超时执行安全的
orTimeout/completeOnTimeout新实现 - 2.8 支持超时的
join方法 - 2.9 返回具体类型的
anyOf方法 - 2.10 输入宽泛类型的
allOf/anyOf方法 - 更多功能说明
- 2.1 支持返回多个输入
- 1.
- 🔌 API Docs
- 🍪依赖
- 📚 更多资料
- 👋 关于库名
☘️ 补全业务使用中缺失的功能
- 🏪 更方便的功能,如
- 支持返回多个输入
CF的运行结果,而不是返回没有包含输入CF的结果(CompletableFuture#allOf)
如方法allResultsFailFastOf/allResultsOf/mSupplyFailFastAsync/thenMApplyFailFastAsync - 支持返回多个不同类型的输入
CF的结果,而不是同一类型
如方法allTupleFailFastOf/allTupleOf/mSupplyTupleFailFastAsync/thenMApplyTupleFailFastAsync - 支持直接运行多个
Action,而不是要先包装成CompletableFuture
如方法mSupplyAsync/mRunAsync/mSupplyFailFastAsync/thenMApplyMostSuccessTupleAsync - 支持设置缺省的业务线程池并封装携带,
CffuFactory#builder(executor)方法,而不是在异步执行时反复传入业务线程池参数 - 支持处理指定异常类型的
catching方法,而不是处理所有异常Throwable(CompletableFuture#exceptionally)
- 支持返回多个输入
- 🚦 更高效灵活的并发执行策略,如
AllFailFast策略:当输入的多个CF有失败时快速失败返回,而不再于事无补地等待所有CF运行完成(CompletableFuture#allOf)AnySuccess策略:返回首个成功的CF结果,而不是首个完成但可能失败的CF(CompletableFuture#anyOf)AllSuccess策略:返回多个CF中成功的结果,对于失败的CF返回指定的缺省值MostSuccess策略:指定时间内返回多个CF中成功的结果,对于失败或超时的CF返回指定的缺省值All(Complete)/Any(Complete)策略:这2个是CompletableFuture已有支持的策略
- 🦺 更安全的使用方式,如
- 超时执行安全的
orTimeout/completeOnTimeout方法新实现
CF#orTimeout/CF#completeOnTimeout方法会导致CF的超时与延迟执行基础功能失效❗️ - 一定不会修改
CF结果的peek处理方法
whenComplete方法可能会修改CF的结果,返回CF的结果与输入并不一定一致 - 支持超时的
join(timeout, unit)方法 - 支持禁止强制篡改,
CffuFactoryBuilder#forbidObtrudeMethods方法 - 在类方法附加完善的代码质量注解,在编码时
IDE能尽早提示出问题
如@NonNull、@Nullable、@CheckReturnValue、@Contract等
- 超时执行安全的
- 🧩 缺失的基本功能,除了上面面向安全而新实现的方法,还有
- 异步异常完成,
completeExceptionallyAsync方法 - 非阻塞地获取成功结果,对于失败的或还在运行中的
CF则返回指定的缺省值,getSuccessNow方法 - 解包装
CF异常成业务异常,unwrapCfException方法
- 异步异常完成,
⏳ Backport支持Java 8,Java 9+高版本的所有CF新功能方法在Java 8低版本直接可用,如
- 超时控制:
orTimeout/completeOnTimeout - 延迟执行:
delayedExecutor - 工厂方法:
failedFuture/completedStage/failedStage - 处理操作:
completeAsync/exceptionallyAsync/exceptionallyCompose/copy - 非阻塞读:
resultNow/exceptionNow/state
💪 已有功能的增强,如
anyOf方法:返回具体类型T(类型安全),而不是返回Object(CompletableFuture#anyOf)allOf/anyOf方法:输入更宽泛的CompletionStage参数类型,而不是CompletableFuture类(CompletableFuture#allOf/anyOf)
更多cffu功能及其使用方式的说明参见 cffu功能介绍。
如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。
并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单。
其中CompletableFuture(CF)有其优点:
- 广为人知广泛使用,有一流的群众基础
CompletableFuture在2014年发布的Java 8提供,有10年了CompletableFuture的父接口Future早在2004年发布的Java 5中提供,有20年了。虽然Future接口不支持运行结果的异步获取与并发执行逻辑的编排,但也让广大Java开发者熟悉了Future这个典型的概念与工具
- 功能强大、但不会非常庞大复杂
- 高层抽象
- 或说 以业务流程的形式表达技术的并发流程
- 可以避免或减少使用繁琐易错的并发协调基础工具:同步器
Synchronizers(如CountDownLatch、CyclicBarrier、Phaser)、锁Locks和原子类atomic
Java标准库内置- 无需额外依赖,几乎总是可用
- 相信有极高的实现质量
与其它并发工具、框架一样,CompletableFuture用于
- 并发执行业务逻辑,或说编排并发处理流程或异步任务
- 多核并行处理,充分利用资源
- 缩短请求响应时间,提升业务响应性
值得更深入地了解和应用。 💕
- 🦝 使用
Cffu类 - 🔧 使用
CompletableFutureUtils工具类
相比调用CompletableFutureUtils工具类的静态方法,
- 使用
Cffu类就像使用CompletableFuture一样,新功能作为Cffu类的实例方法,可以自然方便地调用Cffu类之于工具类CompletableFutureUtils,就像Guava的FluentFuture之于ListenableFuture的工具类Futures
Java语言不支持在已有类(CompletableFuture)上扩展方法,所以需要一个新的包装类(Cffu)
如果你不想在项目中引入新类(Cffu类)、觉得这样增加了复杂性的话,完全可以将cffu库作为一个工具类来用:
- 优化
CompletableFuture使用的工具方法在业务项目中很常见 CompletableFutureUtils提供了一系列实用高效安全可靠的工具方法- 这种使用方式有些
cffu功能没有提供(也没有想到好的实现方案)
如支持设置缺省的业务线程池、禁止强制篡改
为了方便自然地使用cffu库的增强功能与方法,可以迁移使用CompletableFuture类的已有代码到Cffu类。
1) 如果可以修改使用CompletableFuture的代码
迁移到Cffu类,包含2步简单的修改:
- 在类型声明地方,将
CompletableFuture类改成Cffu类 - 在
CompletableFuture静态方法调用的地方,将类名CompletableFuture改成cffuFactory实例
之所以可以这样迁移,是因为:
CompletableFuture类的所有实例方法都在Cffu类中有实现,且有相同的方法签名与功能CompletableFuture类的所有静态方法都在CffuFactory类中有实现,且有相同的方法签名与功能
2) 如果不能修改使用CompletableFuture的代码(如在外部库中返回的CF)
使用CffuFactory.toCffu(CompletionStage)方法,将CompletableFuture或CompletionStage转换成Cffu类型。
-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu</artifactId> <version>1.1.1</version> </dependency>
-
For
Gradleprojects:Gradle Kotlin DSL
implementation("io.foldright:cffu:1.1.1")Gradle Groovy DSL
implementation 'io.foldright:cffu:1.1.1'
cffu也支持Kotlin扩展方法的使用方式,参见cffu-kotlin/README.md;使用方式的对比示例参见docs/usage-mode-demo.md。
CompletableFuture的allOf方法没有返回输入CF的运行结果(方法的返回类型是CF<Void>)。
由于不能方便地获取输入CF的运行结果,需要:
- 在
allOf方法之后再通过入参CF的读操作(如join/get)来获取结果- 操作繁琐 🔧🤯
- 像
join/get读方法是阻塞的,增加了业务逻辑的死锁风险❗️
更多说明可以看看CompletableFuture原理与实践 - 4.2.2 线程池循环引用会导致死锁
- 或是在传入的
CompletableFuture Action中设置外部的变量- 需要注意多线程读写的线程安全问题
⚠️ 🔀
多线程读写涉及多线程数据传递的复杂性,遗漏并发逻辑的数据读写的正确处理是业务代码中的常见问题❗️ - 并发深坑勿入,并发逻辑复杂易出Bug 🐞
如果涉及超时则会更复杂,JDK CompletableFuture自身在Java 21中也有这方面的Bug修复 ⏰
- 需要注意多线程读写的线程安全问题
cffu的allResultsFailFastOf / allResultsOf / mostSuccessResultsOf等方法提供了返回输入CF结果的功能。使用这些方法获取输入CF的整体运行结果:
- 方便直接
- 因为返回的是有整体结果的
CF,可以继续串接非阻塞的操作,所以自然减少了阻塞读方法(如join/get)的使用,尽量降低业务逻辑的死锁风险 - 规避了在业务逻辑中直接实现多线程读写逻辑的复杂线程安全问题与逻辑错误
- 使用「可靠实现与测试的」库所提供的并发功能而不是去直接实现 是 最佳实践 🏆✅
示例代码如下:
public class AllResultsOfDemo {
private static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
private static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
//////////////////////////////////////////////////
// CffuFactory#allResultsOf
//////////////////////////////////////////////////
Cffu<Integer> cffu1 = cffuFactory.completedFuture(21);
Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);
Cffu<Void> all = cffuFactory.allOf(cffu1, cffu2);
// result type is Void!
//
// the result can be got by input argument `cf1.get()`, but it's cumbersome.
// so we can see a lot of util methods to enhance `allOf` with result in our project.
Cffu<List<Integer>> allResults = cffuFactory.allResultsOf(cffu1, cffu2);
System.out.println(allResults.get());
// output: [21, 42]
//////////////////////////////////////////////////
// or CompletableFutureUtils#allResultsOf
//////////////////////////////////////////////////
CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(21);
CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);
CompletableFuture<Void> all2 = CompletableFuture.allOf(cf1, cf2);
// result type is Void!
CompletableFuture<List<Integer>> allResults2 = allResultsOf(cf1, cf2);
System.out.println(allResults2.get());
// output: [21, 42]
}
}# 完整可运行的Demo代码参见
AllResultsOfDemo.java。
上面是多个相同结果类型的CF,cffu还提供了返回多个不同类型的输入CF结果的allTupleFailFastOf / allTupleOf / mSupplyTupleFailFastAsync等方法。
示例代码如下:
public class AllTupleOfDemo {
private static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
private static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
//////////////////////////////////////////////////
// allTupleFailFastOf / allTupleOf
//////////////////////////////////////////////////
Cffu<String> cffu1 = cffuFactory.completedFuture("foo");
Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);
Cffu<Tuple2<String, Integer>> allTuple = cffuFactory.allTupleFailFastOf(cffu1, cffu2);
System.out.println(allTuple.get());
// output: Tuple2(foo, 42)
//////////////////////////////////////////////////
// or CompletableFutureUtils.allTupleFailFastOf / allTupleOf
//////////////////////////////////////////////////
CompletableFuture<String> cf1 = CompletableFuture.completedFuture("foo");
CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);
CompletableFuture<Tuple2<String, Integer>> allTuple2 = allTupleFailFastOf(cf1, cf2);
System.out.println(allTuple2.get());
// output: Tuple2(foo, 42)
}
}# 完整可运行的Demo代码参见
AllTupleOfDemo.java。
CompletableFuture异步执行(即*Async方法)使用的缺省线程池是ForkJoinPool.commonPool();业务中使用这个缺省线程池是很危险的❗
ForkJoinPool.commonPool()差不多是CPU个线程,合适执行CPU密集的任务;对于业务逻辑,往往有很多等待操作(如网络IO、阻塞等待)并不是CPU密集的,导致业务处理能力低下 🐌ForkJoinPool使用的是无界队列;当大流量时任务会堆积,导致内存耗尽服务崩溃 🚨
关于这个问题及原因的更多说明可以看看这篇文章
结果就是,在业务逻辑中,调用CompletableFuture的*Async方法时,几乎每次都要反复传入指定的业务线程池;这让CompletableFuture的使用很繁琐易错 🤯❌
另外,当在底层逻辑底层操作回调业务时(如RPC回调),不合适或方便为业务提供线程池;使用Cffu封装携带的上层业务指定的线程池既方便又合理安全。
这个使用场景的更多说明可以看看CompletableFuture原理与实践 - 4.2.3 异步RPC调用注意不要阻塞IO线程池
示例代码如下:
public class NoDefaultExecutorSettingForCompletableFuture {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static void main(String[] args) {
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(
() -> System.out.println("doing a long time work!"),
myBizExecutor);
CompletableFuture<Void> cf2 = CompletableFuture
.supplyAsync(
() -> {
System.out.println("doing another long time work!");
return 42;
},
myBizExecutor)
.thenAcceptAsync(
i -> System.out.println("doing third long time work!"),
myBizExecutor);
CompletableFuture.allOf(cf1, cf2).join();
}
}# 完整可运行的Demo代码参见
NoDefaultExecutorSettingForCompletableFuture.java。
Cffu类支持设置缺省的业务线程池,规避上面的繁琐与危险。示例代码如下:
public class DefaultExecutorSettingForCffu {
private static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
private static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) {
Cffu<Void> cf1 = cffuFactory.runAsync(() -> System.out.println("doing a long time work!"));
Cffu<Void> cf2 = cffuFactory.supplyAsync(() -> {
System.out.println("doing another long time work!");
return 42;
}).thenAcceptAsync(i -> System.out.println("doing third long time work!"));
cffuFactory.allOf(cf1, cf2).join();
}
}# 完整可运行的Demo代码参见
DefaultExecutorSettingForCffu.java。
CompletableFuture的allOf方法会等待所有输入CF运行完成;即使有CF失败了也要等待后续CF都运行完成,再返回一个失败的CF。- 对于业务逻辑来说,这样失败且继续等待的策略(
AllComplete),减慢了业务响应性;会希望当有输入CF失败了,则快速失败不再做于事无补的等待 cffu提供了相应的allResultsFailFastOf等方法,支持AllFailFast并发执行策略allOf/allResultsFailFastOf两者都是,只有当所有的输入CF都成功时,才返回成功结果
- 对于业务逻辑来说,这样失败且继续等待的策略(
CompletableFuture的anyOf方法返回首个完成的CF,不会等待后续没有完成的CF赛马模式;即使首个完成的CF是失败的,也会返回这个失败的CF结果。- 对于业务逻辑来说,想要的往往不是首个完成但失败的
CF结果(AnyComplete),会希望赛马模式返回首个成功的CF结果 cffu提供了相应的anySuccessOf等方法,支持AnySuccess并发执行策略anySuccessOf只有当所有的输入CF都失败时,才返回失败结果
- 对于业务逻辑来说,想要的往往不是首个完成但失败的
- 返回多个
CF中成功的结果,对于失败的CF返回指定的缺省值- 业务逻辑包含容错时,当某些
CF处理出错时可以使用成功的那部分结果,而不是整体失败 cffu提供了相应的allSuccessOf等方法,支持AllSuccess并发执行策略
- 业务逻辑包含容错时,当某些
- 返回指定时间内多个
CF中成功的结果,对于失败或超时的CF返回指定的缺省值- 业务是最终一致性时,尽量返回有的结果;对于没能及时返回还在运行中处理的
CF,结果会写到分布式缓存中下次业务请求就有了,以避免重复计算 - 这是个常见业务使用模式,
cffu提供了相应的mostSuccessResultsOf等方法,支持MostSuccess并发执行策略
- 业务是最终一致性时,尽量返回有的结果;对于没能及时返回还在运行中处理的
📔 关于多个
CF的并发执行策略,可以看看JavaScript规范Promise Concurrency;在JavaScript中,Promise即对应CompletableFuture。
JavaScript Promise提供了4个并发执行方法:
Promise.all():等待所有Promise运行成功,只要有一个失败就立即返回失败(AllFailFast)Promise.allSettled():等待所有Promise运行完成,不管成功失败(AllComplete)Promise.any():赛马模式,立即返回首个成功的Promise(AnySuccess)Promise.race():赛马模式,立即返回首个完成的Promise(AnyComplete)PS:
JavaScript Promise的方法命名考究~ 👍
cffu的新方法支持了JavaScript Promise规范的并发执行方式~
示例代码如下:
public class ConcurrencyStrategyDemo {
private static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
private static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
////////////////////////////////////////////////////////////////////////
// CffuFactory#allResultsFailFastOf
// CffuFactory#anySuccessOf
// CffuFactory#mostSuccessResultsOf
////////////////////////////////////////////////////////////////////////
final Cffu<Integer> success = cffuFactory.supplyAsync(() -> {
sleep(300); // sleep SHORT time
return 42;
});
final Cffu<Integer> successAfterLongTime = cffuFactory.supplyAsync(() -> {
sleep(3000); // sleep LONG time
return 4242;
});
final Cffu<Integer> failed = cffuFactory.failedFuture(new RuntimeException("Bang!"));
Cffu<List<Integer>> failFast = cffuFactory.allResultsFailFastOf(success, successAfterLongTime, failed);
// fail fast without waiting successAfterLongTime
System.out.println(failFast.exceptionNow());
// output: java.lang.RuntimeException: Bang!
Cffu<Integer> anySuccess = cffuFactory.anySuccessOf(success, successAfterLongTime, failed);
System.out.println(anySuccess.get());
// output: 42
Cffu<List<Integer>> mostSuccess = cffuFactory.mostSuccessResultsOf(
-1, 100, TimeUnit.MILLISECONDS, success, successAfterLongTime, failed);
System.out.println(mostSuccess.get());
// output: [42, -1, -1]
////////////////////////////////////////////////////////////////////////
// or CompletableFutureUtils#allResultsFailFastOf
// CompletableFutureUtils#anySuccessOf
// CompletableFutureUtils#mostSuccessResultsOf
////////////////////////////////////////////////////////////////////////
final CompletableFuture<Integer> successCf = CompletableFuture.supplyAsync(() -> {
sleep(300); // sleep SHORT time
return 42;
});
final CompletableFuture<Integer> successAfterLongTimeCf = CompletableFuture.supplyAsync(() -> {
sleep(3000); // sleep LONG time
return 4242;
});
final CompletableFuture<Integer> failedCf = failedFuture(new RuntimeException("Bang!"));
CompletableFuture<List<Integer>> failFast2 = allResultsFailFastOf(successCf, successAfterLongTimeCf, failedCf);
// fail fast without waiting successAfterLongTime
System.out.println(exceptionNow(failFast2));
// output: java.lang.RuntimeException: Bang!
CompletableFuture<Integer> anySuccess2 = anySuccessOf(successCf, successAfterLongTimeCf, failedCf);
System.out.println(anySuccess2.get());
// output: 42
CompletableFuture<List<Integer>> mostSuccess2 = mostSuccessResultsOf(
-1, 100, TimeUnit.MILLISECONDS, successCf, successAfterLongTime, failed);
System.out.println(mostSuccess2.get());
// output: [42, -1, -1]
}
}# 完整可运行的Demo代码参见
ConcurrencyStrategyDemo.java。
CompletableFuture的allOf/anyOf方法输入的是CompletableFuture,当业务直接有要编排业务逻辑方法时仍然需要先包装成CompletableFuture再运行:
- 繁琐
- 也模糊了业务流程
cffu提供了直接运行多个Action的方法,方便直接明了地表达业务编排流程。
示例代码如下:
public class MultipleActionsDemo {
static void mRunAsyncDemo() {
// wrap tasks to CompletableFuture first, AWKWARD! 😖
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> System.out.println("task1")),
CompletableFuture.runAsync(() -> System.out.println("task2")),
CompletableFuture.runAsync(() -> System.out.println("task3"))
);
// just run multiple actions, fresh and cool 😋
CompletableFutureUtils.mRunAsync(
() -> System.out.println("task1"),
() -> System.out.println("task2"),
() -> System.out.println("task3")
);
}
}这些多Action方法也配套实现了「不同的并发执行策略」与「返回多个输入CF结果」的支持。
示例代码如下:
public class MultipleActionsDemo {
private static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
private static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
static void thenMApplyAsyncDemo() {
// wrap tasks to CompletableFuture first, AWKWARD! 😖
completedFuture(42).thenCompose(v ->
CompletableFutureUtils.allResultsFailFastOf(
CompletableFuture.supplyAsync(() -> v + 1),
CompletableFuture.supplyAsync(() -> v + 2),
CompletableFuture.supplyAsync(() -> v + 3)
)
).thenAccept(System.out::println);
// output: [43, 44, 45]
cffuFactory.completedFuture(42).thenCompose(v ->
CompletableFutureUtils.allResultsFailFastOf(
CompletableFuture.supplyAsync(() -> v + 1),
CompletableFuture.supplyAsync(() -> v + 2),
CompletableFuture.supplyAsync(() -> v + 3)
)
).thenAccept(System.out::println);
// output: [43, 44, 45]
// just run multiple actions, fresh and cool 😋
CompletableFutureUtils.thenMApplyFailFastAsync(
completedFuture(42),
v -> v + 1,
v -> v + 2,
v -> v + 3
).thenAccept(System.out::println);
// output: [43, 44, 45]
cffuFactory.completedFuture(42).thenMApplyFailFastAsync(
v -> v + 1,
v -> v + 2,
v -> v + 3
).thenAccept(System.out::println);
// output: [43, 44, 45]
CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(
completedFuture(42),
v -> "string" + v,
v -> v + 1,
v -> v + 2.1
).thenAccept(System.out::println);
// output: Tuple3(string42, 43, 44.1)
cffuFactory.completedFuture(42).thenMApplyAllSuccessTupleAsync(
v -> "string" + v,
v -> v + 1,
v -> v + 2.1
).thenAccept(System.out::println);
// output: Tuple3(string42, 43, 44.1)
}
}# 完整可运行的Demo代码参见
MultipleActionsDemo.java。
在业务处理的try-catch语句中,catch所有异常(Throwable)往往是不好的实践。类似的,CompletableFuture#exceptionally方法,也是处理了所有异常(Throwable)。
应该只处理当前业务自己清楚明确能恢复的具体异常,由外层处理其它的异常;避免掩盖Bug或是错误地处理了自己不能恢复的异常。
cffu提供了相应的catching*方法,支持指定要处理异常类型;相比CF#exceptionally方法新加了一个异常类型参数,使用方式类似,不附代码示例。
Java 9+高版本的所有CF新功能方法在Java 8低版本直接可用。
其中重要的backport功能有:
- 超时控制:
orTimeout/completeOnTimeout - 延迟执行:
delayedExecutor - 工厂方法:
failedFuture/completedStage/failedStage - 处理操作:
completeAsync/exceptionallyAsync/exceptionallyCompose/copy - 非阻塞读:
resultNow/exceptionNow/state
这些backport方法是CompletableFuture的已有功能,不附代码示例。
CF#orTimeout() / CF#completeOnTimeout()方法当超时时使用CF内部的单线程ScheduledThreadPoolExecutor来触发业务逻辑执行,会导致CF的超时与延迟执行基础功能失效❗️
因为超时与延迟执行是基础功能,一旦失效会导致:
- 业务功能的正确性问题,设置超时的触发不准延后
- 系统稳定性问题,如线程中等待操作不能返回、其它有依赖的
CF不能完成、线程池耗尽与内存泄露
cffu库提供了超时执行安全的新实现方法:
Cffu#orTimeout()/Cffu#completeOnTimeoutTimeout()CFU#cffuOrTimeout()/CFU#cffuCompleteOnTimeout()
保证业务逻辑不会在CF的单线程ScheduledThreadPoolExecutor中执行。
更多说明参见:
- 演示问题的
DelayDysfunctionDemo.java cffu backport方法的JavaDoc:CFU#orTimeout()/CFU#completeOnTimeout()- 文章
CompletableFuture超时功能使用不当直接生产事故
cf.join()方法「没有超时会永远等待」,在业务中很危险❗️当意外出现长时间等待时,会导致:
- 主业务逻辑阻塞,没有机会做相应的处理,以及时响应用户
- 会费掉一个线程,线程是很有限的资源(一般几百个),耗尽线程意味着服务瘫痪故障
join(timeout, unit)方法即支持超时的join方法;就像cf.get(timeout, unit)之于cf.get()。
这个新方法使用简单类似,不附代码示例。
CompletableFuture的anyOf()方法返回类型是Object,丢失具体类型,使用返回值时需要转型操作不方便,也不类型安全。
cffu提供的anySuccessOf() / anyOf()方法,返回具体类型T,而不是返回Object。
这个方法使用简单类似,不附代码示例。
CompletableFuture的allOf() / anyOf()方法输入参数类型是CompletableFuture,而不是更宽泛的CompletionStage类型;对于CompletionStage类型的输入,则需要调用CompletionStage#toCompletableFuture方法做转换。
cffu提供的allOf() / anyOf()方法输入更宽泛的CompletionStage参数类型,使用更方便。
方法使用简单类似,不附代码示例。
可以参见:
- 当前版本的
Java API文档: https://foldright.io/api-docs/cffu/
代码示例:
可以在 central.sonatype.com 查看最新版本与可用版本列表。
cffu库(包含Java CompletableFuture的增强CompletableFutureUtils):-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu</artifactId> <version>1.1.1</version> </dependency>
-
For
Gradleprojects:Gradle Kotlin DSL
implementation("io.foldright:cffu:1.1.1")Gradle Groovy DSL
implementation 'io.foldright:cffu:1.1.1'
-
- 📌
TransmittableThreadLocal(TTL)的cffu executor wrapper SPI实现:-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-ttl-executor-wrapper</artifactId> <version>1.1.1</version> <scope>runtime</scope> </dependency>
-
For
Gradleprojects:Gradle Kotlin DSL
runtimeOnly("io.foldright:cffu-ttl-executor-wrapper:1.1.1")Gradle Groovy DSL
runtimeOnly 'io.foldright:cffu-ttl-executor-wrapper:1.1.1'
-
cffu bom:-
For
Mavenprojects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-bom</artifactId> <version>1.1.1</version> <type>pom</type> <scope>import</scope> </dependency>
-
For
Gradleprojects:Gradle Kotlin DSL
implementation(platform("io.foldright:cffu-bom:1.1.1"))Gradle Groovy DSL
implementation platform('io.foldright:cffu-bom:1.1.1')
-
- 官方资料
cffu开发者@linzee1的CF/cffu掘金专栏CompletableFutureGuide- 完备说明
CompletableFuture的使用方式 - 给出 最佳实践建议 与 使用陷阱注意
- 在业务中,更有效安全地使用
CompletableFuture
- 完备说明
cffu 是 CompletableFuture-Fu的缩写;读作C Fu,谐音Shifu/师傅。
嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝