CompletableFuture异步多线程

发布时间 2023-08-10 21:37:11作者: TestRookie
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        //调用用户服务获取用户基本信息
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
            try {
                //模拟查询商品耗时500毫秒
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "用户A";
        });

        //调用商品服务获取商品基本信息
        CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
                //模拟查询商品耗时500毫秒
        {
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });

        System.out.println("获取用户信息:" + userFuture.get());
        System.out.println("获取商品信息:" + goodsFuture.get());

        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }

运行结果

 

CompletableFuture创建方式

1、常用的4种创建方式

CompletableFuture源码中有四个静态方法用来执行异步任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}  
public static CompletableFuture<Void> runAsync(Runnable runnable){..}  
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}  
  • 「supplyAsync」执行任务,支持返回值。

  • 「runAsync」执行任务,没有返回值。

「supplyAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)  
//自定义线程,根据supplier构建执行任务  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 

「runAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable)   
//自定义线程,根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)  

2、结果获取的4种方式

对于结果的获取CompltableFuture类提供了四种方式

//方式一  
public T get()  
//方式二  
public T get(long timeout, TimeUnit unit)  
//方式三  
public T getNow(T valueIfAbsent)  
//方式四  
public T join()  

说明:

  • 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常

  • 「getNow」 => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值

  • 「join」 => 方法里不会抛出异常

public void testCompletableGet() throws InterruptedException, ExecutionException {  

    CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        return "商品A";  
    });  

    // getNow方法测试,如果未计算完,则返回设定的默认值valueIfAbsent
    System.out.println(cp1.getNow("商品B"));  

    //join方法测试   
    CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));  //不会抛出异常
    System.out.println(cp2.join());  //抛出异常
   System.out.println("-----------------------------------------------------");  
    //get方法测试  
    CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));  
    System.out.println(cp3.get());  
}  

「运行结果」

  • 第一个执行结果为 「商品B」,因为要先睡上1秒结果不能立即获取

  • join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException

  • get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException

 

 

异步回调方法

 示例

public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {  
    long startTime = System.currentTimeMillis();  
      
    CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {  
        try {  
            //执行任务A  
            Thread.sleep(600);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  

    });  

    CompletableFuture<Void> cp2 =  cp1.thenRun(() -> {  
        try {  
            //执行任务B  
            Thread.sleep(400);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    });  

    // get方法测试  
    System.out.println(cp2.get());  

    //模拟主程序耗时时间  
    Thread.sleep(600);  
    System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");  
}  
  
//运行结果    
/**  
 *  null  
 *  总共用时1610ms  
 */  

「thenRun 和thenRunAsync有什么区别呢?」

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

说明: 后面介绍的thenAcceptthenAcceptAsyncthenApplythenApplyAsync等,它们之间的区别也是这个。

2、thenAccept/thenAcceptAsync

 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
public void testCompletableThenAccept() throws ExecutionException, InterruptedException {  
    long startTime = System.currentTimeMillis();  
    CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {  
        return "dev";  

    });  
    CompletableFuture<Void> cp2 =  cp1.thenAccept((a) -> {  
        System.out.println("上一个任务的返回结果为: " + a);  //dev
    });  
   
    cp2.get();  //返回null
}  

3、 thenApply/thenApplyAsync

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

示例

public void testCompletableThenApply() throws ExecutionException, InterruptedException {  
    CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {  
        return "dev";  

    }).thenApply((a) -> {  
        if(Objects.equals(a,"dev")){  
            return "dev";  
        }  
        return "prod";  
    });  

    System.out.println("当前环境为:" + cp1.get());  

    //输出: 当前环境为:dev  
}  

异常回调

CompletableFuture的任务不论是正常完成还是出现异常它都会调用 「whenComplete」这回调函数。

  • 「正常完成」:whenComplete返回结果和上级任务一致,异常为null;

  • 「出现异常」:whenComplete返回结果为null,异常为上级任务的异常;

即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。

1、只用whenComplete

public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {  
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {  

        if (Math.random() < 0.5) {  
            throw new RuntimeException("出错了");  
        }  
        System.out.println("正常结束");  
        return 0.11;  

    }).whenComplete((aDouble, throwable) -> {  
        if (aDouble == null) {  
            System.out.println("whenComplete aDouble is null");  
        } else {  
            System.out.println("whenComplete aDouble is " + aDouble);  
        }  
        if (throwable == null) {  
            System.out.println("whenComplete throwable is null");  
        } else {  
            System.out.println("whenComplete throwable is " + throwable.getMessage());  
        }  
    });  
    System.out.println("最终返回的结果 = " + future.get());  
}  

正常完成,没有异常时:

正常结束  
whenComplete aDouble is 0.11  
whenComplete throwable is null  
最终返回的结果 = 0.11  

 

出现异常时:get()会抛出异常 

whenComplete aDouble is null  
whenComplete throwable is java.lang.RuntimeException: 出错了  
  
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了  
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)  
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)  

 

2、whenComplete + exceptionally示例

public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {  
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {  
        if (Math.random() < 0.5) {  
            throw new RuntimeException("出错了");  
        }  
        System.out.println("正常结束");  
        return 0.11;  

    }).whenComplete((aDouble, throwable) -> {  
        if (aDouble == null) {  
            System.out.println("whenComplete aDouble is null");  
        } else {  
            System.out.println("whenComplete aDouble is " + aDouble);  
        }  
        if (throwable == null) {  
            System.out.println("whenComplete throwable is null");  
        } else {  
            System.out.println("whenComplete throwable is " + throwable.getMessage());  
        }  
    }).exceptionally((throwable) -> {  
        System.out.println("exceptionally中异常:" + throwable.getMessage());  
        return 0.0;  
    });  

    System.out.println("最终返回的结果 = " + future.get());  
}  

当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。

 
whenComplete aDouble is null  
whenComplete throwable is java.lang.RuntimeException: 出错了  
exceptionally中异常:java.lang.RuntimeException: 出错了  
最终返回的结果 = 0.0