CompletableFuture 异步编排详解

CompletableFuture 异步编排详解

详情请查看:JulyWhj博客
业务场景:

查询商品详情页面逻辑比较复杂,有些数据需要远程调用,必然需要花费更多的时间。

假如商品详情每个页面查询,需要的如下的标准时间完成,那么用户需要10s才能完成。这里我们需采用异步查询,但是比如接口A查询商品信息,而接口B需要查询商品的SKU,接口C需要查询商品供应商等信息,如接口C必须依赖接口A或接口B的返回值。那么我们就需要使用CompletableFuture接口来实现。

一、开启异步编程

runAsync:无入参、无返回值

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) {System.out.println("main start ...");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println("开启异步任务...");}, service);System.out.println("main end ...");}
}

执行结果:

main start ...main end ...开启异步任务...

supplyAsync :无入参,可以获取返回值

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);@SneakyThrowspublic static void main(String[] args) {System.out.println("main start ...");CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");return "开启异步任务,我是返回值";}, service);System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务...
获取异步任务返回值:开启异步任务,我是返回值
main end ...

二、计算完成回调

当我们想第一个异步任务执行完成后,还需要做其他的事情。我们的CompletableFuture提供了计算完成时回调方法,whenCompletewhenCompleteAsyncexceptionally等接口。

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

whenComplete 可以处理正常和异常的计算结果,exceptionally: 处理异常情况。

whenCompletewhenCompleteAsync 的区别是whenComplete 是执行当前任务的线程继续执行whenComplete的任务。

whenCompleteAsync: 是把whenCompleteAsync的任务继续提交给线程池来进行执行。

whenCompleteAsync

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);@SneakyThrowspublic static void main(String[] args) {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).whenCompleteAsync((res, exc) -> {System.out.println("异步任务完成了,执行结果是:" + res + "  异常是:" + exc);});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务...
异步任务完成了,执行结果是:5  异常是:null
获取异步任务返回值:5
main end ...

如果异步任务出现了异常,可以通过exc打印异常,我们在程序中设置一个运行时异常 ,如下:

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;if (i == 5) {throw new RuntimeException("远程服务调用失败");}return i;}, service).whenCompleteAsync((res, exc) -> {System.out.println("异步任务完成了,执行结果是:" + res + "  异常是:" + exc);});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务...
异步任务完成了,执行结果是:null  异常是:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
main end ...

exceptionally

上面异步任务出现了异常,我们可以使用exceptionally进行异常处理。exceptionally 接口可以接收一个异常,返回异常处理结果。

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;if (i == 5) {throw new RuntimeException("远程服务调用失败");}return i;}, service).whenCompleteAsync((res, exc) -> {System.out.println("异步任务完成了,执行结果是:" + res + "  异常是:" + exc);}).exceptionally(throwable -> {System.out.println("进入了异常处理,捕获了" + throwable.getMessage() + "异常");return 5;});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务...
异步任务完成了,执行结果是:null  异常是:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
进入了异常处理,捕获了java.lang.RuntimeException: 远程服务调用失败异常
获取异步任务返回值:5
main end ...

我们可以看到,通过exceptionally可以捕获异步任务抛出来的异常信息,并对异常进行处理,并可以将处理结果返回。

whenComplete虽然可以得到异常信息,但是无法修改结果,exceptionally可以感知异常,同时可以返回默认值。

三、handle最终处理

handle和whenComplete方法类似,但是whenComplete能感知异常但是不能返回结果。只能通过exceptionally进行处理。

而handle即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;if (i == 5) {throw new RuntimeException("远程服务调用失败");}return i;}, service).handleAsync((res, thr) -> {System.out.println("进入handleAsync方法");if (res != null) {return res * 2;}if (thr != null) {System.out.println("捕获到异常" + thr);return 0;}return 0;}, service);System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务...
进入handleAsync方法
捕获到异常java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
获取异步任务返回值:0
main end ...

如果我们去掉异常信息,可以看到如下返回值,最终异步执行结果为10;

main start ...
开启异步任务...
进入handleAsync方法
获取异步任务返回值:10
main end ...

四、线程串行化

在CompletableFuture中有以下方法:

public  CompletableFuture thenApply(Function fn)
public  CompletableFuture thenApplyAsync(Function fn)
public  CompletableFuture thenApplyAsync(Function fn, Executor executor)public CompletableFuture thenAccept(Consumer action)
public CompletableFuture thenAcceptAsync(Consumer action)
public CompletableFuture thenAcceptAsync(Consumer action,Executor executor)public CompletableFuture thenRun(Runnable action)
public CompletableFuture thenRunAsync(Runnable action)
public CompletableFuture thenRunAsync(Runnable action,Executor executor)
  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值
  • thenAccept方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果
  • thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。
    • thenRun 获取不到上个任务的执行结果,无返回值。

thenRun

thenRun 不能获取上一步的执行结果,并无返回值。

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).thenRun(() -> {System.out.println("任务2启动了...");});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}

