CompletableFuture异步编程

发布时间 2023-12-28 17:51:01作者: selicoco

一、基本介绍

1.1 多线程编程的发展过程

  1. 创建线程的方式
  • 继承 Thread 类
  • 实现 Runnable 接口

特点:没有参数,没有返回值,没办法抛出异常

  1. JDK 1.5 进阶版Callable + Future

Callable接口中定义的 V call() throws Exception,该方法可以返回泛型值 V,并能够抛出异常。 Callable 只能在 ExecutorService 中使用, 直接通过executorService.submit(new CallAbleTask(i)),返回的结果是Future,结果信息从Future里面取出,具体的业务逻辑在call中执行。

特点:有返回值在Future中,能抛出异常,只能通过阻塞或者轮询的方式获取结果,不支持设置回调方法

  1. JDK1.8 高级版CompletableFuture

CompletableFuture是实现异步化的工具类,上手难度较低,且功能强大。

在之前我们一般通过Future实现异步。若要设置回调一般会使用guava的ListenableFuture,回调的引入又会导致回调地狱。

 

 

CompletableFuture实现了两个接口:Future、CompletionStage。Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenApply、thenCompose等函数式编程方法来组合编排这些步骤。

因此completableFuture能实现整个异步调用接口的扁平化和流式处理,支持通过函数式编程的方式对各类操作进行组合编排,解决原有Future处理一系列链式异步请求时的复杂编码。

特点:不需要手工分配线程,JDK 自动分配;代码语义清晰,异步任务链式调用;支持编排异步任务

二、CompletableFuture常用方法

 

CompletableFuture提供了四个静态方法来创建一个异步操作:

 

 

这四个方法的区别:

1)runAsync() 以Runnable函数式接口类型为参数,没有返回结果,supplyAsync() 以Supplier函数式接口类型为参数,返回结果类型为U;Supplier接口的 get()是有返回值的(会阻塞);

2)使用没有指定Executor的方法时,内部使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

3)默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,这里建议强制传线程池,且根据实际不同的业务类型做线程池隔离。

 

常用的组合依赖关系

  1. 依赖关系

thenApply():把前面任务的执行结果,交给后面的Function

thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

  1. and集合关系

thenCombine():合并任务,有返回值

thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值

runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)

  1. or聚合关系

applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值

acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值

runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)

  1. 并行执行

allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture

anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture

  1. 结果处理

whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作

exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成

 

 

API过多,整体可以进行如下分类:

  • 带run的方法,无入参,无返回值;
  • 带supply的方法,无入参,有返回值;
  • 带accept的方法,有入参,无返回值;
  • 带apply的方法,有入参,有返回值;
  • 带handle的方法,有入参,有返回值,并且带异常处理;
  • 以Async结尾的方法,都是异步的,否则是同步的; ​方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
  • 以Either结尾的方法,只需完成任意一个;
  • 以Both/Combine结尾的方法,必须所有都完成。 ​
  • join 阻塞等待,不会抛异常;
  • get 阻塞等待,会抛异常;
  • complete(T value) 不阻塞,如果任务已完成,返回处理结果。如果没完成,则返回传参value;
  • completeExceptionally(Throwable ex) 不阻塞,如果任务已完成,返回处理结果。如果没完成,抛异常。

 

三、CompletableFuture的使用场景

1、异步组合任务一 + 任务二都完成,再执行任务三,并返回任务三的结果

 1 ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
 2 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
 3     System.out.println("任务一开始");
 4     return 1;
 5 },executorService);
 6 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
 7     System.out.println("任务二开始");
 8     return 2;
 9 });
10 //thenCombine明确任务3要等待任务1和任务2都完成后才能开始;
11 CompletableFuture<Integer> future3 = future1.thenCombine(future2,(res1,res2)->{
12     System.out.println("感知任务完成,合并两个任务,返回一个新的结果");
13     return res1+","+res2; 
14 },executorService);

 

2、异步组合任务一 、任务二、任务三全部完成,再执行任务四

这种多元依赖可以通过allOfanyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf,如下代码:

ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务三开始");
    return 3;
});
CompletableFuture<Void> future4 = CompletableFuture.allOf(future1, future2, future3);
CompletableFuture<String> result = future4.thenApply(v -> {
  //这里的join并不会阻塞,因为传给thenApply的函数是在future1、future2、future3全部完成时,才会执行 。
  result1 = future1.join();
  result2 = future2.join();
  result3 = future3.join();
  //根据result1、result2、result3组装最终result;
  return "result";
});

 

四、案例分析

以下代码哪些地方使用不当?哪些地方可以使用其他方法来实现?

 1 public void syncBiddingByProjects(List<Long> projectIds) {
 2         if (CollectionUtils.isEmpty(projectIds)) {
 3             return;
 4         }
 5         //多线程并发处理
 6         BiddingServiceImpl biddingService = (BiddingServiceImpl) AopContext.currentProxy();
 7         AtomicInteger finishTask = new AtomicInteger(0);
 8         CompletableFuture[] futures = projectIds.stream().map(projectId -> CompletableFuture
 9                 .supplyAsync(() -> syncBiddingByProjectId(biddingService, projectId), biddingSyncPool)
10                 .whenComplete((result, exc) -> {
11                     if (null != result && result) {
12                         finishTask.incrementAndGet();
13                     }
14                 })).toArray(CompletableFuture[]::new);
15 
16         //开始等待所有任务执行完成
17         CompletableFuture<Void> headerFuture = CompletableFuture.allOf(futures);
18         try {
19             headerFuture.join();
20         } catch (Exception ex) {
21             log.error("syncBiddingByProjects fail, allCnt={}, sucCnt={}", projectIds.size(), finishTask.get(), ex);
22             throw BusinessException.create("拉取数据的任务执行失败,allCnt=" +  projectIds.size() +", sucCnt=" + finishTask.get());
23         }
24     }