CompletableFuture

发布时间 2023-12-12 18:19:22作者: lihewei

常用API

创建异步任务:

  • supplyAsync() 创建带返回值的的异步任务
  • runAsync() 创建不带返回值的异步任务

获取任务结果:

  • get() 如果完成则返回结果,否则就抛出具体的异常
  • get(long timeout, TimeUnit unit) 最大时间等待返回结果,否则就抛出具体异常
  • join() 完成时返回结果值,否则抛出 unchecked 异常
  • getNow(T valueIfAbsent) 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent
  • completeExceptionally(Throwable ex) 如果任务没有完成,就抛出给定异常

异步任务回调方法:

  • thenApply() 、thenApplyAsync() 异步任务执行完毕后执行的回调方法(有入参、有返回值)
  • thenAccep()、thenAcceptAsync() 异步任务执行完毕后执行的回调方法(有入参、无返回值)
  • thenRun()、thenRunAsync() 异步任务执行完毕后执行的回调方法(无入参、无返回值)
  • whenComplete()、whenCompleteAsync() 异步任务回调方法,异常可传递(有入参、无返回值)
  • handle()、handleAsync() 异步任务回调方法,异常可传递(有入参、有返回值)

多任务组合处理:

  • thenCombine()、thenAcceptBoth()、runAfterBoth() A、B执行完,才能执行C
  • applyToEither()、acceptEither()、runAfterEither() A、B谁先执行完,谁去执行C
  • allOf()、anyOf:等待配置的N个异步任务执行完毕后执行某个任务

创建异步任务

  • supplyAsync()
  • runAsync()
// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  
// 不带返回值的异步请求,默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)

// 不带返回值的异步请求,可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

supplyAsync() 代码演示:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            return "result";
        });

        //等待任务执行完成
        System.out.println("结果->" + cf.get());
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 自定义线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            return "result";
        }, executorService);

        //等待子任务执行完成
        System.out.println("结果->" + cf.get());
}

runAsync() 代码演示:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            System.out.println("do something....");
        });

        //等待任务执行完成
        System.out.println("结果->" + cf.get()); //结果 null
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 自定义线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            System.out.println("do something....");
        }, executorService);

        //等待任务执行完成
        System.out.println("结果->" + cf.get()); //结果 null
}

获取任务结果

// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException 

// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()

// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)

// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)

// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex) 

异步回调处理【回调方法】

有入参,有返回值

  • thenApply()

  • thenAcceptAsync()

    ​某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。使用thenApply方法时子任务与父任务使用的是同一个线程,thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            result += 2;
            return result;
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.thenApply((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            result += 2;
            return result;
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

测试结果:

Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-25,5,main] cf2 do something....
cf1结果->1
cf2结果->3

有入参,无返回值

  • thenAccep()

  • thenAcceptAsync()

    某个任务执行完成后执行的动作(即回调方法),将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });

        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get()); //null
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenAcceptAsync((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });

        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get()); //null
}

无入参,无返回值

  • thenRun()
  • thenRunAsync()

​ thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });

        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });

        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

异常可传递【有参、无返回】

  • whenComplete() 回调方法正常执行,返回异步任务执行结果(cf1的结果)
  • whenCompleteAsync()

​ whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture(cf1)的 result 和该任务一致。如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            // int a = 1/0;
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
        });

        //等待任务2执行完成,返回cf1的执行结果
        System.out.println("cf2结果->" + cf2.get());
}

测试结果

Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[main,5,main] cf2 do something....
上个任务结果:1
上个任务抛出异常:null
cf2结果->1

异常可传递【有参、有返回】

  • handle()
  • handleAsync()

务执行完成后执行的回调方法,正常执行则异常为null,get方法返回执行结果,如果是执行异常,则get方法抛出异常。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            // int a = 1/0;
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
            return result+2;
        });

        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

测试结果

Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-25,5,main] cf2 do something....
上个任务结果:1
上个任务抛出异常:null
cf2结果->3

