1、引言
completablefuture 是 java 8 引入的一个非常强大的异步编程工具,属于 java.util.concurrent 包。它不仅支持异步执行任务,还支持任务的组合、异常处理、回调等丰富的操作。下面我会详细介绍 completablefuture 的核心用法和常见场景。
2. 基本概念
- future:早期的异步结果表示,功能有限,只能通过
get()阻塞获取结果。 - completablefuture:增强版的 future,支持链式异步编程、组合、异常处理、回调等。
3. 创建 completablefuture
3.1. 手动创建
completablefuture<string> future = new completablefuture<>();
// 可以手动完成
future.complete("hello");3.2. 通过静态工厂方法
supplyasync:有返回值,异步执行runasync:无返回值,异步执行
completablefuture<string> future1 = completablefuture.supplyasync(() -> {
// 模拟耗时操作
return "hello completablefuture";
});
completablefuture<void> future2 = completablefuture.runasync(() -> {
// 执行某些操作
});4. 获取结果
get():阻塞等待结果join():类似于get(),但遇到异常会抛出 unchecked 异常
string result = future1.get(); // 可能抛出异常 string result2 = future1.join(); // runtimeexception
5. 回调和链式操作
5.1. thenapply / thenapplyasync
对结果进行转换(有返回值)
completablefuture<integer> future = completablefuture.supplyasync(() -> 100)
.thenapply(i -> i + 10); // 结果为1105.2. thenaccept / thenacceptasync
对结果做处理,无返回值
completablefuture.supplyasync(() -> "hello")
.thenaccept(s -> system.out.println(s));5.3. thenrun / thenrunasync
无参数、无返回值,仅执行后续操作
completablefuture.supplyasync(() -> "hello")
.thenrun(() -> system.out.println("任务执行完毕"));6. 任务组合
5.1. thencombine / thencombineasync
两个任务都完成后,合并结果
completablefuture<integer> f1 = completablefuture.supplyasync(() -> 10); completablefuture<integer> f2 = completablefuture.supplyasync(() -> 20); completablefuture<integer> result = f1.thencombine(f2, (a, b) -> a + b); // 30
6.2. thencompose / thencomposeasync
任务依赖,前一个结果作为后一个输入
completablefuture<string> f = completablefuture.supplyasync(() -> "hello")
.thencompose(s -> completablefuture.supplyasync(() -> s + " world"));6.3. allof / anyof
等待多个任务全部/任一完成
completablefuture<void> all = completablefuture.allof(f1, f2); all.join(); // 等待全部完成 completablefuture<object> any = completablefuture.anyof(f1, f2); object fastest = any.join(); // 任意一个完成即可
7. 异常处理
7.1. exceptionally
捕获异常,返回默认值
completablefuture<integer> future = completablefuture.supplyasync(() -> {
throw new runtimeexception("出错了");
}).exceptionally(e -> {
system.out.println(e.getmessage());
return 0;
});7.2. handle / handleasync
无论成功或失败都处理
completablefuture<integer> future = completablefuture.supplyasync(() -> {
throw new runtimeexception("出错了");
}).handle((result, ex) -> {
if (ex != null) {
system.out.println(ex.getmessage());
return 0;
}
return result;
});8. 自定义线程池
默认使用 forkjoinpool.commonpool(),可以自定义线程池:
executorservice executor = executors.newfixedthreadpool(2); completablefuture.supplyasync(() -> "hello", executor);
9. 实用示例
9.1. 多个异步任务并发执行,最后汇总
completablefuture<string> f1 = completablefuture.supplyasync(() -> "a");
completablefuture<string> f2 = completablefuture.supplyasync(() -> "b");
completablefuture<string> f3 = completablefuture.supplyasync(() -> "c");
completablefuture<void> all = completablefuture.allof(f1, f2, f3);
all.thenrun(() -> {
try {
system.out.println(f1.get() + f2.get() + f3.get());
} catch (exception e) {
e.printstacktrace();
}
});10. 注意事项
- 尽量避免阻塞(如
get()),推荐使用回调。 - 注意线程池资源,合理分配,避免 oom。
- 处理好异常,避免线程池线程被异常吞掉。
11. 进阶用法
11.1. 串联和并联任务
串联(依赖关系)
当一个任务的结果依赖于另一个任务时,使用 thencompose:
completablefuture<string> getuserid = completablefuture.supplyasync(() -> "user123");
completablefuture<string> getuserinfo = getuserid.thencompose(id ->
completablefuture.supplyasync(() -> "用户信息:" + id)
);并联(聚合结果)
多个任务并发执行,最后聚合结果:
completablefuture<integer> t1 = completablefuture.supplyasync(() -> 1);
completablefuture<integer> t2 = completablefuture.supplyasync(() -> 2);
completablefuture<integer> t3 = completablefuture.supplyasync(() -> 3);
completablefuture<list<integer>> all = completablefuture.allof(t1, t2, t3)
.thenapply(v -> {
list<integer> result = new arraylist<>();
result.add(t1.join());
result.add(t2.join());
result.add(t3.join());
return result;
});11.2. 超时控制
java 9 后,completablefuture 增加了超时相关方法:
future.ortimeout(3, timeunit.seconds)
.exceptionally(ex -> {
system.out.println("超时啦");
return null;
});或自己实现:
completablefuture<string> future = completablefuture.supplyasync(() -> {
thread.sleep(5000);
return "hello";
});
try {
string result = future.get(2, timeunit.seconds); // 2秒超时
} catch (timeoutexception e) {
system.out.println("超时了");
}11.3. 异步流水线
你可以链式地组合多个异步操作,形成“流水线”:
completablefuture.supplyasync(() -> "a")
.thenapply(s -> s + "b")
.thenapply(s -> s + "c")
.thenaccept(system.out::println); // 输出 abc11.4. 处理异常和兜底方案
推荐使用 handle 或 exceptionally 做兜底:
completablefuture<integer> future = completablefuture.supplyasync(() -> {
throw new runtimeexception("出错了");
})
.handle((result, ex) -> ex == null ? result : -1);11.5. 自定义线程池的好处
- 控制线程数量,避免公共线程池被占满。
- 适合高并发、io密集型场景。
executorservice executor = executors.newfixedthreadpool(10); completablefuture.supplyasync(() -> "业务逻辑", executor);
12. 实战场景举例
12.1. 微服务并发调用
假设要并发调用三个微服务接口,最后聚合结果:
completablefuture<string> api1 = completablefuture.supplyasync(() -> callapi1());
completablefuture<string> api2 = completablefuture.supplyasync(() -> callapi2());
completablefuture<string> api3 = completablefuture.supplyasync(() -> callapi3());
completablefuture<void> all = completablefuture.allof(api1, api2, api3);
all.thenaccept(v -> {
string r1 = api1.join();
string r2 = api2.join();
string r3 = api3.join();
system.out.println("聚合结果:" + r1 + r2 + r3);
});12.2. 异步写数据库+异步发消息
completablefuture<void> savedb = completablefuture.runasync(() -> savetodb());
completablefuture<void> sendmsg = completablefuture.runasync(() -> sendmsg());
completablefuture.allof(savedb, sendmsg)
.thenrun(() -> system.out.println("所有操作完成"));13. 常见问题与建议
- 线程池泄漏:线程池要合理关闭,避免资源泄漏。
- 异常未处理:建议所有链路最后加
.exceptionally或.handle。 - 阻塞等待:尽量用回调而不是
get()或join()。 - 链式操作:推荐链式编程,代码更清晰。
总结
completablefuture 是 java 异步编程的利器,支持丰富的组合、回调和异常处理能力。合理使用可以极大提升程序的并发能力和响应速度。
到此这篇关于java中的completablefuture核心用法和常见场景的文章就介绍到这了,更多相关java completablefuture使用内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论