CompletableFuture异步编程详解

文章目录

  • Future介绍
  • CompletableFuture类
    • 创建线程
    • thenAccept()方法
  • 实际场景

Future介绍

先来回顾下Future,Future是JDK1.5中添加的接口,主要功能为:

  • 获取并发的任务完成后的执行结果;
  • 能够取消并发执行中的任务;
  • 判断并发任务是否执行完成;

但Future也有着非常明显的缺点:

  1. 阻塞:调用 get() 方法会一直阻塞,直到等待直到计算完成;
  2. 异常处理:Future 没有提供任何异常处理的方式;
  3. 链式调用和结果聚合处理:在很多时候我们想链接多个 Future 来完成耗时较长的计算,此时需要合并结果并将结果发送到另一个任务中,该接口很难完成这种处理;

CompletableFuture类

上面介绍了Future的缺点,这些问题都可以通过CompletableFuture类解决,主要方法有:

  • thenApply():当执行完第一个异步程序,接着执行下一个;
  • thenAccept():当任务正常完成后,回调此方法;
  • exceptionally():当任务出现异常是,回调此方法;
  • anyOf():当所有的任务中,只要有一个任务完成,则主线程继续往下走;
  • allOf():所有的任务均完成后,则主线程继续往下走;
  • join():既线程合并(或者抛出一个 CompletionException 异常),阻塞当前线程,直到异步线程并发执行完成,也就是是join()方法还是异步阻塞;
  • supplyAsync():异步执行,有返回值;
  • runAsync():异步执行,无返回值;
  • thenCombine():两个线程完成之后进行合并返回;

创建线程

	// 有返回值public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}// 无返回值public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}

可以看出,supply开头的两个方法是有返回值的,而run开头的两个方法是没有返回值的,至于第二个方法传入的Executor,这个在编码中可以自定义;

thenAccept()方法

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(asyncPool, fn);}public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);}    

功能:当前任务正常完成以后,可将当前任务的返回值作为参数传给下一个任务;
执行任务A,任务A执行完成之后,将任务A返回值作为参数传入给任务B,打印结果为3:

  CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 1);CompletableFuture<Integer> futureB = futureA.thenApply((a) -> a + 2);log.info("result:{}",futureB.get());

实际场景

以工作中常见的场景举个例子,例如在A服务中,调用B、C服务的结果:

  1. 公共实体:
/*** @author 岳晓鵬* @version 1.0* @date 2022-06-10 00:11*/
@Data
@Builder
public class User {private String userName;private Integer age;}
  1. B、C服务代码:
    /*** B服务伪接口 获取用户姓名* @param userId* @return*/public static String getUserName(Integer userId){try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "张三";}/*** C服务伪接口 获取用户年龄** @param userId* @return*/private static Integer getAge(Integer userId){try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}return 20;}
  1. A服务获取数据:
    public static void main(String[] args) throws ExecutionException, InterruptedException {Long start = System.currentTimeMillis();// 异步获取 B C服务的数据CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> getAge(10));CompletableFuture<String> userNameFuture = CompletableFuture.supplyAsync(() -> getUserName(10));// 获取用户数据User user = User.builder().age(ageFuture.get()).userName(userNameFuture.get()).build();Long end = System.currentTimeMillis();log.info("执行时间:{}ms",end - start);log.info("用户:{}",user);}
  1. 打印结果:
 - 执行时间:5075ms- 用户:User(userName=张三, age=20)

打印结果显示执行时间为 5s+,而不是 7s+,说明异步已经生效;
但其中get()方法还是阻塞的,如果线程执行时间较长,主线程将一直阻塞下去,另外还有一个get()方法可以添加超时时间:

  • get():阻塞获取线程执行结果;
  • get(long time):阻塞获取线程执行结果,如超过超时时间则继续向下执行;
  • getNow():阻塞获取线程执行结果,如果线程抛出异常,则返回默认值;

上面的例子也可以使用all() + join()方式获取数据:

CompletableFuture.allOf(ageFuture, userNameFuture).join();


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部