多任务组合处理

A、B任务均执行完才能执行C

这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

  • thenCombine() 会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值
  • thenAcceptBoth() 将两个任务的执行结果作为方法入参,但是无返回值
  • runAfterBoth() 没有入参,也没有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            return 2;
        });

        CompletableFuture<Integer> cf3 = cf1.thenCombine(cf2, (a, b) -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
            return a + b;
        });

        System.out.println("cf3结果->" + cf3.get());//3
}

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            return 2;
        });
        
        CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
            System.out.println(a + b);
        });

        System.out.println("cf3结果->" + cf3.get());
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            return 2;
        });

        CompletableFuture<Void> cf3 = cf1.runAfterBoth(cf2, () -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
        });

        System.out.println("cf3结果->" + cf3.get());
}

测试结果

Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
Thread[main,5,main] cf3 do something....
cf3结果->3


Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
Thread[main,5,main] cf3 do something....
3
cf3结果->null


Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
Thread[main,5,main] cf3 do something....
cf3结果->null

A、B任意一个执行完立马执行C

这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。

  • applyToEither() 任务的执行结果作为所提供函数的参数,且该方法有返回值
  • acceptEither() 任务的执行结果作为方法入参,但是无返回值
  • runAfterEither() 没有入参,也没有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf1 任务完成";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf2 任务完成";
        });

        CompletableFuture<String> cf3 = cf1.applyToEither(cf2, (result) -> {
            System.out.println("接收到" + result);
            System.out.println(Thread.currentThread() + " cf3 do something....");
            return "cf3 任务完成";
        });

        System.out.println("cf3结果->" + cf3.get());
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf1 任务完成";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf2 任务完成";
        });

        CompletableFuture<Void> cf3 = cf1.acceptEither(cf2, (result) -> {
            System.out.println("接收到" + result);
            System.out.println(Thread.currentThread() + " cf3 do something....");
        });

        System.out.println("cf3结果->" + cf3.get());
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });

        CompletableFuture<Void> cf3 = cf1.runAfterEither(cf2, () -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
            System.out.println("cf3 任务完成");
        });

        System.out.println("cf3结果->" + cf3.get());
}

测试结果

Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
接收到cf1 任务完成
Thread[ForkJoinPool.commonPool-worker-25,5,main] cf3 do something....
cf3结果->cf3 任务完成


Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
cf1 任务完成
接收到cf1 任务完成
Thread[ForkJoinPool.commonPool-worker-25,5,main] cf3 do something....
cf3结果->cf3 任务完成


Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
cf1 任务完成
Thread[ForkJoinPool.commonPool-worker-25,5,main] cf3 do something....
cf3 任务完成
cf3结果->null

N个任务执行完毕执行某个任务

  • allOf():等待配置的N个异步任务执行完毕后执行某个任务,无异常的情况下,返回值=null。
  • anyOf:等待配置的N个异步任务执行完毕后执行某个任务,无异常的情况下,返回值是最先执行完毕的线程任务。
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                //int a = 1/0;
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });

        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf3 任务完成");
            return "cf3 任务完成";
        });

        CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2, cf3);
        System.out.println("cfAll结果->" + cfAll.get());
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });

        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf3 任务完成");
            return "cf3 任务完成";
        });

        CompletableFuture<Object> cfAll = CompletableFuture.anyOf(cf1, cf2, cf3);
        System.out.println("cfAll结果->" + cfAll.get());
}

测试结果

Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
Thread[ForkJoinPool.commonPool-worker-11,5,main] cf2 do something....
cf1 任务完成
cf3 任务完成
cf2 任务完成
cfAll结果->null
	
	
Thread[ForkJoinPool.commonPool-worker-25,5,main] cf1 do something....
Thread[ForkJoinPool.commonPool-worker-18,5,main] cf2 do something....
Thread[ForkJoinPool.commonPool-worker-11,5,main] cf2 do something....
cf1 任务完成
cfAll结果->cf1 任务完成