Callable
在Java中我们知道创建一个线程可以继承Thread
类或者实现Runnable
接口,JDK1.5之后在java.util.concurrent
提供了Callable
接口,该接口设计类似Runnable
接口,不过Callable
接口可以返回任务执行的结果,并且在执行任务过程中可能会抛出异常,而Runnable
却不会。下面是Callable
接口的定义:
1 |
|
Callable
接口中只定义了一个call()
方法,该方法会返回一个计算结果,类型与传入的泛型一致。既然是接口,那么在哪里用到呢?下面是一个与FutureTask
结合的例子,代码如下:
1 | public class CallableTest implements Callable<String> { |
我们可以发现将 Callable
的实现类传给FutureTask
,然后利用线程来运行FutureTask
,最终调用get()
方法获取计算结果。
Future
FutureTask
是一个可取消的异步计算,该类提供了Future
的基本实现,那么Future
是怎么回事呢?Future
接口提供了如下方法:
1 | public interface Future<V> { |
Future
表示异步计算的结果,同时提供了用于检查计算是否完成、等待其完成以及检索计算结果的方法。下面是对这些方面的具体描述:
cancel(boolean mayInterruptIfRunning)
:试图取消任务的执行。如果任务已经完成、已被取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且在调用cancel时该任务尚未启动,则该任务永远不会运行。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务。在此方法返回后,对isDone
的后续调用将始终返回true
。如果该方法返回true
,则对isCancelled
的后续调用将始终返回true
。isCancelled
: 如果此任务在正常完成之前被取消,则返回true。isDone
:如果任务完成,返回true。在正常终止、异常或取消情况下导致任务完成,该方法将返回true。get
:等待计算完成,返回计算结果,期间会被阻塞。注意该方法会抛出异常,- CancellationException - 如果计算被取消
- ExecutionException - 如果在计算抛出异常
- InterruptedException - 如果当前线程在等待时被中断
get(long timeout, TimeUnit unit)
:在给定的时间内等待计算完成,然后返回计算结果。注意该方法也会抛出异常:- CancellationException - 如果计算被取消
- ExecutionException - 如果在计算抛出异常
- InterruptedException - 如果当前线程在等待时被中断
- TimeoutException - 等待超时
感觉Future
的API设计的十分简洁明了,定义了对异步计算的常用操作,由于Future
只是接口,刚才提到的FutureTask
是JDK提供的一种实现,所以我们需要了解一下Future
接口的方法是如何实现异步计算并拿到结果的。
FutureTask
FutureTask
的类图如下所示,该类实现了RunnableFuture
接口,RunnableFuture
接口继承自Runnable
和Future
,所以该类既可以交给Thread去执行,又可以作为Future
来获取计算结果。
构造函数及state
打开FutureTask
类的源码,我们首先来看看其构造函数的实现:
1 | public FutureTask(Callable<V> callable) { |
对于第一个构造函数,传入Callable的实现类,将其赋给FutureTask成员变量callable
,同时设置state为NEW
,state字段用来保存FutureTask内部的任务执行状态,一共有7中状态,每种状态及其对应的值如下:
1 | private volatile int state; |
注意state是用volatile
修饰,保证其在线程之间的可见性。在源码注释中,我们可以发现state所代表状态转换如下:
1 | NEW -> COMPLETING -> NORMAL |
用图表示如下:
从图中仿佛可以看出该类通过改变state的状态来反映最后计算的结果。
run
在创建了一个FutureTask实例之后,接下来就是在另一个线程中执行此Task,无论是直接创建Thead还是通过线程池,执行的都是run()
方法,该方法代码如下:
1 | public void run() { |
在run方法中,首先会判断state是否等于NEW
,如果不等于NEW
,说明此任务已经被执行过,或者由于其他原因被取消了,直接返回;
接下来会利用CAS将该类volatile
修饰的runner
成员变量设置为当前线程,注意在设置之前runner
必须为null,设置失败也直接返回。由于我看的版本是JDK11,所以这里的CAS操作用的是JDK9引入的VarHandle
(方法句柄),用来代替UnSafe
类,详情参考:用Variable Handles来替换Unsafe,在FutureTask类中实现代码如下:
1 | // VarHandle mechanics |
检测过state
和runner
后,接着会调用传入的callable的call()
方法,执行任务。如果抛出异常,将结果设置为null
,调用setException()
方法保存异常信息,下面是代码:
1 | protected void setException(Throwable t) { |
在setException
方法中,有以下流程:
- 利用CAS操作将state状态由
NEW
改为COMPLETING
,如果操作成功; - 把异常原因保存在
outcome
字段中,outcome
字段用来保存任务执行结果或者异常原因; - 利用CAS把当前任务状态从
COMPLETING
变更为EXCEPTIONAL
,可以参考上面转换的图; - 调用
finishCompletion()
通知和移除等待线程
如果没发生异常,任务执行结束,调用set(result)
方法设置计算结果,代码如下:
1 | protected void set(V v) { |
我们发现set()
方法实现流程和setException()
真像,只不过是state状态变化的差异,流程如下:
- 利用CAS操作将state状态由
NEW
改为COMPLETING
,如果操作成功; - 把计算结果保存在outcome字段中,outcome字段用来保存任务执行结果或者异常原因;
- 利用CAS把当前任务状态从
COMPLETING
变更为NORMAL
,可以参考上面转换的图; - 调用
finishCompletion()
通知和移除等待线程
计算完后,无论是否发生异常,都要执行finally语句块的方法,首先将runner设置为null
,释放值等待gc回收,同时判断state的状态是否为INTERRUPTING
,如果任务被中断,执行中断处理。
看完了run方法的实现,总结来说,利用CAS根据任务的执行情况更改state的值,其他方法再根据state的值做出相应的处理。
get
由于FutureTask是Future的一个实现,所以它提供了获取计算结果的get()
方法,代码如下:
1 | /** |
FutureTask运行在一个线程里来执行计算任务,由于Future设计的是异步计算模式,那么当然应该考虑其他线程获取计算的结果,从get方法看到,如果state的值如果小于等于COMPLETING
,说明计算任务还没完成,那么获取计算结果的线程必须等待,也就是被阻塞,具体的实现在awaitDone
方法里,该方法有两个参数,第一个参数为是否有超时限制timed,第二个为等待时间nanos,代码如下:
1 | private int awaitDone(boolean timed, long nanos) |
在FutureTask类中有一个成员变量waiters
,声明如下:
1 | /** Treiber stack of waiting threads */ |
WaitNode
是一个静态内部类,数据结构为单链表,用来记录等待的线程,代码如下:
1 | /** |
从上面的代码来看,在awaitDone
方法内部存在着一个死循环,死循环内部流程如下:
- 首先判断state的值,
- 如果值大于
COMPLETING
,代表计算已完成(包括抛出异常等),直接返回; - 如果值等于
COMPLETING
,代表正在执行计算,调用Thread.yield()
让出时间片等待计算完成
- 如果值大于
- 如果当前线程被中断(中断标志位为true),那么从列表中移除节点q,并抛出
InterruptedException
; - 如果当前线程包装的等待节点为空,判断是否设置等待,并且等待时间为0,直接返回,否则创建等待节点;
- 如果没有入队,使用CAS将新节点添加到链表中,如果添加失败,那么queued为false
- 如果设置超时,判断当前计算任务是否在超时时间内,
- 如果不在,移除队列中的结点,直接返回
- 如果在,计算剩余时间,挂起当前线程,让当前线程等待剩下的时间
- 未设置等待时间,直接进行线程挂起操作,线程状态变为等待。
当线程被解除挂起,或计算已经完成后,在get
方法中将会调用report
方法返回结果,其实现如下:
1 | /** |
- 如果state等于
NORMAL
,代表计算正常结束,返回结果; - 如果state等于
CANCELLED
,代表计算被取消,抛出CancellationException
; - 如果计算以异常结束,即状态是
EXCEPTIONAL
,那么抛出ExecutionException
。
finishCompletion
在run
方法中调用set
和setException
时最后一步是执行finishCompletion
方法,那么这个方法是来干什么的呢?我们来看看它的实现吧:
1 | /** |
刚才我们看get
方法的实现时,发现有一个WaitNode
的单链表结构,里面存储着等待着的线程,所以在计算完成时,需要唤醒那些还在等待着的线程,毕竟计算任务都做完了(异常也算结束),总不能让那些阻塞的线程干等着吧,所以在finishCompletion
方法中就遍历单链表,利用CAS将FutureTask中的waiters设置为null
,调用LockSupport.unpark
唤醒线程,当线程被释放后,那么在awaitDone的死循环中就会进入下一个循环,由于状态已经变成了NORMAL
或者EXCEPTIONAL
,将会直接跳出循环。
当所有等待线程都唤醒后,直接调用done
方法,done
方法是个protected
修饰的方法,FutureTask没有做相关实现,所以如果在计算完成后需要特殊处理,子类可以重写done
方法。
cancel
从Future接口的描述来看,它提供了cancel
方法来取消正在执行的任务,FutureTask实现了cancel
方法,我们来看看它的代码吧:
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
参数mayInterruptIfRunning
指明是否应该中断正在运行的任务,
- 如果参数为false,代表不需要中断,那么state的转换过程由
NEW->CANCELLED
- 如果参数为true,代表需要中断,那么state的转换过程将为
NEW->INTERRPUTING->INTERRUPTED
,并给当前线程设中断标志。
无论是否中断,最终都会调用finishCompletion()
方法来释放等待线程。
参考: