1、CompletableFuture

发布时间 2023-06-26 15:35:34作者: 奶油炒白菜

CompletableFuture

Future为什么会出现

Future接口可以为主线程开一个分支任务,专门为主线程处理耗时耗力的复杂任务
Future提供一个异步并行计算的功能
异步多线程执行且有返回结果
三个特点
    多线程/有返回/异步任务

Future常用实现类FutureTask

使用缺点

​ get方法获取返回值,容易阻塞
​ isDone方法轮询获取返回值,消耗cpu资源

CompleteFuture 出现

对于真正的异步处理,我们希望的是通过传入回调函数,在Future结束时自动调用该回调函数,这样我们就不用等待结果
CompleteFuture 提供一种类似观察者模式机制,可以让任务完成后通知监听一方

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

CompletableFuture 提供了Future 的扩展功能,可以简化异步编程的复杂性,并且提供了函数式编程能力,可以通过回调处理计算结果,也提供了转换和组合的 CompletableFuture 方法 他可能代表一个明确的future 也有可能代表一个挖成阶段,支持在完成后触发一些函数或者执行某些动作

CompletionStage 分阶段任务,代表异步过程中的某个阶段一个阶段完成后可能触发另一个阶段,类似Linux的管道符 一个阶段的执行可能是一个 Function Consumer Runnable 一个阶段的执行可能是一个阶段触发的,也可能时多个阶段一起触发

核心的四个方法获得一个异步任务

不适用 new CompletableFuture() --》创建一个不完整的CompletableFuture(不推荐使用)

    // 核心方法
    // 有返回值-默认线程池
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    
    // 有返回值-自定义线程池
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    
    // 无返回值-默认线程池
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    // 无返回值-自定义线程池
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

无返回值

    // 创建  CompletableFuture 对象 无返回值--默认线程池
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
        System.out.println(voidCompletableFuture.get());
    result
        ForkJoinPool.commonPool-worker-9
        null    
    
    
    // 创建  CompletableFuture 对象 

无返回值--自定义线程池

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }, executorService);
    System.out.println(voidCompletableFuture.get());
}
result
    pool-1-thread-1
    null

有返回值

//  创建  CompletableFuture 对象 有返回值--默认线程池
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return Thread.currentThread().getName();
});
System.out.println(stringCompletableFuture.get());
result
    ForkJoinPool.commonPool-worker-9
    ForkJoinPool.commonPool-worker-9
    
//  创建  CompletableFuture 对象 有返回值--默认线程池
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return Thread.currentThread().getName();
}, Executors.newFixedThreadPool(3));
System.out.println(stringCompletableFuture.get());
result
    pool-1-thread-1

    pool-1-thread-1 

Executor executor 参数说明
没有指定使用默认 ForkJoinPool.commonPool()
自定义则使用自定义的线程池

实战

1、无异常将结果传给完成的方法

CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    int i = ThreadLocalRandom.current().nextInt(10);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return i;
}).whenComplete((r, e) -> {
    if (e== null) {
        System.out.println("处理完成--------------无异常");
    }
}).exceptionally(e ->  {
    e.printStackTrace();
    System.out.println("出现异常了------------------------");
    return null;
});
System.out.println("主线程工作----------------------------");、

结果
    ForkJoinPool.commonPool-worker-9
    主线程工作----------------------------

问题:whenComplete 没有执行
原因:
主线程执行结束了,异步线程也自动结束

解决

1、// 主线程的等待 三秒

     // 主线程的等待 三秒
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    
    结果:
         ForkJoinPool.commonPool-worker-9
        主线程工作----------------------------
        处理完成--------------无异常

2、使用自定义线程池

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName());
        int i = ThreadLocalRandom.current().nextInt(10);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return i;
    }, executorService).whenComplete((r, e) -> {
        if (e== null) {
            System.out.println("处理完成--------------无异常");
        }
    }).exceptionally(e ->  {
        e.printStackTrace();
        System.out.println("出现异常了------------------------");
        return null;
    });
    System.out.println("主线程工作----------------------------");
    executorService.shutdown();
    
    结果
        pool-1-thread-1
        主线程工作----------------------------
        处理完成--------------无异常

出现异常捕获相应异常

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            int i = ThreadLocalRandom.current().nextInt(10);
            int j = 10/0;
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return i;
        }, executorService).whenComplete((r, e) -> {
            if (e== null) {
                System.out.println("处理完成--------------无异常");
            }
        }).exceptionally(e ->  {
            e.printStackTrace();
            System.out.println("出现异常了------------------------");
            return null;
        });
        System.out.println("主线程工作----------------------------");
        executorService.shutdown();
    }
    
    结果
        pool-1-thread-1
    主线程工作----------------------------
    java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ArithmeticException: / by zero
        at CompletableFutureUse.lambda$main$0(CompletableFutureUse.java:40)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
        ... 3 more
    出现异常了------------------------

优点
异步结束时会自动调用某个对象的方法
主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
异步任务出错时,会自动回调某个方法

电商网站需求

函数式编程,lambda表达式 流式计算
需求
同时搜索出同一款产品在不同平台的售价
同时搜索出同款产品在同一个平台的售价
返回
List<String>
解决
一条线程查

