异步、线程池(ExecutorService、ThreadPoolExecutor、CompletableFuture)

文章目录

      • 1、概述:
      • 2、初始化线程的四种方式:
      • 3、七大参数:
        • 3.1、七大参数详解
        • 3.2、如何合理设置核心线程数:
      • 4、工作顺序:
      • 5、Executors常用创建线程池方法:
      • 6、四种拒绝策略:
      • 7、CompletableFuture启动异步任务
      • 8、方法执行完成后的感知
      • 9、方法执行完成后的处理
      • 10、线程池串行方法
      • 11、两任务组合
        • 11.1、都要完成
        • 11.2、一个完成即可
      • 12、多任务组合


1、概述:

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。

他的主要特点为:线程复用; 控制最大并发数; 管理线程。

第一: 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二: 提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
第三: 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降抵系统的稳定性,使用线程池可以逆行统一的分配,调优和监控


2、初始化线程的四种方式:

  • 1、继承Thread
  • 2、实现Runnable
  • 3、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
  • 4、线程池

优缺点:

  • 1、2无法获取返回值,3可以获取返回值(阻塞式等待)
  • 1、2、3都不能控制资源,
  • 4可以控制资源,性能稳定

代码示例:

import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4j
public class CreateThreadTest {public static void main(String[] args) {// 1、继承Threadnew Thread(new Thread01()).start();// 2、实现Runnablenew Thread(new Rannable01()).start();// 3、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)FutureTask<String> stringFutureTask = new FutureTask<>(new Callable01());new Thread(stringFutureTask).start();try {log.info("结果:{}",stringFutureTask.get());//阻塞式等待} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 4、线程池ExecutorService executorService = Executors.newFixedThreadPool(10);executorService.submit(()->log.info("方式4..."));//submit可以获取结果,execute无法获取结果executorService.shutdown();//关闭}
}
@Slf4j
class Thread01 extends Thread{@Overridepublic void run() {log.info("方式1...");}
}
@Slf4j
class Rannable01 implements Runnable{@Overridepublic void run() {log.info("方式2...");}
}
@Slf4j
class Callable01 implements Callable<String>{@Overridepublic String call() throws Exception {log.info("方式3...");return "Callable01";}
}

在这里插入图片描述


3、七大参数:

3.1、七大参数详解
	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler){if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
  • int corePoolSize,核心线程数【只要线程池不销毁(设置allowCoreThreadTimeOut属性),就一直存在】;线程池创建好之后准备就绪的线程数,等待接收异步亲求去执行;
  • int maximumPoolSize,最大线程数,可用于控制资源
  • long keepAliveTime,存活时间,当前线程数量大于指定的核心线程数量,只要存活时间达到该值,就会被释放。(maximumPoolSize-corePoolSize)
  • TimeUnit unit,时间单位
  • BlockingQueue workQueue,阻塞队列,如果任务很多,多余的任务会被放在队列中
  • ThreadFactory threadFactory,创建线程的工厂
  • RejectedExecutionHandler handler 如果队列满了,按照我们指定的拒绝策略执行任务

如果是要五大参数,就是前五个,后两个默认:

  • 例如:使用五个参数的构造方法:
    在这里插入图片描述
    其中默认的拒绝策略:(丢弃并抛异常)
    在这里插入图片描述

3.2、如何合理设置核心线程数:
  • CPU密集型
    在这里插入图片描述

  • IO密集型

    • 方式1:
      在这里插入图片描述

    • 方式2:
      在这里插入图片描述


4、工作顺序:

在这里插入图片描述

工作顺序【1核心线程 -> 2阻塞队列 -> 3最大线程 -> 4拒绝策略(超过最大线程)、释放线程(超过空闲时间)】

  • 1、线程池创建,准备好指定数量(corePoolSize)的核心线程,以接收任务
  • 2、核心线程满了,再进入的线程就会被放进阻塞队列(workQueue)中,当有空闲核心线程,就会去阻塞队列中获取任务
  • 3、阻塞队列满了,就直接开新的线程执行,最大只能开到maximumPoolSize指定的数量
  • 4、达到maximumPoolSize个线程,就用拒绝策略拒绝任务
  • 5、如果当前线程数大于指定核心线程数(corePoolSize),多出的每一个线程在空闲指定时间(keepAliveTime)过后,释放线程

依据:从execute方法可以看出来:
在这里插入图片描述在这里插入图片描述


5、Executors常用创建线程池方法:

前三种更为常用,都是使用ThreadPoolExecutor创建的线程池;第四种采用ScheduledThreadPoolExecutor创建线程池;第5种采用ForkJoinPool创建线程池;他们的关系如下图:
在这里插入图片描述

  • 1、Executors.newCachedThreadPool();core是0,所有都可回收;适合执行短期异步的小程序或者负载较轻的服务器
    在这里插入图片描述

  • 2、Executors.newFixedThreadPool();固定大小,core=max;适合执行长期的任务
    在这里插入图片描述

  • 3、Executors.newSingleThreadExecutor();单线程的线程池,后台从队列里面获取任务,挨个执行;适合一个任务一个任务执行的场景
    在这里插入图片描述

  • 4、Executors.newScheduledThreadPool();定时任务的线程池
    在这里插入图片描述

  • 5、Executors.newWorkStealingPool();ForkJoinPool 分支合并
    在这里插入图片描述

一般不使用上述方法创建线程池,前两个阻塞队列的大小是Integer.MAX_VALUE,后两个的最大线程数是Integer.MAX_VALUE。如,阿里巴巴Java开发手册中这样描述:
在这里插入图片描述


6、四种拒绝策略:

是什么?

  • 等待队列也已经排满了,再也塞不下新任务了同时,
  • 线程池中的max线程也达到了,无法继续为新任务服务。
  • 这时候我们就需要拒绝策略机制合理的处理这个问题。

先看接口类RejectedExecutionHandler:只有一个rejectedExecution方法,来决定如何拒绝
在这里插入图片描述

它有以下四个实现类:

  • DiscardOldestPolicy: poll,移除第一个;删除工作队列 最早的一个,再尝试运行当前的r(依照工作顺序运行,即再检验一遍是否小于核心线程数、队列是否满了、是否达到最大线程数)。
    在这里插入图片描述

  • AbortPolicy: 很明显,直接抛异常
    在这里插入图片描述

  • CallerRunsPolicy: 直接调用run方法,哪来的就在哪运行
    在这里插入图片描述

  • DiscardPolicy:这个有意思,啥都没干,相当于直接忽略
    在这里插入图片描述

代码演示:

  • 使用下面的代码,分别测试四种拒绝策略:
  • 核心2,最大5,阻塞3,10个任务(0~9)
    • 前两个:0、1直接运行
    • 2、3、4进入阻塞队列
    • 5、6、7新开线程
    • 8、9走拒绝策略
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();log.info(rejectedExecutionHandler.getClass().getName());// 核心2,最大5,阻塞队列3;最多同时执行 5 + 3 = 8 个任务
ExecutorService executorService = new ThreadPoolExecutor(2,5,30, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(),rejectedExecutionHandler);try {//运行10个任务,会有多出的任务for (int i = 0; i < 10; i++) {int finalI = i;executorService.execute(() -> {log.info("执行任务{}...", finalI);});}
} finally {executorService.shutdown();
}
  • 抛异常:
    在这里插入图片描述

  • 直接忽略组后来的:
    在这里插入图片描述

  • 丢弃队列最早的:
    在这里插入图片描述

  • 哪来的在哪运行:最后来的两个在main中运行:
    在这里插入图片描述


7、CompletableFuture启动异步任务

  • runAsync:无返回值
  • supplyAsync:有返回值
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
public class CompletableFutureTest {private static ExecutorService executor = Executors.newFixedThreadPool(2);public static void main(String[] args) {//方式1、无返回值CompletableFuture.runAsync(()->log.info("runAsync"),executor);// 方式2、有返回值CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {log.info("supplyAsync");return "supplyAsync";}, executor);//获取返回值try {String res = supplyAsync.get();log.info("返回值:{}",res);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}

8、方法执行完成后的感知

CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {log.info("创建...");return 10 / 0;}, executor)//处理完成后用当前线程处理,如果为whenCompleteAsync,则用其他线程处理.whenComplete((res, e) -> {log.info("结果:{},异常:{}", res, e);})//出现异常后的回调.exceptionally((throwable) -> {log.info("出现异常,返回0,异常:{}", throwable.getCause().toString());return 0;});//打印结果
try {log.info("结果:{}",supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}

在这里插入图片描述


9、方法执行完成后的处理

CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {log.info("创建...");return 10 / 2;// return 10 / 0;}, executor)//参数:BiFunction fn;接收结果、异常,返回新结果.handle((res, e) -> {if (res != null) {log.info("结果不为空");return res * 2;}if (e != null) {log.info("有异常");return 0;}log.info("其他情况");return 0;});//打印结果
try {log.info("结果:{}", supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}

在这里插入图片描述


10、线程池串行方法

基本方法:

  • thenApply:接收上一个返回结果,返回新结果
  • thenAccept:只能接收上一个返回结果,无返回值
  • thenRun:不接受返回结果也不返回结果
    在这里插入图片描述

现实现一个需求:

  • 先启动一个线程,任务1
  • 任务2获取任务1的返回结果,并返回新结果
  • 任务3在同一个线程上获取任务2的结果,但不返回任何结果
  • 任务4既不接收上一个返回结果,也不返回结果

代码:

    private static void 线程池串行方法() {CompletableFuture.supplyAsync(() -> {log.info("任务1...");return 111;})//获取结果,返回新结果(类型可不同).thenApplyAsync((res) -> {log.info("任务2...获取到上一个的结果:{}", res);return "222";}, executor)//获取上一个结果,但不返回结果.thenAccept((res) -> {log.info("任务3...获取到上一个的结果:{}", res);})//既不获取,也不返回.thenRunAsync(() -> {log.info("任务4...无法获取到上一个的结果");}, executor);// 在测试包下运行,线程睡眠,防止提前结束try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}

在这里插入图片描述


11、两任务组合

11.1、都要完成
  • thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回结果
  • thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
  • runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后处理任务
  • 以上方法后面加上Async,表示新开一个线程,例如:thenCombineAsync

示例:使用第三个任务,获取前两个任务的返回结果,并进行拼串,返回最终结果:

//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {log.info("任务1...");return 111;}, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {log.info("任务2...");return 222;}, executor);//任务3在前两个执行完后执行:
CompletableFuture<String> cf3 = cf1.thenCombineAsync(cf2, (f1, f2) -> {log.info("任务3...获取到的结果:{},{}", f1, f2);return f1 + "" + f2;
}, executor);//打印任务3的结果
try {log.info("任务3的返回值:{}",cf3.get());
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}

在这里插入图片描述

11.2、一个完成即可
  • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并由新返回值
  • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,但无返回值
  • runAfterEither:两个任务有一个执行完成,不获取它的返回值,处理任务,也没有返回值
  • 均可加Async
  • 组合的两个任务需要有相同的返回值

以下测试三种方式:

//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {log.info("任务1...");return 111;}, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {log.info("任务2...开始");try {Thread.sleep(2000);log.info("任务2...结束");} catch (InterruptedException e) {e.printStackTrace();}return 222;}, executor);//=========方式1=========
//任务3在前两个执行完后执行:
CompletableFuture<Integer> cf3 = cf1.applyToEitherAsync(cf2, (f1) -> {log.info("任务3-1...获取到的结果:{},并把结果扩大2倍", f1);return f1 * 2;
}, executor);
//打印cf3的结果
try {log.info("任务3-1的返回值:{}", cf3.get());
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}//=========方式2=========
cf1.acceptEitherAsync(cf2, (f1) -> {log.info("任务3-2获取到的结果:{}", f1);
}, executor);//=========方式3=========
cf1.runAfterEitherAsync(cf2,() -> log.info("任务3-3:前两个任务有一个完成了"),executor);

在这里插入图片描述


12、多任务组合

CompletableFuture中有两个静态方法:

  • allOf:所有任务都完成
  • anyOf:一个任务完成即可
    在这里插入图片描述

代码示例:

//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {log.info("任务1...");return 111;}, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {log.info("任务2...开始");try {Thread.sleep(2000);log.info("任务2...结束");} catch (InterruptedException e) {e.printStackTrace();}return 222;}, executor);//一个完成即可:
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(cf1, cf2);
try {log.info("有一个任务完成了,返回值:{}",anyOf.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}//都要完成:
CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2);
try {log.info("任务都完成了,返回值:{},{}",cf1.get(),cf2.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}

在这里插入图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部