0%

CompletableFuture与异步编程设计

用过Spring推出的Reactor框架的同学可能会感叹异步编程的便利,不过Reactor对于异步编程的初学者来说有点复杂了,看其源码也不是那么容易,那么JDK有没有对异步编程相关的支持呢?Future想必大家都很熟悉(不了解的同学请查看Callable&Future及FutureTask实现分析(JDK11)),在线程池里面运行,返回Future,等待计算结果,虽然采用线程池来运行代码,但像Reactor那样链式编程是不容易的。因为Future的结果是需要调用者自己去拿的,计算没有结束就会一直被阻塞,试想下有没有这样一种设计呢?当计算完之后,主动将计算结果推给调用者。可能有人说,这个不是Callback模式吗?如下伪代码所示:

1
2
3
4
5
fn doFun(params, callback() {
doFun2(params, callback() {
...Callback Hell
})
})

如果将某一步的计算结果传递给下一个计算任务,这样依次传递下去,就会形成回调地狱(Callback Hell),代码极其不宜读,十分不优雅。接触过cpu流水线作业的同学知道流水线(pipeline)是一串操作的集合,其中前一个操作的输出是下一个操作的输入。流水线执行允许在同一时钟周期内重叠执行多个指令。异步编程有没有可能像流水线作业这样如此优雅呢?下面是试想出来流水线模型(伪代码):

1
2
3
4
5
doFun(() -> task1)
.doFun1(result -> task2(result))
.doFun2(result -> task3())
.doFun3(() -> task4())
.end(() -> doAnything())

上面计算任务的描述中用到了函数式编程语法模型,更易表达思想。流水线模型伪代码似乎有内味了😊。在JDK8中,引入了CompletionStage接口,这个接口就是为了描述异步计算时某一个完成阶段的状态和动作,下面是官方的介绍:

A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.

这个接口的方法多达几十个,功能相当丰富。同时提供了这个接口的实现CompletableFuture,这个是我们要讨论的主角。CompletionStage接口的方法参数大部分支持函数式,根据不同的函数式接口做不同的事情,下面简单介绍下函数式接口。

函数式接口

函数式编程在JDK1.8被引入,该编程范式极大精简代码,结合Java的类型推断功能,给Java注入了相当大的活力。JDK官方提供了很多函数式接口,用来描述函数的功能。下面是将常用的接口,比如Predicate做谓词运算,搭配test方法,功能也是直抒胸臆的。

函数式接口 函数描述符 功能
Predicate<T> (T) -> boolean boolean test( T) 接收一个参数,返回布尔值
Consumer<T> (T) -> void void accept(T) 接收一个参数,无返回值
Function<T, R> (T) -> R R apply(T) 接收一个参数,返回指定类型的数据
Supplier<T> ( ) -> T T get() 不接收参数,返回指定类型的数据
UnaryOperator<T> (T) -> T Function子接口:T apply(T) 接受一个参数为类型T,返回值类型也为T
BinaryOperator<T> (T, T) -> T BiFunction子接口:T apply(T t, T u) 接受两个输入参数的,返回一个结果
BiPredicate<L, R> (L, R) -> boolean boolean test(T t, U u) 接收两个参数,返回布尔值
BiConsumer<T, U> (T, U) -> void void accept(T t, U u) 接收两个参数,无返回值
BiFunction<T, U, R> (T, U) -> R R apply(T t, U u) 接受两个输入参数的,返回一个结果

CompletionStage接口

从上面的函数式接口来说,整体可以分析以下几大类:

  • 断言型,如Predicate
  • 消费并产出型,如Function,UnaryOperator
  • 消费不产出型,如Consumer
  • 产出型,Supplier

