future 异步回调(不推荐)
new thread时传入futuretask对象(构造时传入callable任务对象),调用start启动
创建线程的方式
1.直接new thread()对象,重写run方法,调用start启动
2.new thread时传入runnable任务对象(重写run方法),调用start启动
3.new thread时传入futuretask对象(构造时传入callable任务对象),调用start启动
/**
* 创建线程的三种方式
*/
public class createthreaddemo {
public static void main(string[] args) throws exception {
// 1.重写thread的run方法
thread t1 = new thread(() -> {
system.out.println("第一种创建线程方式,重写thread的run方法");
},"t1");
// 2.重写runnable的run方法
runtask runtask = new runtask();
thread t2 = new thread(runtask, "t2");
// 3.传入futuretask对象,重写callable的call方法(异步带返回值)
calltask calltask = new calltask();
futuretask<string> futuretask = new futuretask<>(calltask);
thread t3 = new thread(futuretask, "t3");
t1.start();
t2.start();
t3.start();
system.out.println("异步执行结果" + futuretask.get() + " " + system.currenttimemillis());
}
}
/**
* runnable 任务
*/
class runtask implements runnable {
@override
public void run() {
system.out.println("runnable running " + system.currenttimemillis());
}
}
/**
* callable 任务
*/
class calltask implements callable<string> {
@override
public string call() throws exception {
system.out.println("callable running " + system.currenttimemillis());
return "hello";
}
}概述
fucture接口(futuretask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完毕等。
future接口可以为主线程开一个分支任务,专门为主线程处理耗时费力的复杂业务。
future提供了一种异步并行计算的功能:如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过future获取计算结果。
future接口 --- 源码
public interface future<v> {
boolean cancel(boolean mayinterruptifrunning);
boolean iscancelled();
boolean isdone();
v get() throws interruptedexception, executionexception;
v get(long timeout, timeunit unit)
throws interruptedexception, executionexception, timeoutexception;
}futuretask(future实现类)

thread类构造方法:

可以看出thread构造方法接收的任务对象只有runnable接口,为什么还能接收futuretask任务对象呢?
因为futuretask实现了runablefuture接口,而runnablefuture接口继承了runnable接口和future接口,所以futuretask也是runnable对象,而futuretask可以接收callable任务对象是因为构造方法中提供了接收callable对象的构造方法。

future编码优缺点
- 优点:future+线程池可以实现多线程任务配合,能显著提高程序的执行效率
- 缺点:get()阻塞、isdone()轮询导致cpu空转
- 阻塞情况: 调用futuretask.get方法时线程会阻塞等待异步结果的获取
代码:
futuretask<string> futuretask = new futuretask<string>(() -> {
system.out.println("开始执行......" + now().getsecond());
// 休眠
try {
timeunit.seconds.sleep(5);
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println("结束......");
return "futuretask end";
});
thread t = new thread(futuretask);
t.start();
system.out.println("子线程执行结果:" + futuretask.get());
system.out.println("--------------------------------");
system.out.println("主线程 -------- 执行..... " + now().getsecond());
system.out.println("--------------------------------");运行结果:

可以通过设置:
system.out.println("子线程执行结果:" + futuretask.get(2, timeunit.seconds));
暴力终止程序运行或抛出异常
isdone()轮询导致cpu空转:
轮询的方式会消耗无谓的cpu资源,而且也不见得能及时获得到计算结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。
while (true) {
if (futuretask.isdone()) {
system.out.println("子线程执行结果:" + futuretask.get() + now().getsecond());
break;
}else {
try {
timeunit.seconds.sleep(2);
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println("暂停2s");
}
}结论:future对于结果的获取不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。
由此引入completablefuture,规避以上的缺点
需要说明的是对于简单的业务使用future就可以了
completablefuture(异步任务编排)
completablefuture可以代表一个明确完成的future,也可能代表一个完成阶段(completionstage),它支持在计算完成以后触发一些函数或执行某些动作。
completablefuture可以解决的问题?
多个任务前后依赖组合处理:想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值;将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个计算结果
对计算速度选最快:当future集合中某个任务最快结束时,返回结果,返回第一名的处理结果。。。。
completablefuture对比future的改进
future问题:get()阻塞、isdone()轮询cpu空转
对于真正的异步处理我们希望是可以通过传入回调函数,在future结束时自动调用该回调函数,这样,我们就不用等待结果。
completablefuture提供一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
怎么创建completablefuture对象?
创建
一般不建议直接new completablefuture
可以采用以下的方式创建completablefuture对象:
1.completablefuture.runasync无返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

2. completablefuture.supplyasync有返回值(指定线程池会用指定的线程池,没有就会使用默认的线程池)

怎么解决future的阻塞和轮询问题?
从java8开始引入了completablefuture,它是future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
默认线程池创建的是守护线程,自定义线程池是用户线程
public static void main(string[] args) throws executionexception, interruptedexception {
executorservice threadpool = executors.newfixedthreadpool(5);
// 有返回值
supplyfuture(threadpool);
system.out.println("主线程 running " + now());
threadpool.shutdown();
}
private static void supplyfuture(executorservice threadpool) {
completablefuture<integer> supplyfuture = completablefuture.supplyasync(() -> {
system.out.println("开始执行...." +
(thread.currentthread().isdaemon() ? "守护线程" : "用户线程") +
" " + now());
int random = threadlocalrandom.current().nextint();
try {
timeunit.seconds.sleep(3);
} catch (interruptedexception e) {
e.printstacktrace();
}
return random;
}, threadpool).whencomplete((v, e) -> {
if (e == null) {
// 没有异常
system.out.println("随机数结果: " + v);
}
}).exceptionally((e) -> {
e.printstacktrace();
system.out.println("异常情况:" + e.getcause());
return null;
});
}运行结果:

已经解决了future编码出现的get阻塞问题。
上述代码块逻辑解析:
supplyasync()中是任务(默认线程池创建的守护线程),如果这个任务成功会走到whencomplete代码块中,不成功会走到exceptionally代码块中
注意:默认线程池创建的是守护线程,自定义线程池是用户线程,守护线程会随着用户线程的结束而结束,所以会导致主线程执行完了然后还没打印出随机数线程池就关闭了,就是以下输出的情况

怎么获得异步结果?
通过get或者join获得结果(区别在于join在编译期间不会作检查性异常的处理,抛不抛异常都可以)
completablefuture的优点
- 异步任务结束时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行;
- 异步任务出错时,会自动回调某个对象的方法:
函数式编程
函数式编程:lambda表达式+stream流式编程+chain链式调用+java8函数式编程
如runnable、function、consumer --- biconsumer、supplier
常用函数式接口
函数式接口名称 | 方法名称 | 参数 | 返回值类型 |
consumer<t> | accept | t t | void |
supplier<t> | get | 无 | t |
function<t, r> | apply | t t | r |
predicate<t> | test | t t | boolean |
biconsumer<t, u> | accept | t t, u u | void |
bifunction<t, u, r> | apply | t t, u u | r |
bipredicate<t, u> | test | t t, u u | boolean |
unaryoperator<t> | apply | t t | t |
binaryoperator<t> | apply | t t1, t t2 | t |
intconsumer | accept | int value | void |
intsupplier | getasint | 无 | int |
intfunction<r> | apply | int value | r |
intpredicate | test | int value | boolean |
链式语法
public class chaindemo {
public static void main(string[] args) {
student student = new student();
student.setid(1).setname("karry").setmajor("cs");
}
}
@data
@allargsconstructor
@noargsconstructor
@accessors(chain = true)
class student{
private integer id;
private string name;
private string major;
}案例精讲-电商网站的比价需求
从“功能”到“性能”

/**
* 电商网站的比价需求
*/
public class completablemalldemo {
private static final list<mall> list = new arraylist<>();
static {
for (int i = 0; i <= 1000; i ++) {
list.add(new mall("book" + i));
}
}
/**
* 查询价格 同步处理
* @param list 平台列表
* @param bookname 书籍名称
* @return 查询所需时间
*/
public static long getprice(list<mall> list, string bookname) {
long start = system.currenttimemillis();
list<string> prices = list.stream()
.map((mall) -> string.format(bookname + " in %s price is %.2f",
mall.getname(), mall.calcprice(bookname)))
.collect(collectors.tolist());
long end = system.currenttimemillis();
system.out.println("用时: " + end);
prices.foreach(system.out::println);
return end - start;
}
/**
* 查询价格 异步处理
* @param list 平台
* @param bookname 书籍名称
* @return 查询所需
*/
public static long getpricebycompletablefuture(list<mall> list, string bookname) {
long start = system.currenttimemillis();
executorservice threadpool = executors.newfixedthreadpool(10);
list<string> prices = list.stream()
// 把 mall 映射到 completablefuture对象
.map(mall -> completablefuture.supplyasync(() ->
string.format(bookname + " in %s price is %.2f",
mall.getname(), mall.calcprice(bookname)), threadpool)
).collect(collectors.tolist()).stream()
// completablefuture对象 映射到 completablefuture.join() [string对象]
.map(completablefuture::join).collect(collectors.tolist());
threadpool.shutdown();
long end = system.currenttimemillis();
system.out.println("用时: " + end);
prices.foreach(system.out::println);
return end - start;
}
public static void main(string[] args) {
system.out.println("差值:" +
(getprice(list, "mysql") - getpricebycompletablefuture(list, "mysql")));
}
}
@allargsconstructor
@noargsconstructor
@data
class mall {
/**
* 电商网站名称
*/
private string name;
/**
* 模拟查询价格
* @param bookname 书籍名称
* @return double 价格
*/
public double calcprice(string bookname) {
return threadlocalrandom.current().nextdouble(20, 100) + bookname.charat(0);
}
}源码分析
public class completablefuture<t> implements future<t>, completionstage<t> {
}completablefuture实现了future接口,还拓展了future不具备的completionstage接口
completionstage(完成阶段)
completionstage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段。
一个阶段的计算执行可以时一个fuction,consumer或者runnable。比如:stage.thenapply().thenaccept().thenrun()
completablefuture api
不带async和带async的api区别:
对于completionstage,每个任务阶段带async的任务可以设定线程池,不设定就会使用默认线程池, 而不带async的任务阶段会使用completablefuture设定的线程池。
thenrun和thenrunasync的区别:
thenrun使用的supplyasync或者runasync传入的线程池(不传入则使用默认线程池---守护线程)thenrunasync使用的是自己api传入的线程池,不传入则使用默认线程池(守护线程)
/**
* thenrunasync方法
* @param threadpool
*/
public static void testrunasyncmethod(executorservice threadpool) {
completablefuture.runasync(() -> {
system.out.println("step 1 " + thread.currentthread().getname());
}, threadpool).thenrunasync(() -> {
system.out.println("step 2 " + thread.currentthread().getname());
}).thenrunasync(() -> {
system.out.println("step 3 " + thread.currentthread().getname());
}, threadpool).thenrunasync(() -> {
system.out.println("step 4 " + thread.currentthread().getname());
}).thenrunasync(() -> {
system.out.println("step 5 " + thread.currentthread().getname());
}).join();
}
方法 | 触发时机 | 输入参数 | 返回值类型 | 核心作用 |
thenrun | 前序任务正常完成后 | 无(runnable ) | completablefuture<void> | 执行无输入、无输出的后续操作 |
thenaccept | 前序任务正常完成后 | 前序任务的结果(consumer ) | completablefuture<void> | 消费前序任务的结果,无新输出 |
thenapply | 前序任务正常完成后 | 前序任务的结果(function ) | completablefuture<u> | 基于前序结果计算新结果,有新输出 |
handle | 前序任务完成(无论成败) | 前序结果 + 异常(bifunction ) | completablefuture<u> | 处理前序任务的结果或异常,计算新结果 |
1. 获得结果和触发计算
获得结果:

- get() : 不见不散
- get(long, timunit): 过时不候
- join() : 功能和get类似,但是在编译期间不抛出受检查异常
- getnow(valueifabsent): 立即获得当前结果,为空则返回valueifabsent的值
public t getnow(t valueifabsent) {
object r;
return ((r = result) == null) ? valueifabsent : reportjoin(r);
}触发计算:

complete(t value) :是否打断get方法立即返回括号值(返回括号值,为true;不返回为false)
public boolean complete(t value) {
boolean triggered = completevalue(value);
postcomplete();
return triggered;
}eg:
public static void main(string[] args) throws interruptedexception {
executorservice threadpool = executors.newfixedthreadpool(5);
testcompletemethodbytrue(threadpool);
testcompletemethodbyfalse(threadpool);
threadpool.shutdown();
}
public static void testcompletemethodbyfalse(executorservice threadpool) throws interruptedexception {
completablefuture<string> future = completablefuture.supplyasync(() -> {
try {
timeunit.seconds.sleep(3);
} catch (interruptedexception e) {
e.printstacktrace();
}
return "get/join返回的结果";
}, threadpool);
timeunit.seconds.sleep(4);
boolean flag = future.complete("设定的值");
system.out.println("complete方法返回值为 " + flag + ",4s左右获得的值为 " + future.join());
}
public static void testcompletemethodbytrue(executorservice threadpool) throws interruptedexception {
completablefuture<string> future = completablefuture.supplyasync(() -> {
try {
timeunit.seconds.sleep(3);
} catch (interruptedexception e) {
e.printstacktrace();
}
return "get/join返回的结果";
}, threadpool);
timeunit.seconds.sleep(2);
boolean flag = future.complete("设定的值");
system.out.println("complete方法返回值为 " + flag + ",2s左右获得的值为 " + future.join());
}运行结果:

2. 对计算结果进行处理
2.1 thenapply: 计算结果存在依赖关系,串行化

/**
* thenapply
* @param threadpool
*/
public static void testapplymethod(executorservice threadpool) {
stringbuffer str = new stringbuffer();
completablefuture<stringbuffer> future = completablefuture.supplyasync(() -> {
return str.append("a");
}, threadpool).thenapply(f -> {
return str.append("b");
}).thenapply(f -> {
return str.append("c");
}).whencomplete((v, e) -> {
if (v != null) {
system.out.println("apply处理后的结果为 " + v);
}
}).exceptionally(e -> {
e.printstacktrace();
return null;
});
}2.2 handle: 计算结果存在依赖关系,串行化(可以带着)

/**
* handle方法
* @param threadpool
*/
public static void testhandlemethod(executorservice threadpool) {
stringbuffer str = new stringbuffer();
completablefuture<stringbuffer> future = completablefuture.supplyasync(() -> {
return str.append("a");
}, threadpool).handle((f, e) -> {
if (e == null) {
return f.append("b");
}else {
e.printstacktrace();
return null;
}
}).handle((f, e) -> {
if (e == null) {
return f.append("c");
}else {
e.printstacktrace();
return null;
}
});
system.out.println("handle处理后的结果为 " + future.join());
}
3.对计算结果进行消费
接收任务的处理结果,消费处理,无返回结果
thenaccept:

public completablefuture<void> thenaccept(consumer<? super t> action) {
return uniacceptstage(null, action);
}
public completablefuture<void> thenacceptasync(consumer<? super t> action) {
return uniacceptstage(asyncpool, action);
}
public completablefuture<void> thenacceptasync(consumer<? super t> action,
executor executor) {
return uniacceptstage(screenexecutor(executor), action);
}eg:
/**
* thenaccept方法(消费型)
* @param threadpool
*/
public static void testacceptmethod(executorservice threadpool) {
stringbuffer str = new stringbuffer();
completablefuture.supplyasync(() -> str.append("abc"), threadpool)
.thenaccept(r -> system.out.println("accept直接消费" + r)).join();
}输出:

thenrun:
任务a执行完执行b,并且b不需要a的结果

/**
* thenrun方法
* @param threadpool
*/
public static void testrunmethod(executorservice threadpool) {
completablefuture.runasync(() -> {
system.out.println("step 1");
}, threadpool).thenrun(() -> {
system.out.println("step 2");
}).thenrun(() -> {
system.out.println("step 3");
});
}4.对计算速度进行选用
applytoeither:
对两个future对象选用速度较快的那一个结果

eg:
/**
* applytoeither 方法
* @param threadpool
*/
public static void testapplytoeithermethod(executorservice threadpool) {
for (int i = 2; i <= 5; i ++) {
completablefuture<string> future = getplayfuture(i - 1, threadpool).applytoeither(getplayfuture(i, threadpool), f -> f + " is winner");
system.out.println(future.join());
}
}
private static completablefuture<string> getplayfuture(int num, executorservice threadpool) {
return completablefuture.supplyasync(() -> {
try {
timeunit.milliseconds.sleep(num * 100);
} catch (interruptedexception e) {
e.printstacktrace();
}
return "play" + num;
});
}
5.对计算结果进行合并
thencombine:
两个completionstage任务都完成后,最终能把两个任务的结果一起提交给thencombine来处理;先完成的先等着,等待其它分支任务。

eg:
/**
* thencombine 方法
* @param threadpool
*/
public static void testcombinemethod(executorservice threadpool) {
completablefuture<integer> future1 = getbranchfuture(1, threadpool);
completablefuture<integer> future2 = getbranchfuture(2, threadpool);
completablefuture<integer> combinefuture = future2.thencombine(future1, integer::sum);
system.out.println("计算结果为 " + combinefuture.join());
}
private static completablefuture<integer> getbranchfuture(int num, executorservice threadpool) {
return completablefuture.supplyasync(() -> {
try {
timeunit.milliseconds.sleep(num * 100);
} catch (interruptedexception e) {
e.printstacktrace();
}
return num * 10;
});
}输出:计算结果为 30
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论