completablefuturede介绍
java 8 引入了 completablefuture 类,这是 java 异步编程的一个重要进展。
completablefuture 提供了一种基于未来结果的异步编程模型,允许你以更加直观和易于理解的方式编写非阻塞代码。
completablefuturede使用场景
completablefuture 主要用于:
- 异步计算:如果你有一些计算任务可以异步执行,并且不想阻塞主线程,可以使用 completablefuture。
- 多个并行任务组合:当你有多个独立的异步任务,并且想要在它们都完成后执行某些操作时,可以用 completablefuture 来组合它们。
- 异步回调:当异步计算完成后,你需要执行某些后续操作(如更新 ui、保存结果等),可以通过 thenapply(), thenaccept(), thenrun() 等方法指定回调。
- 超时控制:可以为异步任务设置超时限制,防止任务执行时间过长,导致线程被长时间占用。
- 错误处理:在异步任务中,如果有异常发生,可以通过 handle() 或 exceptionally() 方法进行错误处理。
- 多任务的组合与合成:可以将多个异步任务的结果进行合成,产生新的任务。
常用异步编程实现方案
- thread
特点:
- thread是 java 中最基本的并发执行单位,代表一个独立的执行路径。
- thread可以通过继承 thread 类或实现 runnable 接口来创建和启动。
- 线程会从 run() 方法开始执行,run() 方法可以包含任何逻辑。
- 适合处理简单的并发任务,但不适合复杂的并发场景,因为线程管理较为麻烦。
使用示例:
public static void main(string[] args) { thread thread = new thread(() -> { system.out.println(thread.currentthread().getname() + " is running..."); }); thread.start(); }
- executorservice
特点:
- executorservice 是一个用于执行异步任务的接口,通常与线程池一起使用。
- 它提供了方法来提交任务、关闭线程池、获取任务结果等。
- executorservice 包括多种实现,如 threadpoolexecutor,并且支持任务的异步执行。
- 支持有返回值的任务(通过 submit() 方法)和无返回值的任务(通过 execute() 方法)。
使用示例:
有返回值:
public static void main(string[] args) throws interruptedexception, executionexception { executorservice executor = executors.newfixedthreadpool(2); // 创建线程池 callable<integer> task = () -> { thread.sleep(1000); return 42; }; future<integer> result = executor.submit(task); // 提交任务并获得 future 对象 system.out.println("task result: " + result.get()); // 获取结果 executor.shutdown(); // 关闭线程池 }
无返回值:
public static void main(string[] args) throws interruptedexception { executorservice executor = executors.newfixedthreadpool(2); // 创建线程池 runnable task = () -> { system.out.println(thread.currentthread().getname() + " is running..."); }; executor.execute(task); // 提交任务 executor.shutdown(); // 关闭线程池 }
- countdownlatch
特点:
- countdownlatch 是一个同步辅助类,允许一个或多个线程等待直到其他线程完成某个操作。
- 使用一个计数器(count)来表示待完成的任务数量,每个任务完成后调用 countdown() 方法,计数器减一。
- 当计数器为零时,所有等待的线程会继续执行。
- countdownlatch 不能重用,它适合用于多个线程并行执行后,等待所有线程完成的场景。
使用示例:
public static void main(string[] args) throws interruptedexception { int totalthreads = 3; countdownlatch latch = new countdownlatch(totalthreads); // 初始化计数器为3 runnable task = () -> { try { thread.sleep(1000); system.out.println(thread.currentthread().getname() + " finished."); } catch (interruptedexception e) { e.printstacktrace(); } finally { latch.countdown(); // 每个线程完成后减少计数器 } }; // 启动多个线程 for (int i = 0; i < totalthreads; i++) { new thread(task).start(); } latch.await(); // 等待计数器归零 system.out.println("all tasks are finished."); }
- cyclicbarrier
特点:
- cyclicbarrier 允许一组线程互相等待,直到所有线程都到达一个公共屏障点,然后所有线程再一起继续执行。
- 它的计数器每次归零后会重置,适合用来处理多轮同步任务。
- 每当所有线程到达屏障点时,都会执行一个可选的动作(如回调函数)。
使用示例:
public static void main(string[] args) throws interruptedexception { int totalthreads = 3; cyclicbarrier barrier = new cyclicbarrier(totalthreads, () -> { system.out.println("all threads reached the barrier point, proceeding..."); }); runnable task = () -> { try { thread.sleep(1000); system.out.println(thread.currentthread().getname() + " reached the barrier."); barrier.await(); // 等待其他线程到达屏障点 } catch (exception e) { e.printstacktrace(); } }; // 启动多个线程 for (int i = 0; i < totalthreads; i++) { new thread(task).start(); } }
- forkjoinpool
特点:
- forkjoinpool 是专门用于执行递归任务的线程池,特别适合大规模并行计算。
- 它将任务分割成多个子任务并通过递归的方式处理(“fork”),然后合并子任务的结果(“join”)。
- 在 forkjoinpool 中,任务拆分采用工作窃取算法,尽量平衡工作负载,提升性能。
使用示例:
import java.util.concurrent.*; public class forkjoinpoolexample { public static void main(string[] args) { forkjoinpool pool = new forkjoinpool(); // 创建 forkjoinpool int[] array = {1, 2, 3, 4, 5, 6, 7, 8}; recursivetask<integer> task = new sumtask(array, 0, array.length); int result = pool.invoke(task); // 执行任务并获取结果 system.out.println("sum is: " + result); } } class sumtask extends recursivetask<integer> { private int[] array; private int start, end; public sumtask(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @override protected integer compute() { if (end - start <= 2) { // 基础情况 int sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { int mid = (start + end) / 2; sumtask task1 = new sumtask(array, start, mid); sumtask task2 = new sumtask(array, mid, end); task1.fork(); // 异步执行 task2.fork(); return task1.join() + task2.join(); // 合并结果 } } }
- completablefuture
特点:
- completablefuture 是 java 8 引入的异步编程框架,允许你以非阻塞的方式处理任务。
- 它支持任务的组合、回调、异常处理等,适合用于处理复杂的异步任务链。
- 可以通过 supplyasync()、thenapply() 等方法定义异步任务的执行流程。
使用示例:
public static void main(string[] args) throws executionexception, interruptedexception { completablefuture<integer> future = completablefuture.supplyasync(() -> { try { thread.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } return 42; }); // 链式调用,处理结果 completablefuture<integer> result = future.thenapplyasync(value -> value * 2); system.out.println("result: " + result.get()); // 输出结果 }
各种实现方案总结
并发方式 | 特点 | 优点 | 缺点 |
---|---|---|---|
thread | - 最基本的线程创建方式- 通过继承thread 或实现runnable 接口创建任务 | - 简单直观 | - 需要手动管理线程,容易资源浪费或死锁- 无法直接返回任务结果- 对复杂任务协调不便 |
executorservice | - 通过线程池管理线程- 提供任务的调度、执行、生命周期管理 | - 提供线程池避免手动创建和销毁线程,减少资源浪费- 支持任务的结果返回 | - 任务间依赖和组合较复杂-get() 方法阻塞线程,难以实现非阻塞 |
countdownlatch | - 用于等待多个任务完成后执行后续操作- 使用计数器控制任务执行 | - 可以控制任务同步,确保多个任务完成后继续执行 | - 只适用于等待任务完成,无法处理任务的依赖关系- 只能使用一次 |
cyclicbarrier | - 用于多个线程在某一点上等待- 可重复使用,适合同步多任务 | - 可重复使用,适合多次任务同步 | - 不如completablefuture 灵活- 仅适合特定的同步场景 |
forkjoinpool | - 专为递归分治任务设计的线程池- 支持任务拆分和合并 | - 高效利用多核处理器,适合分治算法- 支持任务拆分和合并 | - 对于非递归任务不适合- 异常处理不如completablefuture 灵活 |
completablefuture | - 基于future 设计的异步编程api- 支持非阻塞的任务组合和回调处理 | - 支持链式调用,异步任务组合,避免阻塞- 可以处理异常,支持并行处理和同步等待- 支持thenapply、thenaccept 等多种处理方式,简化代码 | - 复杂任务时调试困难- 异常处理仍较为复杂- 比executorservice 稍显复杂 |
- thread:最基础的并发方式,直接通过线程控制执行,但缺乏高级功能。
- executorservice:基于线程池的高层接口,能够有效管理线程资源和任务执行。
- countdownlatch、cyclicbarrier:用于线程间的同步协调。
countdownlatch
等待特定任务完成,而cyclicbarrier
可重复用于多次任务同步。 - forkjoinpool:适用于任务拆分和合并的场景,特别是递归分治任务。
- completablefuture:提供更灵活的异步任务处理方式,支持链式调用、异步执行及异常处理,适合复杂的并发任务调度。
completablefuturede结构
completablefuture实现了future接口和completionstage接口。
结构梳理
相关接口 | 描述 |
---|---|
future | 是一个表示异步计算结果的接口。它提供了方法来检查异步计算是否完成、获取计算的结果以及取消计算。 |
completionstage | 是一个表示异步计算结果的接口,提供了处理计算结果的非阻塞操作。与 future 不同,completionstage 采用链式调用,可以更灵活地组合多个异步操作。 |
- future接口
future接口是jdk 5引入的,该接口属于java.util.concurrent包。
future接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 executor(执行器),并在稍后获取任务的结果。
主要方法:
方法 | 描述 |
---|---|
get() | 阻塞当前线程,直到异步计算完成,并返回计算结果 |
get(long timeout, timeunit unit) | 阻塞当前线程,直到异步计算完成或超时,并返回计算结果 |
isdone() | 检查异步计算是否完成 |
cancel(boolean mayinterruptifrunning) | 尝试取消异步计算 |
iscancelled() | 检查异步计算是否被取消。 |
- completionstage接口
completionstage 接口是 java 8 引入的一个重要接口,用于描述异步计算的生命周期和结果。
completionstage 提供了一套方法,用于处理异步计算的结果、组合多个计算、处理异常等。
主要方法:
方法 | 描述 |
---|---|
thenapply | 在当前阶段完成后,应用给定的 function,并返回一个新的 completionstage。 |
thenacceptasync | 异步地执行指定的 consumer,并返回一个新的 completionstage,该阶段没有结果。 |
thencomposeasync | 异步地将当前阶段的结果应用于一个返回 completionstage 的函数,并返回一个新的 completionstage。 |
thencombine | 在两个 completionstage 都完成后,使用给定的 bifunction 合并它们的结果,并返回一个新的 completionstage。 |
runaftereitherasync | 在任意一个给定的两个 completionstage 完成后,异步地执行指定的 runnable。 |
thenaccept | 在当前阶段完成后,执行指定的 consumer,并返回一个新的 completionstage,该阶段没有结果。 |
runaftereither | 在任意一个给定的两个 completionstage 完成后,执行指定的 runnable。 |
thencombineasync | 在两个 completionstage 都完成后,异步地使用给定的 bifunction 合并它们的结果,并返回一个新的 completionstage。 |
thenacceptbothasync | 在两个 completionstage 都完成后,异步地执行指定的 biconsumer,并返回一个新的 completionstage。 |
applytoeither | 在两个 completionstage 中任意一个完成后,应用给定的 function,并返回一个新的 completionstage。 |
applytoeitherasync | 在两个 completionstage 中任意一个完成后,异步地应用给定的 function,并返回一个新的 completionstage。 |
runafterbothasync | 在两个 completionstage 都完成后,异步地执行指定的 runnable,并返回一个新的 completionstage。 |
thenacceptbothasync | 在两个 completionstage 都完成后,异步地执行指定的 biconsumer。 |
accepteitherasync | 在两个 completionstage 中任意一个完成后,异步地执行指定的 consumer,并返回一个新的 completionstage。 |
handleasync | 异步地处理当前阶段的结果或异常,应用给定的 bifunction,并返回一个新的 completionstage。 |
thencomposeasync | 同 thencompose,但异步地应用给定的函数,并返回一个新的 completionstage。 |
thencombineasync | 同 thencombine,但异步地使用给定的 bifunction 合并两个 completionstage 的结果。 |
exceptionally | 如果当前阶段以异常完成,则应用指定的 function 处理该异常,并返回一个新的 completionstage。 |
accepteither | 在两个 completionstage 中任意一个完成后,执行指定的 consumer。 |
thencompose | 将当前阶段的结果应用于一个返回 completionstage 的函数,并返回一个新的 completionstage。 |
handle | 处理当前阶段的结果或异常,应用给定的 bifunction,并返回一个新的 completionstage。 |
thenacceptboth | 在两个 completionstage 都完成后,执行指定的 biconsumer。 |
thenapplyasync | 异步地应用给定的 function,并返回一个新的 completionstage。 |
whencompleteasync | 异步地执行指定的 biconsumer,无论结果如何,并返回一个新的 completionstage。 |
applytoeitherasync | 同 applytoeither,但异步地应用给定的 function,并返回一个新的 completionstage。 |
accepteitherasync | 同 accepteither,但异步地执行指定的 consumer,并返回一个新的 completionstage。 |
runaftereitherasync | 同 runaftereither,但异步地执行指定的 runnable,并返回一个新的 completionstage。 |
thenrunasync | 异步地执行指定的 runnable,并返回一个新的 completionstage,该阶段没有结果。 |
runafterboth | 在两个 completionstage 都完成后,执行指定的 runnable。 |
whencomplete | 在当前阶段完成后,无论结果如何,执行指定的 biconsumer,并返回一个新的 completionstage。 |
thenrunasync | 异步地执行指定的 runnable,并返回一个新的 completionstage,该阶段没有结果。 |
常用方法
方法 | 描述 |
---|---|
supplyasync() | 异步地运行一个带返回值的任务。 |
runasync() | 异步地运行一个无返回值的任务。 |
thenapply() | 当 completablefuture 任务完成时执行某个操作,并返回新的结果。 |
thenaccept() | 当任务完成时执行某个操作,但不返回结果。 |
thenrun() | 当任务完成时执行某个操作,无需返回结果。 |
exceptionally() | 用于处理任务执行中发生的异常。 |
handle() | 处理任务执行中的正常结果或异常结果。 |
allof() | 等待多个 completablefuture 全部完成,返回一个新的 completablefuture。 |
anyof() | 等待多个 completablefuture 中的任意一个完成。 |
completablefuture使用示例
1. 基本异步操作
completablefuture.supplyasync() 和 completablefuture.runasync() 是最常用的启动异步任务的方法。
- supplyasync() 用于执行带返回值的异步任务。
- runasync() 用于执行不带返回值的异步任务。
public static void main(string[] args) throws executionexception, interruptedexception { // 带返回值的异步任务 completablefuture<integer> future = completablefuture.supplyasync(() -> { try { thread.sleep(1000); // 模拟耗时任务 } catch (interruptedexception e) { e.printstacktrace(); } return 42; // 返回结果 }); // 获取异步任务的结果 integer result = future.get(); // 阻塞,直到任务完成 system.out.println("result: " + result); }
2. 任务链式调用
通过 thenapply(), thenaccept(), thenrun() 等方法,可以将多个异步任务串联在一起。
- thenapply() 用于处理任务的返回值。
- thenaccept() 用于消费返回值,但不返回结果。
- thenrun() 用于执行没有返回值的操作。
public static void main(string[] args) throws executionexception, interruptedexception { completablefuture<integer> future = completablefuture.supplyasync(() -> { return 42; // 返回结果 }); // 链式调用,先处理结果,再转换 completablefuture<integer> resultfuture = future .thenapply(value -> value * 2) // 将值乘以2 .thenapply(value -> value + 10); // 再加10 integer result = resultfuture.get(); // 获取最终结果 system.out.println("final result: " + result); // 输出 94 }
3. 多个异步任务组合
使用 thencombine()、thencompose()、allof() 和 anyof() 等方法可以组合多个异步任务,执行复杂的操作。
- thencombine() 用于将两个独立的异步任务的结果合并。
- thencompose() 用于将第一个异步任务的结果作为参数传递给下一个异步任务。
- allof() 用于等待多个异步任务完成,并且不关心每个任务的结果。
- anyof() 用于等待多个异步任务中的任意一个完成。
示例1:组合两个异步任务
public static void main(string[] args) throws executionexception, interruptedexception { completablefuture<integer> future1 = completablefuture.supplyasync(() -> { return 10; }); completablefuture<integer> future2 = completablefuture.supplyasync(() -> { return 20; }); // 合并两个任务的结果 completablefuture<integer> combinedfuture = future1 .thencombine(future2, (result1, result2) -> result1 + result2); // 将两个结果相加 integer result = combinedfuture.get(); // 获取最终结果 system.out.println("combined result: " + result); // 输出 30 }
示例2:使用 allof() 等待多个任务完成
public static void main(string[] args) throws executionexception, interruptedexception { completablefuture<void> future1 = completablefuture.runasync(() -> { try { thread.sleep(1000); system.out.println("task 1 completed"); } catch (interruptedexception e) { e.printstacktrace(); } }); completablefuture<void> future2 = completablefuture.runasync(() -> { try { thread.sleep(1500); system.out.println("task 2 completed"); } catch (interruptedexception e) { e.printstacktrace(); } }); // 等待多个任务全部完成 completablefuture.allof(future1, future2).join(); system.out.println("all tasks are completed."); }
4. 异常处理
在异步任务中,异常可能会发生。completablefuture 提供了 exceptionally() 和 handle() 方法来处理异常。
- exceptionally() 用于捕获异常并提供替代值。
- handle() 可以处理正常结果和异常。
public static void main(string[] args) throws executionexception, interruptedexception { completablefuture<integer> future = completablefuture.supplyasync(() -> { if (true) { throw new runtimeexception("something went wrong!"); } return 42; }); // 使用 exceptionally 处理异常并提供默认值 completablefuture<integer> resultfuture = future.exceptionally(ex -> { system.out.println("exception occurred: " + ex.getmessage()); return -1; // 返回默认值 }); integer result = resultfuture.get(); // 获取结果 system.out.println("result: " + result); // 输出 -1 }
5. 并行执行多个任务
使用 completablefuture.supplyasync() 或 runasync() 来并行执行多个任务,然后使用 allof() 或 anyof() 等方法等待这些任务的完成。
public static void main(string[] args) throws executionexception, interruptedexception { completablefuture<integer> future1 = completablefuture.supplyasync(() -> { try { thread.sleep(1000); return 1; } catch (interruptedexception e) { return 0; } }); completablefuture<integer> future2 = completablefuture.supplyasync(() -> { try { thread.sleep(500); return 2; } catch (interruptedexception e) { return 0; } }); // 等待所有任务完成并合并结果 completablefuture<integer> result = future1 .thencombine(future2, (res1, res2) -> res1 + res2); // 将两个结果相加 system.out.println("combined result: " + result.get()); // 输出 3 }
6. 处理返回值的转换
通过 thenapply() 等方法可以对异步任务的结果进行转换处理。
public static void main(string[] args) throws executionexception, interruptedexception { completablefuture<integer> future = completablefuture.supplyasync(() -> 10); // 转换结果:将值乘以2 completablefuture<integer> transformedfuture = future.thenapply(value -> value * 2); system.out.println("transformed result: " + transformedfuture.get()); // 输出 20 }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论