三种调用模式
completablefuture(以下简称cf)提供了三种调用模式,分别是就地执行、异步使用默认执行器执行、异步指定执行器执行。
就地执行指的是回调在当前线程中执行,调用thenapply、thencompose等方法时,如果当前 cf 已经执行完成,会立即执行回调,称为当前执行,执行时刻为当前(now);如果未完成,则由完成 cf 的线程执行。
如下分别是立即执行和异步执行的例子:
var cf = completablefuture.completed(1); cf.thenapply(x -> x + 1) .thenrun(x -> system.out.println(x)) .join();
以上代码全程同步。
var cf = new completablefuture<integer>(); cf.thenapply(x -> x + 1) .thenrun(x -> system.out.println(x)); new thread(() -> cf.complete(1)).start(); uninterruptible.sleep(1, timeunit.seconds);
thenapply、thenrun在调用时,cf未完成,无法立刻执行,其执行在完成cf的线程,也就是新创建的线程中。
异步执行指的是回调任务的执行必定在执行器中执行,默认执行器为java提供的commonpool线程池,当然也可以通过重写defaultexecutor实现调用指定的线程池。
var cf = completablefuture.completed(1); cf.thenapplyasync(x -> x + 1) .thenrunasync(x -> system.out.println(x)) .join(); uninterruptible.sleep(1, timeunit.seconds);
以上代码中打印操作在公共线程池中执行。
比较
就地执行性能最好,可以完全避免线程上下文切换,适合执行一些轻量级任务。缺点是使用不当时,会阻塞当前线程;可能会造成“线程泄露”,导致线程池中的线程没有及时归还。
异步执行反之。
第 4 种调用模式
线程池中任务执行有一条原则:尽最大努力交付。意思是如果任务提交时没有拒绝,没有抛出拒绝执行等异常,通常来说通过了信号量、限流器、执行时间、线程数等诸多限制,后续的执行应该不作额外限制,且努力完成;而不是等执行过程中再抛出类似拒绝服务等异常。反过来说,如果当前任务提交时,任务不能执行,就应该拒绝执行。这条简单的原则可以避免考虑复杂的问题,比如反压、取消机制等,也能够应对大多数的业务场景。
对于非轻量级任务,例如 a -> b,表示任务a执行完成后执行任务b,常规的线程池实现有一个问题,b任务的提交不一定立即执行,可能遇到排队(进入阻塞队列)甚至超时等情况,最终导致整个任务的滞后。此时如果能就地执行最好。
如果选择就地执行策略,解决了以上问题,但是可能会导致cf已完成后执行的当前线程阻塞。这时最好有执行器执行任务,而不是占用当前线程。
最近cffu类库提供llcf#relayasync0,完美解决了以上痛点。ll表示low level,对于其的正确使用要求开发人员对completablefuture有着充分的理解。relay的含义是接力,这里指的是
- relay async 接力异步
- async 词尾,保证一定是异步(和cf命名表义 一样)
异步时(不阻塞调用逻辑),用前个computation的线程接力执行,不使用新线程,避免了上下文切换开销。
例子
relayasync0 签名如下:
public static <t, f extends completionstage<?>> f relayasync0( completionstage<? extends t> cfthis, function<completablefuture<t>, f> relaycomputations, executor executor)
需要注意传入的回调任务不是普通的function,而是入参cf,出参 completionstage,也就是说我们需要传入对cf的回调。比如:
cf -> cf.thenapply(...) cf -> cf.thencompose(...) cf -> cf.thenrun(...)
该方法使用时和thenapplyasync很像,只不过由实例方法调用改成了静态方法调用,回调参数为对cf的回调。
以下代码引用自cffu作者 李鼎 | jerry lee,详细说明四种调用模式的用法:
public class relayasyncdescriptionandexample { static void executecomputationsofnewstage(completablefuture<string> cf) { // ================================================================================ // default execution // ================================================================================ cf.thenapply(s -> { // a simulating long-running computation... sleep(1000); // if input cf is completed when computations execute, // executes the long time computation synchronously (aka. in the caller thread); // this synchronized execution leads to blocking sequential codes of caller... ⚠️ return s + s; }); // ================================================================================ // asynchronous execution of completablefuture(default executor or custom executor) // ================================================================================ cf.thenapplyasync(s -> { // a simulating long-running computation... sleep(1000); // always executes via an executor(guarantees not to block sequential code of caller). // if input cf is incomplete when computations execute, // the execution via an executor leads to one more thread switching. ⚠️ return s + s; }); // ================================================================================ // how about the fourth way to arrange execution of a new stage's computations? // ================================================================================ // // - if input cf is completed when computations execute, use "asynchronous execution" (via supplied executor), // won't block sequential code of caller ✅ // - otherwise, use "default execution", save one thread switching ✅ // // let's call this way as "relay async". llcf.relayasync0(cf, f -> f.thenapply(s -> { // a simulating long-running computation... sleep(1000); // if input cf is completed, executes via supplied executor // if input cf is incomplete, use "default execution" return s + s; }), forkjoinpool.commonpool()); } }
实现分析
public static <t, f extends completionstage<?>> f relayasync0( completionstage<? extends t> cfthis, function<completablefuture<t>, f> relaycomputations, executor executor) { final completablefuture<t> promise = new completablefuture<>(); final f ret = relaycomputations.apply(promise); final thread callerthread = currentthread(); final boolean[] returnedfrompeek0 = {false}; llcf.peek0(cfthis, (v, ex) -> { if (currentthread().equals(callerthread) && !returnedfrompeek0[0]) { // if the action is running in the caller thread(single same thread) and `peek0` invocation does not // return to caller(flag returnedfrompeek0 is false), the action is being executed synchronously. // to prevent blocking the caller's sequential code, use the supplied executor to complete the promise. executor.execute(() -> completecf0(promise, v, ex)); } else { // otherwise, complete the promise directly, avoiding one thread switching. completecf0(promise, v, ex); } }, "relayasync0"); returnedfrompeek0[0] = true; return ret; }
说明:
- completecf0方法可以将结果v或者异常ex设置到promise中
- peek0 近似等效于 whencomplete
分析:
- 可以通过引入新的cf,也就是 promise 实现线程传递,其他线程“完成”promise时,这个线程隐式传到了promise中,可以理解成隐式上下文。任何一个cf都带有一个隐式上下文。
- returnedfrompeek0 避免了异步调用但是恰好是同线程的问题,此时也应该实现relay语义,因为我们的目的是避免对当前线程的阻塞。returnedfrompeek0 天然线程安全,因为其访问总是在一个确定的线程内。
- else 代码块:就地执行,避免线程切换。
总结
到此这篇关于java中completablefuture四种调用模式的实现的文章就介绍到这了,更多相关java completablefuture 调用模式内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论