解锁Future新姿势:CompletableFuture初探

CompletableFuture作用

官方解释

CompletableFuture可以显式完成(设置其值和状态),并且可以用作 aCompletionStage,支持在完成时触发的相关功能和动作。

当两个或多个线程尝试 complete、 completeExceptionally或 cancel CompletableFuture 时,只有其中一个成功。

除了这些以及直接操作状态和结果的相关方法之外,CompletableFuture 还实现CompletionStage了与以下策略的接口:

  • 为非异步方法的依赖完成提供的操作 可以由完成当前 CompletableFuture 的线程执行,或者由完成方法的任何其他调用者执行。
  • 所有没有显式 Executor 参数的异步方法都使用 来执行ForkJoinPool.commonPool() (除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口的实例CompletableFuture.AsynchronousCompletionTask。
  • 所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一种方法的行为不会受到子类中其他方法的覆盖的影响。

CompletableFuture 还
使用了
Future以下策略实现:

  • 由于(不同于FutureTask)此类无法直接控制导致其完成的计算,因此取消被视为另一种形式的异常完成。方法cancel与 的效果相同 completeExceptionally(new CancellationException())。方法 isCompletedExceptionally()可用于确定 CompletableFuture 是否以任何异常方式完成。
  • 如果出现CompletionException完成异常,方法get()并get(long, TimeUnit)抛出 ExecutionException 与对应的 CompletionException 中相同的原因。为了在大多数情况下简化使用,此类还定义了方法join(),并且 getNow(T)在这些情况下直接抛出CompletionException。

简单一句话来讲,就是它拥有Future的所有姿势,并且还有自己的绝活。

源码

先简单扫描下源码。

public class CompletableFuture implements Future, CompletionStage {.../* ------------- public methods -------------- */public CompletableFuture() {}CompletableFuture(Object r) {this.result = r;}public static  CompletableFuture supplyAsync(Supplier supplier) {return asyncSupplyStage(ASYNC_POOL, supplier);}public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}public static CompletableFuture runAsync(Runnable runnable) {return asyncRunStage(ASYNC_POOL, runnable);}public static CompletableFuture runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}public static  CompletableFuture completedFuture(U value) {return new CompletableFuture((value == null) ? NIL : value);}public boolean isDone() {return result != null;}@SuppressWarnings("unchecked")public T get() throws InterruptedException, ExecutionException {Object r;if ((r = result) == null)r = waitingGet(true);return (T) reportGet(r);}@SuppressWarnings("unchecked")public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {long nanos = unit.toNanos(timeout);Object r;if ((r = result) == null)r = timedGet(nanos);return (T) reportGet(r);}@SuppressWarnings("unchecked")public T join() {Object r;if ((r = result) == null)r = waitingGet(false);return (T) reportJoin(r);}@SuppressWarnings("unchecked")public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);}public boolean complete(T value) {boolean triggered = completeValue(value);postComplete();return triggered;}public boolean completeExceptionally(Throwable ex) {if (ex == null) throw new NullPointerException();boolean triggered = internalComplete(new AltResult(ex));postComplete();return triggered;}public  CompletableFuture thenApply(Function fn) {return uniApplyStage(null, fn);}public  CompletableFuture thenApplyAsync(Function fn) {return uniApplyStage(defaultExecutor(), fn);}public  CompletableFuture thenApplyAsync(Function fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);}public CompletableFuture thenAccept(Consumer action) {return uniAcceptStage(null, action);}public CompletableFuture thenAcceptAsync(Consumer action) {return uniAcceptStage(defaultExecutor(), action);}public CompletableFuture thenAcceptAsync(Consumer action,Executor executor) {return uniAcceptStage(screenExecutor(executor), action);}public CompletableFuture thenRun(Runnable action) {return uniRunStage(null, action);}public CompletableFuture thenRunAsync(Runnable action) {return uniRunStage(defaultExecutor(), action);}public CompletableFuture thenRunAsync(Runnable action,Executor executor) {return uniRunStage(screenExecutor(executor), action);}public  CompletableFuture thenCombine(CompletionStage other,BiFunction fn) {return biApplyStage(null, other, fn);}public  CompletableFuture thenCombineAsync(CompletionStage other,BiFunction fn) {return biApplyStage(defaultExecutor(), other, fn);}public  CompletableFuture thenCombineAsync(CompletionStage other,BiFunction fn, Executor executor) {return biApplyStage(screenExecutor(executor), other, fn);}public  CompletableFuture thenAcceptBoth(CompletionStage other,BiConsumer action) {return biAcceptStage(null, other, action);}public  CompletableFuture thenAcceptBothAsync(CompletionStage other,BiConsumer action) {return biAcceptStage(defaultExecutor(), other, action);}public  CompletableFuture thenAcceptBothAsync(CompletionStage other,BiConsumer action, Executor executor) {return biAcceptStage(screenExecutor(executor), other, action);}public CompletableFuture runAfterBoth(CompletionStage other,Runnable action) {return biRunStage(null, other, action);}public CompletableFuture runAfterBothAsync(CompletionStage other,Runnable action) {return biRunStage(defaultExecutor(), other, action);}public CompletableFuture runAfterBothAsync(CompletionStage other,Runnable action,Executor executor) {return biRunStage(screenExecutor(executor), other, action);}public  CompletableFuture applyToEither(CompletionStage other, Function fn) {return orApplyStage(null, other, fn);}public  CompletableFuture applyToEitherAsync(CompletionStage other, Function fn) {return orApplyStage(defaultExecutor(), other, fn);}public  CompletableFuture applyToEitherAsync(CompletionStage other, Function fn,Executor executor) {return orApplyStage(screenExecutor(executor), other, fn);}public CompletableFuture acceptEither(CompletionStage other, Consumer action) {return orAcceptStage(null, other, action);}public CompletableFuture acceptEitherAsync(CompletionStage other, Consumer action) {return orAcceptStage(defaultExecutor(), other, action);}public CompletableFuture acceptEitherAsync(CompletionStage other, Consumer action,Executor executor) {return orAcceptStage(screenExecutor(executor), other, action);}public CompletableFuture runAfterEither(CompletionStage other,Runnable action) {return orRunStage(null, other, action);}public CompletableFuture runAfterEitherAsync(CompletionStage other,Runnable action) {return orRunStage(defaultExecutor(), other, action);}public CompletableFuture runAfterEitherAsync(CompletionStage other,Runnable action,Executor executor) {return orRunStage(screenExecutor(executor), other, action);}public  CompletableFuture thenCompose(Function> fn) {return uniComposeStage(null, fn);}public  CompletableFuture thenComposeAsync(Function> fn) {return uniComposeStage(defaultExecutor(), fn);}public  CompletableFuture thenComposeAsync(Function> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}public CompletableFuture whenComplete(BiConsumer action) {return uniWhenCompleteStage(null, action);}public CompletableFuture whenCompleteAsync(BiConsumer action) {return uniWhenCompleteStage(defaultExecutor(), action);}public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor) {return uniWhenCompleteStage(screenExecutor(executor), action);}public  CompletableFuture handle(BiFunction fn) {return uniHandleStage(null, fn);}public  CompletableFuture handleAsync(BiFunction fn) {return uniHandleStage(defaultExecutor(), fn);}public  CompletableFuture handleAsync(BiFunction fn, Executor executor) {return uniHandleStage(screenExecutor(executor), fn);}/*** Returns this CompletableFuture.** @return this CompletableFuture*/public CompletableFuture toCompletableFuture() {return this;}// not in interface CompletionStage/*** Returns a new CompletableFuture that is completed when this* CompletableFuture completes, with the result of the given* function of the exception triggering this CompletableFuture's* completion when it completes exceptionally; otherwise, if this* CompletableFuture completes normally, then the returned* CompletableFuture also completes normally with the same value.* Note: More flexible versions of this functionality are* available using methods {@code whenComplete} and {@code handle}.** @param fn the function to use to compute the value of the* returned CompletableFuture if this CompletableFuture completed* exceptionally* @return the new CompletableFuture*/public CompletableFuture exceptionally(Function fn) {return uniExceptionallyStage(fn);}/* ------------- Arbitrary-arity constructions -------------- */public static CompletableFuture allOf(CompletableFuture... cfs) {return andTree(cfs, 0, cfs.length - 1);}public static CompletableFuture anyOf(CompletableFuture... cfs) {int n; Object r;if ((n = cfs.length) <= 1)return (n == 0)? new CompletableFuture(): uniCopyStage(cfs[0]);for (CompletableFuture cf : cfs)if ((r = cf.result) != null)return new CompletableFuture(encodeRelay(r));cfs = cfs.clone();CompletableFuture d = new CompletableFuture<>();for (CompletableFuture cf : cfs)cf.unipush(new AnyOf(d, cf, cfs));if (d.result != null)for (int i = 0, len = cfs.length; i < len; i++)if (cfs[i].result != null)for (i++; i < len; i++)if (cfs[i].result == null)cfs[i].cleanStack();return d;}/* ------------- Control and status methods -------------- */
。。。
} 

我们可以看到它实现了Future。所以一般的Future的姿势,它都有。

使用场景

一般情况下,使用Future都和计算类的有关。

1、聚合/汇总计算,多线程计算完成后,把所有计算的结果一起相加后输出。

2、异步编程。不需要等待当前操作结果。例如日志输入,唤起第三方(不对结果进行负责)

常用方法

在这里我们看下公共方法,其他的方法例如状态控制相关方法,因为用的比较少,暂不考虑。

如果有兴趣的话,可以看对应的方法。

get

获取返回结果用的。这个方法是最基本的方法。

因为所有的CompletableFuture 的方法返回的是CompletableFuture 对象,如果你想要获得对应的U对象值,就必须使用get方法,和使用Stream流中的get原理是一样的。

runAsync

异步操作运行完成后,CompletableFuture 没有返回值。因为它的返回值是void。

如果你要想获得对应的返回参数信息,就需要使用supplyAsync

supplyAsync

带返回值的异步操作。

例如,您需要获取当前用户的消息数。

  CompletableFuture futureMessageNum = CompletableFuture.supplyAsync(() -> getMessageNumByUId(uid)
);

因为这里只有一行代码,所以直接省略了{}和return。

你也可以这样写


CompletableFuture futureMessageNum = CompletableFuture.supplyAsync(() -> {return getMessageNumByUId(uid);}
);

allOf

这个方法就是用来聚合的,所有的异步操作全部完成之后,它才会完成。

所以一般情况下,如果我们希望返回多个异步操作结果的时候,我们都会使用allOf函数来进行聚合。

这个方法没有返回类型。

CompletableFuture allOf(CompletableFuture... cfs) 

例如如果有多个异步计算。

//消息数目CompletableFuture futureMessageNum =...//粉丝数CompletableFuture futureFansNum =...//积分CompletableFuture futureIntegral=...//文章数CompletableFuture futureArticleNum =...

我们需要把所有的信息都赋值到对应的用户输出信息对象UserInfo中,然后给到前端调用。

因为走的是异步,我们没有办法保证所有方法同一时间完成。

所以,就需要allOf方法来保证所有异步的全部完成,然后再将异步计算结果赋值给用户对象对应的属性。

CompletableFuture allFuture = CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);

CompletableFuture allFuture = CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);

它必须等到futureMessageNum,futureFansNum,futureIntegral,futureArticleNum全部执行完成。

因为这个特性。我们可以在执行完当前代码后。

然后使用get方法,将对应的值赋值到UserInfo对应的属性。

测试代码

全部测试代码如下

@Data
public class UserInfo{private Integer uid;private String name;private Integer messageNum;private Integer fansNum;private Integer integral;private Integer articleNum;
...
}

...@Overid
public UserInfo getUserInfoById(Integer id)
{UserInfo userInfo = new UserInfo();//消息数目CompletableFuture futureMessageNum = CompletableFuture.supplyAsync(() -> getMessageNumByUId(uid)
);//粉丝数CompletableFuture futureFansNum = CompletableFuture.supplyAsync(() -> getFansNumByUId(uid)
);//积分CompletableFuture futureIntegral= CompletableFuture.supplyAsync(() -> getIntegralByUId(uid)
);//文章数CompletableFuture futureArticleNum = CompletableFuture.supplyAsync(() -> getArticleNumByUId(uid)
);CompletableFuture allFuture= CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);try{ //get方法有异常抛出,所以需要tryuserInfo.setMessageNum(futureMessageNum.get());userInfo.setFansNum(futureFansNum.get());userInfo.setIntegral(futureIntegral.get());userInfo.setArticlNum(futureArticleNum.get());。。。
}

completedFuture

将传入的对象转换为completedFuture对象

join

聚合。如果上面所有的返回都是一样的值,就可以使用join函数将他们放到一起。

这个和ForkJoinTask差不多。

thenApply

接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

thenCompose

thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。

总结

这个是1.8之后推出来的功能。

1.8之前的版本就不要想了。

最好的办法就是自己代码跑跑看,跑的时候你还能解锁更多的宫那个方法


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章