运行结果:

main start ...
开启异步任务...
任务2启动了...
获取异步任务返回值:null
main end ...

如果我们需要获取上一步的执行结果,我们使用thenAccept;

thenAccept

消费处理结果,接收任务的处理结果,并消费处理,无返回结果

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).thenAcceptAsync((res) -> {System.out.println("任务2启动了... 上一步的结果是:" + res);}, service);System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务...
任务2启动了... 上一步的结果是:5
获取异步任务返回值:null
main end ...

如果我们即需要上一步执行结果,并需要返回值供别人使用,那么我们使用thenApply方法

thenApply

当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).thenApplyAsync((res) -> {System.out.println("任务2启动了... 上一步的结果是:" + res);return res * 2;}, service);System.out.println("获取异步任务最终返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务...
任务2启动了... 上一步的结果是:5
获取异步任务最终返回值:10
main end ...

五、两任务组合-两个任务都完才

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);         public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) }
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)}

两个任务必须都完成,触发该任务。

  • runAfterBoth 没有返回值,入参CompletionStage、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务)
  • thenAcceptBoth 可以获取两个任务的返回值。
  • thenCombine 可以获取两个任务的返回值,并可以将任务三结果返回。

runAfterBoth

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务2...");return "hello";}, service);future1.runAfterBothAsync(future2, () -> {System.out.println("任务3 启动了....");}, service);
//        System.out.println("获取异步任务最终返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务1...
开启异步任务2...
main end ...
任务3 启动了....

可以看到,任务3是在任务1和任务2执行完成后,才执行的。

thenAcceptBoth

我们使用thenAcceptBoth可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值。我们看下案例。

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务2...");return "hello";}, service);future1.thenAcceptBothAsync(future2, (res1, res2) -> {System.out.println("任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2);}, service);
//        System.out.println("获取异步任务最终返回值:" + future.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务1...
开启异步任务2...
main end ...
任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello

我们可以看到,任务3在任务1和任务2执行后执行了,并获取了任务1和任务2的返回值。

thenCombine

可以获取两个任务的返回值,并可以将任务三结果返回

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务2...");return "hello";}, service);CompletableFuture<String> stringCompletableFuture = future1.thenCombineAsync(future2, (res1, res2) -> {System.out.println("任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2);return res1 + "-->" + res2;}, service);System.out.println("获取异步任务最终返回值:" + stringCompletableFuture.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务1...
开启异步任务2...
任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello
获取异步任务最终返回值:5-->hello
main end ...

六、两任务组合-一个任务执行

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)}
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)}public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

当两个任务中,任意一个future任务完成的时候,执行任务。

  • applyToEither 两个任务有一个任务执行完成,获取它的返回值,处理任务并有新的返回值。
  • acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  • runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

runAfterEitherAsync

不感知结果,自己没有返回值。

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("开启异步任务2...");return "hello";}, service);future1.runAfterEitherAsync(future2, () -> {System.out.println("任务3 启动了....");}, service);System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务1...
main end ...
任务3 启动了....
开启异步任务2...

我们可以看到,任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。

acceptEither

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("开启异步任务2...");return 10;}, service);future1.acceptEitherAsync(future2, (res) -> {System.out.println("任务3 启动了...., 任务结果是:" + res);}, service);System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务1...
main end ...
任务3 启动了...., 任务结果是:5
开启异步任务2...

可以看到,可以获取任务1的执行结果,但不返回执行结果。

applyToEither

可以感知结果,并返回执行结果。

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("开启异步任务2...");return 10;}, service);CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {System.out.println("任务3 启动了...., 上个任务结果是:" + res);return "我是任务三的返回值, 上个任务的执行结果是:" + res;}, service);System.out.println(stringCompletableFuture.get());System.out.println("main end ...");}
}

执行结果:

main start ...
开启异步任务1...
任务3 启动了...., 上个任务结果是:5
我是任务三的返回值, 上个任务的执行结果是:5
main end ...
开启异步任务2...

七、多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • allOf:等待所有任务完成
  • anyOf: 只要有一个任务完成
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品图片...");return "图片地址";}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品属性...");return "黑色 256G";}, service);CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品品牌...");return "苹果手机";}, service);CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);future.get();//等待索引结果完成System.out.println("main end ...");}
}

执行结果:

main start ...
查询商品图片...
查询商品属性...
查询商品品牌...
main end ...

注:如果不使用future.get()阻塞,若其中一个任务执行时间较长,则可能会丢失任务信息。

anyOf

public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品图片...");return "图片地址";}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品属性...");return "黑色 256G";}, service);CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品品牌...");return "苹果手机";}, service);CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("第一个执行成功的数据:" + objectCompletableFuture.get());System.out.println("main end ...");}
}

执行结果:

main start ...
查询商品图片...
查询商品属性...
查询商品品牌...
第一个执行成功的数据:图片地址
main end ...


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章