上面提到CompletionStage接口方法的参数大都是函数式接口,所以其方法在命名上也体现出其特点,比如用apply, accept 和 run 开头的方法分别表示Function, Consumer 和 Runnable。所以CompletionStage接口主要有以下类型:

  • 消费并产出型,用上一个阶段的结果作为当前方法的参数并计算出新结果,接口名称带有apply,参数为Function或其子接口;
  • 消费不产出型,用上一个阶段的结果作为当前方法的参数,但不对阶段结果产生影响,接口名称带有accept,参数为Consumer或其子接口;
  • 不消费也不产出型,不依据上一个阶段的执行结果,只要上一个阶段完成(但一般要求正常完成),就执行指定的操作,且不对阶段的结果产生影响,接口名称带有run,参数为Runnable

上述只是从函数式接口对CompletionStage接口方法进行简单分类,但CompletionStage提供的功能远不止此。我们知道异步编程需要有合并计算结果的能力,这个计算任务可以由一个阶段的完成触发,也可以由两个阶段的完成触发,也可以由两个阶段中的任何一个触发,所以按照计算依赖关系,可以分析以下几种:

  • 由一个阶段的完成触发,方法前缀为then
  • 由两个阶段的完成触发,方法带有combine或者both
  • 由两个阶段中任意一个完成触发,不能保证哪个的结果或效果用于相关阶段的计算,这类方法带有either

在看CompletionStage接口时,发现有很多方法是以Async结尾的,并且有些的函数参数里面有Executor,比如如下几个方法:

1
2
3
4
5
6
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn,
Executor executor);

这个是出于什么情况考虑的呢?由于异步编程底层是用线程池来做的,当上一阶段的计算完成,下一阶段可以继续使用这个线程计算,也可以从线程池里面重新拿一个线程(注意这个重新获取的线程可能与上一阶段一样),也可以给当前阶段的计算任务重新指定线程池,就从这个池子里面拿吧。这样设计也是为了异步计算性能考虑,如果上一阶段计算任务执行很慢的 I/O 操作,就会导致所有计算任务都阻塞在 I/O 操作上,进而影响计算性能。如果当前线程池已没有线程可用,那么当前的计算任务要被阻塞,所以可以根据不同的业务,指定不同的线程池,隔离彼此之间的影响。线程池的大小可由如下公式粗略计算。

image

如果在计算过程中,发生了异常,会出现什么情况?整个计算都终止吗?比如我上个计算任务失败了,但我还想继续执行下一个计算任务进行处理,而下一个计算任务最好能够感知到上一个计算任务发生了什么,以做出不同的处理。在平时处理异常时,基本用try...catch...finally,CompletionStage接口也为异步计算异常处理提供了相应支持。如下有三个方法

  • handle(BiFunction<? super T, Throwable, ? extends U> fn) 两个参数,一个代表上一阶段计算结果,一个代表上一阶段异常,支持返回值
  • whenComplete(BiConsumer<? super T, ? super Throwable> action) 两个参数,一个代表上一阶段计算结果,一个代表上一阶段异常,没有返回值,类似finally
  • exceptionally(Function fn) 类似于 try{}catch{} 中的 catch{},支持返回结果

CompletableFuture使用

上面从不同维度了解了CompletionStage接口的设计,从宏观角度简单理解异步编程遇到的问题以及解决方案。下面我们来看看JDK为CompletionStage接口提供的实现类CompletableFuture,该类是JDK8提供的。

直接执行异步计算

CompletableFuture提供最基本的功能就是直接执行一个任务,且没有返回值,使用方式如下:

1
CompletableFuture<Void> CompletableFuture.runAsync(Runnable runnable)

下面是一个例子,在runAsync方法里面可以写自己的异步逻辑,返回一个CompletableFuture,暗示我们可以进行执行异步任务

1
2
3
4
5
6
7
8
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});

如果想让异步任务有返回值,这个时候就用到了产出型函数接口Supplier,如下所示:

1
CompletableFuture<U> CompletableFuture.supplyAsync(Supplier<U> supplier)

下面是一个例子,执行一个异步任务,返回一个执行结果,我们可以通过get()方法直接拿到计算结果,当然也可以进行下一阶段的计算

1
2
3
4
5
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return Thread.currentThread().getName();
});

System.out.println(future.get());

处理上一阶段计算结果

