0%

CompletableFuture源码学习

了解到CompletableFuture的基础用法之后,我们不禁好奇,以前的Future模式不支持如此好用的异步编程,CompletableFuture是如何做到的呢?这就需要我们去阅读源码了,通过源码我们才能了解到其设计思想和实现方式,我们分析下supplyAsync 和 thenApplyAsync 这两个,并且是提供线程池的接口,因为如果不提供自定义线程池,就会用默认的,如下:

1
2
3
4
5
6
7
8
9
private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);

private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}

上面有一个判断USE_COMMON_POOL,其中用到了ForkJoinPool.getCommonPoolParallelism(),这个是ForkJoin中通用池的并行级别,默认是Runtime.getRuntime().availableProcessors() - 1,所以你的电脑有四核心,那么ForkJoinPool.getCommonPoolParallelism()的值就是3,如果只有一个核心,该值是0,所以USE_COMMON_POOL为true,那么你的电脑至少三个核心。

如果USE_COMMON_POOL为false,那么就会用ThreadPerTaskExecutor,由上面代码可知,这是个单线程的执行器。

在CompletableFuture源码中,有两个成员属性比较重要(volatile保证多线程之间值的可见性),如下所示:

1
2
volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions

其中result为当前阶段的计算结果,注意看上面的注释,还有可能值为AltResult,该类仅有一个成员变量Throwable ex,该类的作用官方描述为:

An AltResult is used to box null as a result, as well as to hold exceptions.

即用AltResult代替null和持有计算过程中发生的异常,源码如下:

1
2
3
4
static final class AltResult { // See above
final Throwable ex; // null only for NIL
AltResult(Throwable x) { this.ex = x; }
}

stack

CompletableFuture内部使用了Treiber stack,Treiber stack算法是属于无锁并发栈,内部使用CAS(compare-and-swap)来实现无锁并发算法。详情请看:Treiber stack设计

supplyAsync源码

测试代码如下:

1
2
3
4
5
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return Thread.currentThread().getName();
}, Executors.newFixedThreadPool(5));

System.out.println(future.join());

我们分析如下supplyAsync实现:

1
CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

该方法会直接调asyncSupplyStage方法,首先先去检查传入的执行器是否为ForkJoinPool.commonPool(),如果是会直接用ForkJoinPool.commonPool(),代码如下所示:

1
2
3
4
5
6
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException(); // 空指针判断
CompletableFuture<U> d = new CompletableFuture<U>(); // 新建CompletableFuture
e.execute(new AsyncSupply<U>(d, f)); // 新建AsyncSupply(ForkJoinTask),丢到传入的线程池执行
return d; // 立即返回
}

asyncSupplyStage方法里首先做空指针判断,接着新建一个新的CompletableFuture, 然后新建一个AsyncSupply,将刚才新建的CompletableFuture和传入的Supplier传给AsyncSupply,接着直接将AsyncSupply丢到传入的线程池中进行执行,最后立即返回,不会等待执行结束。

AsyncSupply源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 该类继承自ForkJoinTask,为ForkJoin计算任务,且实现了Runnable, AsynchronousCompletionTask,
* 代表可以直接丢到线程池里面运行
*/
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<? extends T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
this.dep = dep; this.fn = fn;
}

public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return false; }

public void run() {
CompletableFuture<T> d; Supplier<? extends T> f;
if ((d = dep) != null && (f = fn) != null) { // 判断d, f 是否为空
dep = null; fn = null; // 这里置空,防止多次执行
if (d.result == null) { // 如果当前任务还没执行完,result没有值
try {
d.completeValue(f.get()); // 等待任务结束,并利用CAS设置结果,可能设置失败
} catch (Throwable ex) {
d.completeThrowable(ex); // 出现异常
}
}
d.postComplete(); // 这个后面再说,目前没有用
}
}
}

AsyncSupply内部类里面,有一个run方法,由于我们将其丢到了线程池中运行,所以就会执行run方法。在这个方法里面,会执行我们传给supplyAsync的计算任务,并将结果通过CAS写到