public static List<String> getPrise(List<NetMall> netMalls, String productName) {
    return netMalls.
            stream()
            .map(netMall ->
                    String.format(productName + "in %s prise is %.2f" , netMall.getName()
                            , netMall.calcPrise(productName)))
            .collect(Collectors.toList());
}

多线程并行查

public static List<String> getPriseCompletableFuture(List<NetMall> netMalls, String productName) {
    return netMalls.
            stream()
            .map(netMall ->
                    CompletableFuture.supplyAsync(() ->
                            String.format(productName + "in %s prise is %.2f"                                    , netMall.getName()
                                    , netMall.calcPrise(productName)))).collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());

}

CompletableFuture常见接口

获得结果的触发计算

// 1、Waits if necessary for this future to complete, and then returns its result.
public T get() throws InterruptedException, ExecutionException {}

// 2、Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.
//Params:
timeout – the maximum time to wait unit – the time unit of the timeout argument
// timeout 最大等待时间,timeout结束后自动返回无需等待
public T get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {}
    
// 3、与get功能一样,区别在于编译时是否报出检查时异常
// get 编写时必须要抛出异常
// join 编写时不用显式抛出异常
public T join() {}

// 4、1、计算完成返回计算结果 2、没有计算完成返回 valueIfAbsent
public T getNow(T valueIfAbsent) {}

对计算结果进行处理

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {}
// 1、计算结果存在依赖关系,两个线程串行化。
// 由于存在穿行关系,当前处理出现异常,抛出异常,后续操作不执行
public <U> CompletableFuture<U> thenApply(}
// 正常运行
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> future1 = CompletableFuture
        .supplyAsync(() -> 1, executorService)
        .thenApply(f -> {
            System.out.println("____________step1");
            return f + 1;
        }).thenApply(f -> {
            System.out.println("____________step2");
            return f + 1;
        }).exceptionally(e -> {
            System.out.println("_____________exception");
            return 5;
        });
System.out.println(future1.join());
结果:
    ____________step1
    ____________step2
    3
// --------------------------------------

// 出现异常运行
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> future1 = CompletableFuture
        .supplyAsync(() -> 1, executorService)
        .thenApply(f -> {
            int i = 10/0;
            System.out.println("____________step1");
            return f + 1;
        }).thenApply(f -> {
            System.out.println("____________step2");
            return f + 1;
        }).exceptionally(e -> {
            System.out.println("_____________exception");
            return 5;
        });
System.out.println(future1.join());
结果:
    _____________exception
    5    
    
                                                                                                                                                   

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {}
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> future1 = CompletableFuture
        .supplyAsync(() -> 1, executorService)
        .handle((item, exp) -> {
            int i = 10 / 0;
            System.out.println("____________step1");
            return item + 1;
        }).handle((item, exp) -> {
            System.out.println("____________step2");
            return item + 1;
        }).handle((item, exp) -> {
            System.out.println("____________step3");
            return item + 1;
        }).thenApply(f -> {
            System.out.println("____________step4");
            return f + 1;
        }).exceptionally(e -> {
            System.out.println("_____________exception");
            return 5;
        });
System.out.println(future1.join());
结果:
    ____________step2
    ____________step3
    _____________exception
    5

对计算结果进行消费

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 接受任务的处理结果,无返回结果
// 执行顺序说明:  
// 1、public CompletableFuture<Void> thenRun(Runnable action): 任务a执行完成执行任务b,并且b不需要a的结果,无返回值
// 2、public CompletableFuture<Void> thenAccept(Consumer<? super T> action):  任务a执行完成执行任务b,并且b需要a的结果,无返回值
// 3、public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) : 任务a执行完成执行任务b,并且b需要a的结果,有返回值

System.out.println("thenRun:" + CompletableFuture.supplyAsync(() -> 1).thenRun(System.out::println).join());
System.out.println("------------------");
System.out.println("thenAccept:" + CompletableFuture.supplyAsync(() -> 1).thenAccept(System.out::println).join());
System.out.println("------------------");
System.out.println("thenApply:" + CompletableFuture.supplyAsync(() -> 1).thenApply(i -> i + 1).join());
结果;
    thenRun:null
    ------------------
    1
    thenAccept:null
    ------------------
    thenApply:2

线程池运行选择

​ 1、没有传入自定义线程,使用默认线程池

​ 2、传入自定义线程池
​ 调用 thenRun 方法执行第二个任务 时,则第二个任务和第一个任务公用一个线程池
​ 调用 thenRunAsync 执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个使用默认线程池
​ 3、有可能处理太快系统优化切换,直接使用main 主线程

对计算速度进行选用

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) 
// 谁快用谁的结果
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "a";
});
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "b";
});

System.out.println(a.applyToEither(b, result -> result).join());

结果:
    a

对计算结果进行合并

​ 两个 CompletionStage 任务都完成后最终能把两个任务的计算结果一起交给thenCombine进行处理先完成的任务先等着,等待其他分支任务

CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return 10;
});
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return 20;
});

/*  第一个参数 x 是 调用对象本身的返回值  第二个参数 y 是 调用被对象本身的返回值 */System.out.println(a.thenCombine(b, (x, y) -> {
    System.out.println(x);
    System.out.println(y);
    return x + y;
}).join());

结果:
    10
    20
    30