当我们使用supplyAsync计算出结果,想传递给下一阶段的计算任务,该使用哪个方法呢?结合前面对CompletionStage接口的分析,从宏观角度来看,主要有消费不产出消费并产出这两种类型,所以CompletableFuture提供了thenApply / thenApplyAsync来处理消费并产出型,用thenAccept / thenAcceptAsync来处理消费不产出型。下面是使用方式:

1
2
CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture<Void> thenAccept(Consumer<? super T> action)

对于消费并产出型,我们可以在当前阶段获取上一阶段的计算结果,然后将当前阶段的计算结果传递给下一阶段,这样可以一直传递下去,如下代码所示:

1
2
3
4
5
6
7
8
9
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return Thread.currentThread().getName();
}).thenApplyAsync(item -> {
return item + "-111111-" + Thread.currentThread().getName();
}).thenApplyAsync(item -> {
return item + "-222222-" + Thread.currentThread().getName();
});

System.out.println(future.get());

对于消费不产出型,我们可以在当前阶段获取上一阶段的计算结果,内部处理完后不会再向下一阶段传递值,如下所示:

1
2
3
CompletableFuture.supplyAsync(() -> {
return Thread.currentThread().getName();
}).thenAccept(System.out::println);

整合两个计算结果

熟悉了上面几个方法,我们已经可以用链式编程来写代码了,但上面的方法参数都是函数式接口,如有我现在有两个方法,方法的返回值是CompletableFuture<T>,怎么讲两者的计算结果整合起来呢?CompletableFuture为我们提供了两个方法:thenCompose \ thenCombine,下面是这两个接口的使用方式:

1
2
3
CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)

thenCompose

考虑如下场景,我们有一个方法,根据用户id获取用户信息,还有一个方法,根据用户id拿到该用户的权限,最终将该权限信息填充到User实体,返回User。下面是这两个函数的简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
private CompletableFuture<User> getUser(String userId) {
return CompletableFuture.supplyAsync(() -> {
// DO ANYTHING
return new User();
});
}

private CompletableFuture<User> fillRes(User user) {
return CompletableFuture.supplyAsync(() -> {
// 获取权限信息,填充到用户信息里面
return user;
});
}

注意上述两者都返回CompletableFuture<User>,我们依稀记得thenApply属于消费并产出型。下面用thenApply将两者连接起来,如下:

1
CompletableFuture<CompletableFuture<User>> future = getUser(userId).thenApply(this::fillRes);

看起来有些不太妙,返回的结果出现了复合的CompletableFuture,这是因为thenApply的产出是一个CompletableFuture<User>,所以就出现了复合的情况。针对以上场景,JDK为我们提供了thenCompose,下面用thenCompose改写下:

1
2
3
// 利用 thenCompose() 组合两个独立的CompletableFuture
CompletableFuture<User> result = getUser(userId)
.thenCompose(user -> fillRes(user));

真是妙啊,真是我们想要的效果。由此可见,thenCompose的作用是组合两个独立的CompletableFuture。

thenCombine

上面提到还有一个thenCombine,也是组合计算结果的,下面是一个例子:

1
2
3
4
5
6
7
8
9
10
CompletableFuture<String> walker = CompletableFuture.supplyAsync(() -> {
return ", Walker";
});

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
}).thenApply(s -> s + " World")
.thenCombine(walker, (s1, s2) -> {
return s1 + s2;
});

首先我们异步拼接字符串 Hello World,然后我们想将另一个独立的异步任务计算结果与刚才的异步任务结果整合一下,这一点和thenCompose是不一样,thenCompose是拿到上一阶段的计算结果,thenCombine直接传入一个异步任务,第二个参数是BiFunction,函数描述符为(T, U) -> R,即将两个异步任务的结果整合。

整合多个任务结果

上述thenCompose \ thenCombine是针对两个异步任务情况而言的,试想下有没有这样的场景,并发地执行若干个任务,当所有任务完执行之后,整合其计算结果;执行若干个任务,只返回执行最快的,其他未执行完的,全部终止。这两种场景我们平时开发遇到的比较多,CompletableFuture也为我们提供了allOf / anyOf来实现上述功能,我们可以来试下,下面是两者的函数声明:

