前言
callable、future和futuretask是jdk1.5,java.util.concurrent包提供的异步框架
这里先讲一下什么是异步?异步是指起多个线程,多个线程之间互不干扰,各自执行各自的任务,在代码中可能书写顺序有先有后,但有可能写在后面的线程会比写在前面的线程先执行任务,异步对应并行的概念,常见的异步操作有线程池、callable、completefuture等。
同步是多线程里针对竞争现象的一个处理,竞争是指同一时刻有多个线程访问临界资源,可能会引发程序执行错误的结果,同步就是保证某一时刻仅能有一个线程访问临界资源,同步对应串行的概念,常见的同步操作有synchronized关键字、lock、线程变量、atomic原子类等。
什么时候需要异步?比如要执行一个任务,该任务执行完后会返回一个结果供其他任务使用,但是该任务很耗时,如果我们把程序设计成串行执行,先执行这个耗时任务,等他结束后再把执行结果给下一个任务使用,这样会耗时,且在这个任务执行期间,其他任务都被阻塞了。那么就可以把程序设计成异步,起一个线程执行这个耗时任务,此外主线程做其他事情,等这个耗时任务执行完毕后,主线程再把结果拿到,使用这个结果继续做其他事情,这样在这个耗时任务执行的过程中,主线程可以去做其他事情而不是等他执行完,这样效率会很高,因此异步编程在提高并发量上使用广泛。
callable接口
先看callable接口的源码:
@functionalinterface public interface callable<v> { /** * computes a result, or throws an exception if unable to do so. * * @return computed result * @throws exception if unable to compute a result */ v call() throws exception; }
首先是注解是函数式接口,意味着可以用lambda表达式更简洁地使用它。callable是个泛型接口,只有一个方法call,该方法返回类型就是传递进来的v类型。call方法还支持抛出异常.
与callable对应的是runnable接口,实现了这两个接口的类都可以当做线程任务递交给线程池执行,runnable接口的源码如下:
@functionalinterface public interface runnable { /** * when an object implementing interface <code>runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * the general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.thread#run() */ public abstract void run(); }
既然实现了这两个接口的类都可以当做线程任务,那么这两个接口有什么区别呢?
- runnable接口是java1.1就有的,callable接口是java1.5才有的,可以认为callable接口是升级版的runnable接口;
- runnable接口里线程任务是在run方法里写的,callable接口里线程任务是在call方法里写;
- callable接口的任务执行后会有返回值,runnable接口的任务无返回值(void);
- callable接口的call方法支持抛出异常,runnable接口的run方法不可以;
- 加入线程池运行,runnable使用executorservice的execute方法,callable使用executorservice的submit方法;
- 运行callable任务可以拿到一个future对象,表示异步计算的结果。future对象封装了检查计算是否完成、检索计算的结果的方法,而runnable接口没有。
callable使用executorservice的submit方法,这里看一下executorservice接口里的submit方法的重载情况:
<t> future<t> submit(callable<t> task); <t> future<t> submit(runnable task, t result); future<?> submit(runnable task);
常用的是第一个和第三个,这两个方法分别提交实现了callable接口的类和实现了runnable接口的类作为线程任务,返回异步计算结果future,future里面封装了一些实用方法可以对异步计算结果进行进一步处理。
future接口
future接口代表异步计算的结果,通过future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。future接口的定义如下:
public interface future<v> { boolean cancel(boolean mayinterruptifrunning); boolean iscancelled(); boolean isdone(); v get() throws interruptedexception, executionexception; v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception; }
下面对这五个方法介绍:
- cancel():用来取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。如果任务还没有被执行,则会返回true并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayinterruptifrunning为true,则会立即中断执行任务的线程并返回true,若mayinterruptifrunning为false,则会返回true且不会中断任务执行线程。
- iscanceled():判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
- isdone():判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
- get():获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出cancellationexception异常,如果任务执行过程发生异常则会抛出executionexception异常,如果阻塞等待过程中被中断则会抛出interruptedexception异常。
- get(long timeout,timeunit unit):带超时时间的get()版本,如果阻塞等待过程中超时则会抛出timeoutexception异常。
注意这里两个get方法都会抛出异常。
futuretask
future是一个接口,而futuretask 为 future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runandreset执行计算)。futuretask 常用来封装 callable 和 runnable,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。futuretask 的线程安全由cas来保证。
源码如下:
public class futuretask<v> implements runnablefuture<v> { ... }
futuretask类实现的是runnablefuture <v>
接口,该接口的源码如下:
public interface runnablefuture<v> extends runnable, future<v> { /** * sets this future to the result of its computation * unless it has been cancelled. */ void run(); }
该接口继承了runnable接口和future接口,因此futuretask类既可以当做线程任务递交给线程池执行,又能当callable任务的计算结果。
future vs futuretask
future与futuretask的区别:
- future是一个接口,futuretask是一个实现类;
- 使用future初始化一个异步任务结果一般需要搭配线程池的submit,且submit方法有返回值;而初始化一个futuretask对象需要传入一个实现了callable接口的类的对象,直接将futuretask对象submit给线程池,无返回值;
- future + callable获取结果需要future对象的get,而futuretask获取结果直接用futuretask对象的get方法即可。
使用示例
callable + future
实现callable接口创建一个异步任务的类,在主线程中起一个线程池执行异步任务,然后在主线程里拿到异步任务的返回结果。
import java.util.concurrent.*; public class asyndemo { public static void main(string[] args) { // 初始化线程池 executorservice threadpool = executors.newfixedthreadpool(5); // 初始化线程任务 mytask mytask = new mytask(5); // 向线程池递交线程任务 future<integer> result = threadpool.submit(mytask); // 主线程休眠2秒,模拟主线程在做其他的事情 try { thread.sleep(2000); } catch (interruptedexception e) { e.printstacktrace(); } // 主线程获取异步任务的执行结果 try { system.out.println("异步线程任务执行结果 " + result.get()); system.out.println("检查异步线程任务是否执行完毕 " + result.isdone()); } catch (interruptedexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } } // 静态内部类,实现callable接口的任务类 static class mytask implements callable<integer> { private int num; public mytask(int num) { this.num = num; } @override public integer call() throws exception { for (int i = 1; i < 10; ++i) { this.num *= i; } return this.num; } } }
执行结果如下:
异步线程任务执行结果 1814400
检查异步线程任务是否执行完毕 true
callable + futuretask
要做的事情跟4.1一样。
import java.util.concurrent.*; public class futuretaskdemo { public static void main(string[] args) { // 初始化线程池 executorservice threadpool = executors.newfixedthreadpool(5); // 初始化mytask实例 mytask mytask = new mytask(5); // 用mytask的实例对象初始化一个futuretask对象 futuretask<integer> myfuturetask = new futuretask<>(mytask); // 向线程池递交线程任务 threadpool.submit(myfuturetask); // 关闭线程池 threadpool.shutdown(); // 主线程休眠2秒,模拟主线程在做其他的事情 try { thread.sleep(2000); } catch (interruptedexception e) { e.printstacktrace(); } // 主线程获取异步任务的执行结果 try { system.out.println("异步线程任务执行结果 " + myfuturetask.get()); system.out.println("检查异步线程任务是否执行完毕 " + myfuturetask.isdone()); } catch (interruptedexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } } // 静态内部类,实现callable接口的任务类 static class mytask implements callable<integer> { private int num; public mytask(int num) { this.num = num; } @override public integer call() throws exception { system.out.println(thread.currentthread().getname() + " 正在执行异步任务"); for (int i = 1; i < 10; ++i) { this.num *= i; } return this.num; } } }
执行结果与4.1一样。
底层源码解析
futuretask类关系
可以看到,futuretask实现了runnablefuture接口,则runnablefuture接口继承了runnable接口和future接口,所以futuretask既能当做一个runnable直接被thread执行,也能作为future用来得到callable的计算结果。
核心属性
//内部持有的callable任务,运行完毕后置空 private callable<v> callable; //从get()中返回的结果或抛出的异常 private object outcome; // non-volatile, protected by state reads/writes //运行callable的线程 private volatile thread runner; //使用treiber栈保存等待线程 private volatile waitnode waiters; //任务状态 private volatile int state; private static final int new = 0; private static final int completing = 1; private static final int normal = 2; private static final int exceptional = 3; private static final int cancelled = 4; private static final int interrupting = 5; private static final int interrupted = 6;
其中需要注意的是state是volatile类型的,也就是说只要有任何一个线程修改了这个变量,那么其他所有的线程都会知道最新的值。7种状态具体表示:
- new:表示是个新的任务或者还没被执行完的任务。这是初始状态。
- completing:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从new变更到completing。但是这个状态会时间会比较短,属于中间状态。
- normal:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从completing转换到normal。这是一个最终态。
- exceptional:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从completing转换到exceptional。这是一个最终态。
- cancelled:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从new转化为cancelled状态。这是一个最终态。
- interrupting: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从new转化为interrupting。这是一个中间状态。
- interrupted:调用interrupt()中断任务执行线程之后状态会从interrupting转换到interrupted。这是一个最终态。 有一点需要注意的是,所有值大于completing的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。
各个状态之间的可能转换关系如下图所示:
构造函数
futuretask(callable<v> callable)
public futuretask(callable<v> callable) { if (callable == null) throw new nullpointerexception(); this.callable = callable; this.state = new; // ensure visibility of callable }
这个构造函数会把传入的callable变量保存在this.callable字段中,该字段定义为private callable<v> callable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为new。
futuretask(runnable runnable, v result)
public futuretask(runnable runnable, v result) { this.callable = executors.callable(runnable, result); this.state = new; // ensure visibility of callable }
这个构造函数会把传入的runnable封装成一个callable对象保存在callable字段中,同时如果任务执行成功的话就会返回传入的result。这种情况下如果不需要返回值的话可以传入一个null。
顺带看下executors.callable()这个方法,这个方法的功能是把runnable转换成callable,代码如下:
public static <t> callable<t> callable(runnable task, t result) { if (task == null) throw new nullpointerexception(); return new runnableadapter<t>(task, result); }
可以看到这里采用的是适配器模式,调用runnableadapter<t>(task, result)方法来适配,实现如下:
static final class runnableadapter<t> implements callable<t> { final runnable task; final t result; runnableadapter(runnable task, t result) { this.task = task; this.result = result; } public t call() { task.run(); return result; } }
这个适配器很简单,就是简单的实现了callable接口,在call()实现中调用runnable.run()方法,然后把传入的result作为任务的结果返回。
在new了一个futuretask对象之后,接下来就是在另一个线程中执行这个task,无论是通过直接new一个thread还是通过线程池,执行的都是run()方法,接下来就看看run()方法的实现。
核心方法 - run()
public void run() { //新建任务,cas替换runner为当前线程 if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) return; try { callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call(); ran = true; } catch (throwable ex) { result = null; ran = false; setexception(ex); } if (ran) set(result);//设置执行结果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s);//处理中断逻辑 } }
说明:
运行任务,如果任务状态为new状态,则利用cas修改为当前线程。执行完毕调用set(result)方法设置执行结果。set(result)源码如下:
protected void set(v v) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { outcome = v; unsafe.putorderedint(this, stateoffset, normal); // final state finishcompletion();//执行完毕,唤醒等待线程 } }
首先利用cas修改state状态为completing,设置返回结果,然后使用 lazyset(unsafe.putorderedint)的方式设置state状态为normal。结果设置完毕后,调用finishcompletion()方法唤醒等待线程,源码如下:
private void finishcompletion() { // assert state > completing; for (waitnode q; (q = waiters) != null;) { if (unsafe.compareandswapobject(this, waitersoffset, q, null)) {//移除等待线程 for (;;) {//自旋遍历等待线程 thread t = q.thread; if (t != null) { q.thread = null; locksupport.unpark(t);//唤醒等待线程 } waitnode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //任务完成后调用函数,自定义扩展 done(); callable = null; // to reduce footprint }
回到run方法,如果在 run 期间被中断,此时需要调用handlepossiblecancellationinterrupt方法来处理中断逻辑,确保任何中断(例如cancel(true))只停留在当前run或runandreset的任务中,源码如下:
private void handlepossiblecancellationinterrupt(int s) { //在中断者中断线程之前可能会延迟,所以我们只需要让出cpu时间片自旋等待 if (s == interrupting) while (state == interrupting) thread.yield(); // wait out pending interrupt }
核心方法 - get()
//获取执行结果 public v get() throws interruptedexception, executionexception { int s = state; if (s <= completing) s = awaitdone(false, 0l); return report(s); }
说明:futuretask 通过get()方法获取任务执行结果。如果任务处于未完成的状态(state <= completing),就调用awaitdone方法(后面单独讲解)等待任务完成。任务完成后,通过report方法获取执行结果或抛出执行期间的异常。report源码如下:
//返回执行结果或抛出异常 private v report(int s) throws executionexception { object x = outcome; if (s == normal) return (v)x; if (s >= cancelled) throw new cancellationexception(); throw new executionexception((throwable)x); }
核心方法 - awaitdone(boolean timed, long nanos)
private int awaitdone(boolean timed, long nanos) throws interruptedexception { final long deadline = timed ? system.nanotime() + nanos : 0l; waitnode q = null; boolean queued = false; for (;;) {//自旋 if (thread.interrupted()) {//获取并清除中断状态 removewaiter(q);//移除等待waitnode throw new interruptedexception(); } int s = state; if (s > completing) { if (q != null) q.thread = null;//置空等待节点的线程 return s; } else if (s == completing) // cannot time out yet thread.yield(); else if (q == null) q = new waitnode(); else if (!queued) //cas修改waiter queued = unsafe.compareandswapobject(this, waitersoffset, q.next = waiters, q); else if (timed) { nanos = deadline - system.nanotime(); if (nanos <= 0l) { removewaiter(q);//超时,移除等待节点 return state; } locksupport.parknanos(this, nanos);//阻塞当前线程 } else locksupport.park(this);//阻塞当前线程 } }
说明:awaitdone用于等待任务完成,或任务因为中断或超时而终止。返回任务的完成状态。函数执行逻辑如下:
如果线程被中断,首先清除中断状态,调用removewaiter移除等待节点,然后抛出interruptedexception。removewaiter源码如下:
private void removewaiter(waitnode node) { if (node != null) { node.thread = null;//首先置空线程 retry: for (;;) { // restart on removewaiter race //依次遍历查找 for (waitnode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!unsafe.compareandswapobject(this, waitersoffset,q, s)) //cas替换 continue retry; } break; } } }
- 如果当前状态为结束状态(state>completing),则根据需要置空等待节点的线程,并返回 future 状态;
- 如果当前状态为正在完成(completing),说明此时 future 还不能做出超时动作,为任务让出cpu执行时间片;
- 如果state为new,先新建一个waitnode,然后cas修改当前waiters;
- 如果等待超时,则调用removewaiter移除等待节点,返回任务状态;如果设置了超时时间但是尚未超时,则park阻塞当前线程;
- 其他情况直接阻塞当前线程。
核心方法 - cancel(boolean mayinterruptifrunning)
public boolean cancel(boolean mayinterruptifrunning) { //如果当前future状态为new,根据参数修改future状态为interrupting或cancelled if (!(state == new && unsafe.compareandswapint(this, stateoffset, new, mayinterruptifrunning ? interrupting : cancelled))) return false; try { // in case call to interrupt throws exception if (mayinterruptifrunning) {//可以在运行时中断 try { thread t = runner; if (t != null) t.interrupt(); } finally { // final state unsafe.putorderedint(this, stateoffset, interrupted); } } } finally { finishcompletion();//移除并唤醒所有等待线程 } return true; }
说明:尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。
- 如果当前future状态为new,根据参数修改future状态为interrupting或cancelled。
- 如果当前状态不为new,则根据参数mayinterruptifrunning决定是否在任务运行中也可以中断。中断操作完成后,调用finishcompletion移除并唤醒所有等待线程
到此这篇关于java中的futuretask:用法和原理 的文章就介绍到这了,更多相关java futuretask 用法内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论