CompletableFuture 异步编排详解
CompletableFuture 异步编排详解
详情请查看:JulyWhj博客
业务场景:
查询商品详情页面逻辑比较复杂,有些数据需要远程调用,必然需要花费更多的时间。
假如商品详情每个页面查询,需要的如下的标准时间完成,那么用户需要10s才能完成。这里我们需采用异步查询,但是比如接口A查询商品信息,而接口B需要查询商品的SKU,接口C需要查询商品供应商等信息,如接口C必须依赖接口A或接口B的返回值。那么我们就需要使用CompletableFuture接口来实现。
一、开启异步编程
runAsync:无入参、无返回值
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) {System.out.println("main start ...");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println("开启异步任务...");}, service);System.out.println("main end ...");}
}
执行结果:
main start ...main end ...开启异步任务...
supplyAsync :无入参,可以获取返回值
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);@SneakyThrowspublic static void main(String[] args) {System.out.println("main start ...");CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");return "开启异步任务,我是返回值";}, service);System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务...
获取异步任务返回值:开启异步任务,我是返回值
main end ...
二、计算完成回调
当我们想第一个异步任务执行完成后,还需要做其他的事情。我们的CompletableFuture提供了计算完成时回调方法,whenComplete、whenCompleteAsync、exceptionally等接口。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
whenComplete 可以处理正常和异常的计算结果,exceptionally: 处理异常情况。
whenComplete和whenCompleteAsync 的区别是whenComplete 是执行当前任务的线程继续执行whenComplete的任务。
whenCompleteAsync: 是把whenCompleteAsync的任务继续提交给线程池来进行执行。
whenCompleteAsync
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);@SneakyThrowspublic static void main(String[] args) {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).whenCompleteAsync((res, exc) -> {System.out.println("异步任务完成了,执行结果是:" + res + " 异常是:" + exc);});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务...
异步任务完成了,执行结果是:5 异常是:null
获取异步任务返回值:5
main end ...
如果异步任务出现了异常,可以通过exc打印异常,我们在程序中设置一个运行时异常 ,如下:
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;if (i == 5) {throw new RuntimeException("远程服务调用失败");}return i;}, service).whenCompleteAsync((res, exc) -> {System.out.println("异步任务完成了,执行结果是:" + res + " 异常是:" + exc);});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务...
异步任务完成了,执行结果是:null 异常是:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
main end ...
exceptionally
上面异步任务出现了异常,我们可以使用exceptionally进行异常处理。exceptionally 接口可以接收一个异常,返回异常处理结果。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;if (i == 5) {throw new RuntimeException("远程服务调用失败");}return i;}, service).whenCompleteAsync((res, exc) -> {System.out.println("异步任务完成了,执行结果是:" + res + " 异常是:" + exc);}).exceptionally(throwable -> {System.out.println("进入了异常处理,捕获了" + throwable.getMessage() + "异常");return 5;});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务...
异步任务完成了,执行结果是:null 异常是:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
进入了异常处理,捕获了java.lang.RuntimeException: 远程服务调用失败异常
获取异步任务返回值:5
main end ...
我们可以看到,通过exceptionally可以捕获异步任务抛出来的异常信息,并对异常进行处理,并可以将处理结果返回。
whenComplete虽然可以得到异常信息,但是无法修改结果,exceptionally可以感知异常,同时可以返回默认值。
三、handle最终处理
handle和whenComplete方法类似,但是whenComplete能感知异常但是不能返回结果。只能通过exceptionally进行处理。
而handle即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;if (i == 5) {throw new RuntimeException("远程服务调用失败");}return i;}, service).handleAsync((res, thr) -> {System.out.println("进入handleAsync方法");if (res != null) {return res * 2;}if (thr != null) {System.out.println("捕获到异常" + thr);return 0;}return 0;}, service);System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务...
进入handleAsync方法
捕获到异常java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
获取异步任务返回值:0
main end ...
如果我们去掉异常信息,可以看到如下返回值,最终异步执行结果为10;
main start ...
开启异步任务...
进入handleAsync方法
获取异步任务返回值:10
main end ...
四、线程串行化
在CompletableFuture中有以下方法:
public CompletableFuture thenApply(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)public CompletableFuture thenAccept(Consumer super T> action)
public CompletableFuture thenAcceptAsync(Consumer super T> action)
public CompletableFuture thenAcceptAsync(Consumer super T> action,Executor executor)public CompletableFuture thenRun(Runnable action)
public CompletableFuture thenRunAsync(Runnable action)
public CompletableFuture thenRunAsync(Runnable action,Executor executor)
- thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值。
- thenAccept方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
- thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。
- thenRun 获取不到上个任务的执行结果,无返回值。
thenRun
thenRun 不能获取上一步的执行结果,并无返回值。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).thenRun(() -> {System.out.println("任务2启动了...");});System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}
运行结果:
main start ...
开启异步任务...
任务2启动了...
获取异步任务返回值:null
main end ...
如果我们需要获取上一步的执行结果,我们使用thenAccept;
thenAccept
消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).thenAcceptAsync((res) -> {System.out.println("任务2启动了... 上一步的结果是:" + res);}, service);System.out.println("获取异步任务返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务...
任务2启动了... 上一步的结果是:5
获取异步任务返回值:null
main end ...
如果我们即需要上一步执行结果,并需要返回值供别人使用,那么我们使用thenApply方法
thenApply
当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 / 2;return i;}, service).thenApplyAsync((res) -> {System.out.println("任务2启动了... 上一步的结果是:" + res);return res * 2;}, service);System.out.println("获取异步任务最终返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务...
任务2启动了... 上一步的结果是:5
获取异步任务最终返回值:10
main end ...
五、两任务组合-两个任务都完才
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor); public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) }
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)}
两个任务必须都完成,触发该任务。
- runAfterBoth 没有返回值,入参CompletionStage、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务)
- thenAcceptBoth 可以获取两个任务的返回值。
- thenCombine 可以获取两个任务的返回值,并可以将任务三结果返回。
runAfterBoth
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务2...");return "hello";}, service);future1.runAfterBothAsync(future2, () -> {System.out.println("任务3 启动了....");}, service);
// System.out.println("获取异步任务最终返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务1...
开启异步任务2...
main end ...
任务3 启动了....
可以看到,任务3是在任务1和任务2执行完成后,才执行的。
thenAcceptBoth
我们使用thenAcceptBoth可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值。我们看下案例。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务2...");return "hello";}, service);future1.thenAcceptBothAsync(future2, (res1, res2) -> {System.out.println("任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2);}, service);
// System.out.println("获取异步任务最终返回值:" + future.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务1...
开启异步任务2...
main end ...
任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello
我们可以看到,任务3在任务1和任务2执行后执行了,并获取了任务1和任务2的返回值。
thenCombine
可以获取两个任务的返回值,并可以将任务三结果返回
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务2...");return "hello";}, service);CompletableFuture<String> stringCompletableFuture = future1.thenCombineAsync(future2, (res1, res2) -> {System.out.println("任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2);return res1 + "-->" + res2;}, service);System.out.println("获取异步任务最终返回值:" + stringCompletableFuture.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务1...
开启异步任务2...
任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello
获取异步任务最终返回值:5-->hello
main end ...
六、两任务组合-一个任务执行
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)}
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)}public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
当两个任务中,任意一个future任务完成的时候,执行任务。
- applyToEither 两个任务有一个任务执行完成,获取它的返回值,处理任务并有新的返回值。
- acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
- runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
runAfterEitherAsync
不感知结果,自己没有返回值。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("开启异步任务2...");return "hello";}, service);future1.runAfterEitherAsync(future2, () -> {System.out.println("任务3 启动了....");}, service);System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务1...
main end ...
任务3 启动了....
开启异步任务2...
我们可以看到,任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。
acceptEither
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("开启异步任务2...");return 10;}, service);future1.acceptEitherAsync(future2, (res) -> {System.out.println("任务3 启动了...., 任务结果是:" + res);}, service);System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务1...
main end ...
任务3 启动了...., 任务结果是:5
开启异步任务2...
可以看到,可以获取任务1的执行结果,但不返回执行结果。
applyToEither
可以感知结果,并返回执行结果。
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务1...");int i = 10 / 2;return i;}, service);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("开启异步任务2...");return 10;}, service);CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {System.out.println("任务3 启动了...., 上个任务结果是:" + res);return "我是任务三的返回值, 上个任务的执行结果是:" + res;}, service);System.out.println(stringCompletableFuture.get());System.out.println("main end ...");}
}
执行结果:
main start ...
开启异步任务1...
任务3 启动了...., 上个任务结果是:5
我是任务三的返回值, 上个任务的执行结果是:5
main end ...
开启异步任务2...
七、多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- allOf:等待所有任务完成
- anyOf: 只要有一个任务完成
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品图片...");return "图片地址";}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品属性...");return "黑色 256G";}, service);CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品品牌...");return "苹果手机";}, service);CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);future.get();//等待索引结果完成System.out.println("main end ...");}
}
执行结果:
main start ...
查询商品图片...
查询商品属性...
查询商品品牌...
main end ...
注:如果不使用future.get()阻塞,若其中一个任务执行时间较长,则可能会丢失任务信息。
anyOf
public class CompletableFutureDemo {/*** 定义线程池*/public static ExecutorService service = Executors.newFixedThreadPool(5);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main start ...");CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品图片...");return "图片地址";}, service);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品属性...");return "黑色 256G";}, service);CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品品牌...");return "苹果手机";}, service);CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("第一个执行成功的数据:" + objectCompletableFuture.get());System.out.println("main end ...");}
}
执行结果:
main start ...
查询商品图片...
查询商品属性...
查询商品品牌...
第一个执行成功的数据:图片地址
main end ...
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