1
2
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf

官方对该方法的介绍如下:

Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.

从描述看出,当给定的所有CompletableFuture正常完成,当前阶段的计算任务就完成了。但看这个方法的返回值为CompletableFuture<Void>,居然什么都没有,那么该如何整合所有的计算结果呢?

现在我们有很多网页的URL,现在希望并发地将所有的URL对应页面全部通过爬虫下载下来,这个时候我们就可以用allOf来实现了,downloadWebPage是将给定的URL页面下载下来,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private CompletableFuture<String> downloadWebPage(String webPageLink) {
return CompletableFuture.supplyAsync(() -> {
Random random = new Random();
int i = random.nextInt(10) + 1;
System.out.println(webPageLink + " - " + i);
try {
Thread.sleep(i * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(webPageLink + " - done." );
return webPageLink + " - http://www.baidu.com";
});
}

由于不同的页面下载时间不一致,这里我们随机数来让线程进行等待。接着我们就要利用allOf来进行并发下载了,为了能更清晰地了解处理流程,加了些输出信息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
long startTime = System.currentTimeMillis();
List<String> webPageLinks = Arrays.asList("1", "2", "3", "4", "5");

// ①
List<CompletableFuture<String>> futures = webPageLinks.stream()
.map(this::downloadWebPage).collect(Collectors.toList());

System.out.println("下载中1");

// ②
// 注意这里返回泛型的是空
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

System.out.println("下载中2");

// ③
CompletableFuture<List<String>> allFuture = allOf.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});

System.out.println("下载中3");

// ④
List<String> strings = allFuture.join();

long endTime = System.currentTimeMillis();
System.out.println("总耗时长:" + (endTime - startTime) / 1000);

strings.forEach(System.out::println);

上面代码主要分四步:

  • ①:将URL列表转化为CompletableFuture列表,注意此时并没有执行下载操作;
  • ②:将CompletableFuture列表转为数组,然后交给CompletableFuture.allOf执行;
  • ③:由于第二步得到的是空的CompletableFuture,无法直接拿到这么多计算任务的结果,所以我们只能自己去拿,通过调用CompletableFuture的join方法,拿到所有的计算结果,注意这一步不会组塞,因为allOf会确保所有的计算任务完成;
  • ④:等待所有的任务执行完,并返回整合的结果。

为了印证上面的步骤无误,打印结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
下载中1
下载中2
下载中3
3 - 8
1 - 2
2 - 9
1 - done.
4 - 3
4 - done.
5 - 1
5 - done.
3 - done.
2 - done.
总耗时长:9
1 - http://www.baidu.com
2 - http://www.baidu.com
3 - http://www.baidu.com
4 - http://www.baidu.com
5 - http://www.baidu.com

从上面的打印结果来看,我们可以总结如下几点:

  1. 整个计算任务的耗时与任务列表中所耗最大时常有关,比如总耗时为9,其中第二个任务耗时为9,也是任务列表中耗时最大的;
  2. 整个计算任务为异步计算,当程序运行到第四步时,当前主线程会等待所有计算任务执行完成;
  3. allOf可以继续执行下一阶段计算任务,拿到allOf计算结果,而且不会阻塞这个地方。

anyOf

下面是anyOf的官方介绍:

Returns a new CompletableFuture that is completed when any of the given CompletableFutures complete, with the same result.

就是只要给定的计算任务只要有一个完成了,整个计算任务就会完成,并且返回那个完成任务的计算结果。下面还是用上面的例子,只不过这次改用anyOf,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
long startTime = System.currentTimeMillis();
List<String> webPageLinks = Arrays.asList("1", "2", "3", "4", "5");

List<CompletableFuture<String>> futures = webPageLinks.stream()
.map(this::downloadWebPage).collect(Collectors.toList());

// 注意这里返回结果
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()]));

Object result = anyOf.join();

System.out.println(result);
long endTime = System.currentTimeMillis();
System.out.println("总耗时长:" + (endTime - startTime) / 1000);

