当前位置: 代码网 > it编程>操作系统>苹果Mac > JUC中Future及CompletableFuture的用法与说明

JUC中Future及CompletableFuture的用法与说明

2026年04月21日 苹果Mac 我要评论
future 异步回调(不推荐)new thread时传入futuretask对象(构造时传入callable任务对象),调用start启动创建线程的方式1.直接new thread()对象,重写ru

future 异步回调(不推荐)

new thread时传入futuretask对象(构造时传入callable任务对象),调用start启动

创建线程的方式

1.直接new thread()对象,重写run方法,调用start启动

2.new thread时传入runnable任务对象(重写run方法),调用start启动

3.new thread时传入futuretask对象(构造时传入callable任务对象),调用start启动

/**
 * 创建线程的三种方式
 */
public class createthreaddemo {
    public static void main(string[] args) throws exception {
        // 1.重写thread的run方法
        thread t1 = new thread(() -> {
            system.out.println("第一种创建线程方式,重写thread的run方法");
        },"t1");
        // 2.重写runnable的run方法
        runtask runtask = new runtask();
        thread t2 = new thread(runtask, "t2");
        // 3.传入futuretask对象,重写callable的call方法(异步带返回值)
        calltask calltask = new calltask();
        futuretask<string> futuretask = new futuretask<>(calltask);
        thread t3 = new thread(futuretask, "t3");
        t1.start();
        t2.start();
        t3.start();
        system.out.println("异步执行结果" + futuretask.get() + " " + system.currenttimemillis());
    }
}
/**
 * runnable 任务
 */
class runtask implements runnable {
    @override
    public void run() {
        system.out.println("runnable running " + system.currenttimemillis());
    }
}
/**
 * callable 任务
 */
class calltask implements callable<string> {
    @override
    public string call() throws exception {
        system.out.println("callable running " + system.currenttimemillis());
        return "hello";
    }
}

概述

fucture接口(futuretask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完毕等。

future接口可以为主线程开一个分支任务,专门为主线程处理耗时费力的复杂业务。

future提供了一种异步并行计算的功能:如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过future获取计算结果

future接口 --- 源码

public interface future<v> {
    boolean cancel(boolean mayinterruptifrunning);
    boolean iscancelled();
    boolean isdone();
    v get() throws interruptedexception, executionexception;
    v get(long timeout, timeunit unit)
    throws interruptedexception, executionexception, timeoutexception;
}

futuretask(future实现类)

thread类构造方法:

可以看出thread构造方法接收的任务对象只有runnable接口,为什么还能接收futuretask任务对象呢?

因为futuretask实现了runablefuture接口,而runnablefuture接口继承了runnable接口和future接口,所以futuretask也是runnable对象,而futuretask可以接收callable任务对象是因为构造方法中提供了接收callable对象的构造方法。

future编码优缺点

  • 优点:future+线程池可以实现多线程任务配合,能显著提高程序的执行效率
  • 缺点:get()阻塞、isdone()轮询导致cpu空转
  • 阻塞情况: 调用futuretask.get方法时线程会阻塞等待异步结果的获取

代码:

futuretask<string> futuretask = new futuretask<string>(() -> {
    system.out.println("开始执行......" + now().getsecond());
    // 休眠
    try {
        timeunit.seconds.sleep(5);
    } catch (interruptedexception e) {
        e.printstacktrace();
    }
    system.out.println("结束......");
    return "futuretask end";
});
thread t = new thread(futuretask);
t.start();
system.out.println("子线程执行结果:" + futuretask.get());
system.out.println("--------------------------------");
system.out.println("主线程 -------- 执行..... " + now().getsecond());
system.out.println("--------------------------------");

运行结果:

可以通过设置:

system.out.println("子线程执行结果:" + futuretask.get(2, timeunit.seconds));

暴力终止程序运行或抛出异常

isdone()轮询导致cpu空转:

轮询的方式会消耗无谓的cpu资源,而且也不见得能及时获得到计算结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。

while (true) {
    if (futuretask.isdone()) {
        system.out.println("子线程执行结果:" + futuretask.get() + now().getsecond());
        break;
    }else {
        try {
            timeunit.seconds.sleep(2);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        system.out.println("暂停2s");
    }
}

结论:future对于结果的获取不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

由此引入completablefuture,规避以上的缺点

需要说明的是对于简单的业务使用future就可以了

completablefuture(异步任务编排)

completablefuture可以代表一个明确完成的future,也可能代表一个完成阶段(completionstage),它支持在计算完成以后触发一些函数或执行某些动作。

completablefuture可以解决的问题?

多个任务前后依赖组合处理:想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值;将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个计算结果

对计算速度选最快:当future集合中某个任务最快结束时,返回结果,返回第一名的处理结果。。。。

completablefuture对比future的改进

future问题:get()阻塞、isdone()轮询cpu空转

对于真正的异步处理我们希望是可以通过传入回调函数,在future结束时自动调用该回调函数,这样,我们就不用等待结果。

completablefuture提供一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

怎么创建completablefuture对象?

创建

一般不建议直接new completablefuture

可以采用以下的方式创建completablefuture对象:

1.completablefuture.runasync无返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

2. completablefuture.supplyasync有返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

怎么解决future的阻塞和轮询问题?

从java8开始引入了completablefuture,它是future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

默认线程池创建的是守护线程,自定义线程池是用户线程

public static void main(string[] args) throws executionexception, interruptedexception {
    executorservice threadpool = executors.newfixedthreadpool(5);
    // 有返回值
    supplyfuture(threadpool);
    system.out.println("主线程 running " + now());
    threadpool.shutdown();
}
private static void supplyfuture(executorservice threadpool) {
    completablefuture<integer> supplyfuture = completablefuture.supplyasync(() -> {
        system.out.println("开始执行...." +
                (thread.currentthread().isdaemon() ? "守护线程" : "用户线程") +
                " " + now());
        int random = threadlocalrandom.current().nextint();
        try {
            timeunit.seconds.sleep(3);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        return random;
    }, threadpool).whencomplete((v, e) -> {
        if (e == null) {
            // 没有异常
            system.out.println("随机数结果: " + v);
        }
    }).exceptionally((e) -> {
        e.printstacktrace();
        system.out.println("异常情况:" + e.getcause());
        return null;
    });
}

运行结果:

已经解决了future编码出现的get阻塞问题。

上述代码块逻辑解析:

supplyasync()中是任务(默认线程池创建的守护线程),如果这个任务成功会走到whencomplete代码块中,不成功会走到exceptionally代码块中

注意:默认线程池创建的是守护线程,自定义线程池是用户线程,守护线程会随着用户线程的结束而结束,所以会导致主线程执行完了然后还没打印出随机数线程池就关闭了,就是以下输出的情况

怎么获得异步结果?

通过get或者join获得结果(区别在于join在编译期间不会作检查性异常的处理,抛不抛异常都可以)

completablefuture的优点

  1. 异步任务结束时,会自动回调某个对象的方法;
  2. 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行;
  3. 异步任务出错时,会自动回调某个对象的方法:

函数式编程

函数式编程:lambda表达式+stream流式编程+chain链式调用+java8函数式编程

如runnable、function、consumer --- biconsumer、supplier

常用函数式接口

函数式接口名称

方法名称

参数

返回值类型

consumer<t>

accept

t t

void

supplier<t>

get

t

function<t, r>

apply

t t

r

predicate<t>

test

t t

boolean

biconsumer<t, u>

accept

t t, u u

void

bifunction<t, u, r>

apply

t t, u u

r

bipredicate<t, u>

test

t t, u u

boolean

unaryoperator<t>

apply

t t

t

binaryoperator<t>

apply

t t1, t t2

t

intconsumer

accept

int value

void

intsupplier

getasint

int

intfunction<r>

apply

int value

r

intpredicate

test

int value

boolean

链式语法

public class chaindemo {
    public static void main(string[] args) {
        student student = new student();
        student.setid(1).setname("karry").setmajor("cs");
    }
}
@data
@allargsconstructor
@noargsconstructor
@accessors(chain = true)
class student{
    private integer id;
    private string name;
    private string major;
}

案例精讲-电商网站的比价需求

从“功能”到“性能”

/**
 * 电商网站的比价需求
 */
public class completablemalldemo {
    private static final list<mall> list = new arraylist<>();
    static {
        for (int i = 0; i <= 1000; i ++) {
            list.add(new mall("book" + i));
        }
    }
    /**
     * 查询价格 同步处理
     * @param list 平台列表
     * @param bookname 书籍名称
     * @return 查询所需时间
     */
    public static long getprice(list<mall> list, string bookname) {
        long start = system.currenttimemillis();
        list<string> prices = list.stream()
        .map((mall) -> string.format(bookname + " in %s price is %.2f",
                                     mall.getname(), mall.calcprice(bookname)))
        .collect(collectors.tolist());
        long end = system.currenttimemillis();
        system.out.println("用时:  " + end);
        prices.foreach(system.out::println);
        return end - start;
    }
    /**
     * 查询价格 异步处理
     * @param list 平台
     * @param bookname 书籍名称
     * @return 查询所需
     */
    public static long getpricebycompletablefuture(list<mall> list, string bookname) {
        long start = system.currenttimemillis();
        executorservice threadpool = executors.newfixedthreadpool(10);
        list<string> prices = list.stream()
        // 把 mall 映射到 completablefuture对象
        .map(mall -> completablefuture.supplyasync(() ->
                                                   string.format(bookname + " in %s price is %.2f",
                                                                 mall.getname(), mall.calcprice(bookname)), threadpool)
            ).collect(collectors.tolist()).stream()
        // completablefuture对象 映射到 completablefuture.join() [string对象]
        .map(completablefuture::join).collect(collectors.tolist());
        threadpool.shutdown();
        long end = system.currenttimemillis();
        system.out.println("用时:  " + end);
        prices.foreach(system.out::println);
        return end - start;
    }
    public static void main(string[] args) {
        system.out.println("差值:" +
                           (getprice(list, "mysql") - getpricebycompletablefuture(list, "mysql")));
    }
}
@allargsconstructor
@noargsconstructor
@data
class mall {
    /**
     * 电商网站名称
     */
    private string name;
    /**
     * 模拟查询价格
     * @param bookname 书籍名称
     * @return double 价格
     */
    public double calcprice(string bookname) {
        return threadlocalrandom.current().nextdouble(20, 100) + bookname.charat(0);
    }
}

源码分析

public class completablefuture<t> implements future<t>, completionstage<t>  {
}

completablefuture实现了future接口,还拓展了future不具备的completionstage接口

completionstage(完成阶段)

completionstage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段。

一个阶段的计算执行可以时一个fuction,consumer或者runnable。比如:stage.thenapply().thenaccept().thenrun()

completablefuture api

不带async和带async的api区别:

对于completionstage,每个任务阶段带async的任务可以设定线程池,不设定就会使用默认线程池, 而不带async的任务阶段会使用completablefuture设定的线程池。

thenrun和thenrunasync的区别:

  • thenrun使用的supplyasync或者runasync传入的线程池(不传入则使用默认线程池---守护线程)
  • thenrunasync使用的是自己api传入的线程池,不传入则使用默认线程池(守护线程)
/**
 * thenrunasync方法
 * @param threadpool
 */
public static void testrunasyncmethod(executorservice threadpool) {
completablefuture.runasync(() -> {
    system.out.println("step 1 " + thread.currentthread().getname());
}, threadpool).thenrunasync(() -> {
    system.out.println("step 2 " + thread.currentthread().getname());
}).thenrunasync(() -> {
    system.out.println("step 3 " + thread.currentthread().getname());
}, threadpool).thenrunasync(() -> {
    system.out.println("step 4 " + thread.currentthread().getname());
}).thenrunasync(() -> {
    system.out.println("step 5 " + thread.currentthread().getname());
}).join();
}

方法

触发时机

输入参数

返回值类型

核心作用

thenrun

前序任务正常完成后

无(runnable

completablefuture<void>

执行无输入、无输出的后续操作

thenaccept

前序任务正常完成后

前序任务的结果(consumer

completablefuture<void>

消费前序任务的结果,无新输出

thenapply

前序任务正常完成后

前序任务的结果(function

completablefuture<u>

基于前序结果计算新结果,有新输出

handle

前序任务完成(无论成败)

前序结果 + 异常(bifunction

completablefuture<u>

处理前序任务的结果或异常,计算新结果

1. 获得结果和触发计算

获得结果:

  • get() : 不见不散
  • get(long, timunit): 过时不候
  • join() : 功能和get类似,但是在编译期间不抛出受检查异常
  • getnow(valueifabsent): 立即获得当前结果,为空则返回valueifabsent的值
public t getnow(t valueifabsent) {
object r;
return ((r = result) == null) ? valueifabsent : reportjoin(r);
}

触发计算:

complete(t value) :是否打断get方法立即返回括号值(返回括号值,为true;不返回为false)

public boolean complete(t value) {
    boolean triggered = completevalue(value);
    postcomplete();
    return triggered;
}

eg:

public static void main(string[] args) throws interruptedexception {
    executorservice threadpool = executors.newfixedthreadpool(5);
    testcompletemethodbytrue(threadpool);
    testcompletemethodbyfalse(threadpool);
    threadpool.shutdown();
}
public static void testcompletemethodbyfalse(executorservice threadpool) throws interruptedexception {
    completablefuture<string> future = completablefuture.supplyasync(() -> {
        try {
            timeunit.seconds.sleep(3);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        return "get/join返回的结果";
    }, threadpool);
    timeunit.seconds.sleep(4);
    boolean flag = future.complete("设定的值");
    system.out.println("complete方法返回值为 " + flag + ",4s左右获得的值为 " + future.join());
}
public static void testcompletemethodbytrue(executorservice threadpool) throws interruptedexception {
    completablefuture<string> future = completablefuture.supplyasync(() -> {
        try {
            timeunit.seconds.sleep(3);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        return "get/join返回的结果";
    }, threadpool);
    timeunit.seconds.sleep(2);
    boolean flag = future.complete("设定的值");
    system.out.println("complete方法返回值为 " + flag + ",2s左右获得的值为 " + future.join());
}

运行结果:

2. 对计算结果进行处理

2.1 thenapply: 计算结果存在依赖关系,串行化

/**
 * thenapply
 * @param threadpool
 */
public static void testapplymethod(executorservice threadpool) {
stringbuffer str = new stringbuffer();
completablefuture<stringbuffer> future = completablefuture.supplyasync(() -> {
    return str.append("a");
}, threadpool).thenapply(f -> {
    return str.append("b");
}).thenapply(f -> {
    return str.append("c");
}).whencomplete((v, e) -> {
    if (v != null) {
        system.out.println("apply处理后的结果为 " + v);
    }
}).exceptionally(e -> {
    e.printstacktrace();
    return null;
});
}

2.2 handle: 计算结果存在依赖关系,串行化(可以带着)

/**
 * handle方法
 * @param threadpool
 */
public static void testhandlemethod(executorservice threadpool) {
    stringbuffer str = new stringbuffer();
    completablefuture<stringbuffer> future = completablefuture.supplyasync(() -> {
        return str.append("a");
    }, threadpool).handle((f, e) -> {
        if (e == null) {
            return f.append("b");
        }else {
            e.printstacktrace();
            return null;
        }
    }).handle((f, e) -> {
        if (e == null) {
            return f.append("c");
        }else {
            e.printstacktrace();
            return null;
        }
    });
    system.out.println("handle处理后的结果为 " + future.join());
}

3.对计算结果进行消费

接收任务的处理结果,消费处理,无返回结果

thenaccept:

public completablefuture<void> thenaccept(consumer<? super t> action) {
    return uniacceptstage(null, action);
}
public completablefuture<void> thenacceptasync(consumer<? super t> action) {
    return uniacceptstage(asyncpool, action);
}
public completablefuture<void> thenacceptasync(consumer<? super t> action,
                                               executor executor) {
    return uniacceptstage(screenexecutor(executor), action);
}

eg:

/**
 * thenaccept方法(消费型)
 * @param threadpool
 */
public static void testacceptmethod(executorservice threadpool) {
    stringbuffer str = new stringbuffer();
    completablefuture.supplyasync(() -> str.append("abc"), threadpool)
    .thenaccept(r -> system.out.println("accept直接消费" + r)).join();
}

输出:

thenrun:

任务a执行完执行b,并且b不需要a的结果

/**
 * thenrun方法
 * @param threadpool
 */
public static void testrunmethod(executorservice threadpool) {
completablefuture.runasync(() -> {
    system.out.println("step 1");
}, threadpool).thenrun(() -> {
    system.out.println("step 2");
}).thenrun(() -> {
    system.out.println("step 3");
});
}
4.对计算速度进行选用

applytoeither:

对两个future对象选用速度较快的那一个结果

eg:

/**
 * applytoeither 方法
 * @param threadpool
 */
public static void testapplytoeithermethod(executorservice threadpool) {
for (int i = 2; i <= 5; i ++) {
    completablefuture<string> future = getplayfuture(i - 1, threadpool).applytoeither(getplayfuture(i, threadpool), f -> f + " is winner");
    system.out.println(future.join());
}
}
private static completablefuture<string> getplayfuture(int num, executorservice threadpool) {
    return completablefuture.supplyasync(() -> {
        try {
            timeunit.milliseconds.sleep(num * 100);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        return "play" + num;
    });
}

5.对计算结果进行合并

thencombine:

两个completionstage任务都完成后,最终能把两个任务的结果一起提交给thencombine来处理;先完成的先等着,等待其它分支任务。

eg:

/**
 * thencombine 方法
 * @param threadpool
 */
public static void testcombinemethod(executorservice threadpool) {
    completablefuture<integer> future1 = getbranchfuture(1, threadpool);
    completablefuture<integer> future2 = getbranchfuture(2, threadpool);
    completablefuture<integer> combinefuture = future2.thencombine(future1, integer::sum);
    system.out.println("计算结果为 " + combinefuture.join());
}
private static completablefuture<integer> getbranchfuture(int num, executorservice threadpool) {
    return completablefuture.supplyasync(() -> {
        try {
            timeunit.milliseconds.sleep(num * 100);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        return num * 10;
    });
}

输出:计算结果为 30

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com