由于有返回值,我们直接用join来获取计算结果,下面是打印结果:

1
2
3
4
5
6
7
1 - 1
2 - 10
3 - 8
1 - done.
1 - http://www.baidu.com
4 - 9
总耗时长:1

从打印结果来看,第一个计算任务耗时为1秒,然后直接返回任务1的执行结果,其他的任务全部取消。

异常处理

上面的这些方法都还没有提到异常,如果在执行计算任务出异常了该如何处理呢?直接在最外面catch住异常似乎不太好,分析CompletionStage接口的时候提到了三个方法,CompletableFuture同样也实现了这三个方法:

  • handle(BiFunction<? super T, Throwable, ? extends U> fn) 两个参数,一个代表上一阶段计算结果,一个代表上一阶段异常,支持返回值
  • whenComplete(BiConsumer<? super T, ? super Throwable> action) 两个参数,一个代表上一阶段计算结果,一个代表上一阶段异常,没有返回值,类似finally
  • exceptionally(Function fn) 类似于 try{}catch{} 中的 catch{},支持返回结果

exceptionally

如果上一阶段产生了异常,下一阶段想捕获这个异常,自己进行处理,而不至使整个计算任务失败,这个时候就可以使用exceptionally,该方法声明如下:

1
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

由于参数是Function,所以该方法既有消费又有产出,下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int age = -1;

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
if (age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if (age > 18) {
return "Adult";
} else {
return "Child";
}
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return "Unknown!";
}).thenAccept((r) -> {
System.out.println("result = " + r);
System.out.println("Done.");
});

future.join();

执行结果为:

1
2
3
java.lang.IllegalArgumentException: Age can not be negative
result = Unknown!
Done.

可见该方法的参数就是异常信息,这样我们就可以知道上一阶段的错误信息了,我们发现最后一步的thenAccept也执行了,说明异常被catch住。

whenComplete

当上一阶段完成时,有可能是正常完成,也有可能抛出了异常,我想在下一阶段不处理这个异常,仅仅是想对上一阶段运行的结果记录一下,可能在下下阶段用exceptionally再去处理,这个时候就可以用whenComplete了,该方法的声明如下:

1
CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)

注意参数是BiConsumer<? super T, ? super Throwable>,说明函数式表达式第一个参数为上一阶段的计算结果,第二个参数是上一阶段的异常信息,这两个参数不可能同时都有值。

例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int age = -1;

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
if (age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if (age > 18) {
return "Adult";
} else {
return "Child";
}
}).whenComplete((res, ex) -> {
if (ex != null) {
System.out.println("Oops! We have an exception - " + ex);
}

System.out.println(res);
}).thenRun(() -> {
System.out.println("Done.");
});

future.join();

上述代码在执行完whenComplete后,就会因为异常而退出,不会再继续进行执行了,如果想继续执行,可以继续使用exceptionally,如下:

1
2
3
4
5
6
7
8
9
10
11
// ...
.whenComplete((res, ex) -> {
if (ex == null) {
System.out.println(res);
} else {
throw new RuntimeException(ex);
}
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
});

handle

上面的whenComplete是不对结果进行处理,仅做记录。如果我们想直接处理上一阶段的结果或者异常,该怎么办呢?这时就用到了handle方法,该方法的声明如下:

1
CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

可以看到参数是BiFunction,说明既有消费,又有产出,函数式表达式第一个参数为上一阶段的计算结果,第二个参数是上一阶段的异常信息。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int age = -1;

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
if (age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if (age > 18) {
return "Adult";
} else {
return "Child";
}
}).handle((res, ex) -> {
if(ex != null) {
System.out.println("Oops! We have an exception - " + ex.getMessage());
return "Unknown!";
}
return res;
}).thenAccept((r) -> {
System.out.println(r);
System.out.println("Done.");
});

future.join();

运行结果为:

1
2
3
Oops! We have an exception - java.lang.IllegalArgumentException: Age can not be negative
Unknown!
Done.

References: