并发编程系列学习笔记07(线程池)

线程池

  • 问题背景

    • 线程资源很宝贵
    • 线程不是越多越好
    • 频繁创建线程影响性能
  • 自定义线程池

    • 消费者线程池:Thread Pool
    • 生产者任务队列:Blocking Queue
/*** @author 钦尘* @date 2021/8/3 23:10* @description 手写自定义线程池*/
@Slf4j
public class TestPool {/*** 测试** @param args*/public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 10);for (int i = 0; i < 5; i++) {int j = i;threadPool.execute(() -> {log.info("执行任务逻辑 {}", j);});}}}@Slf4j
class ThreadPool {/*** 任务队列*/private BlockingQueue taskQueue;/*** 线程集合*/private HashSet workers = new HashSet();/*** 核心线程数*/private int coreSize;/*** 获取任务的超时时间*/private long timeout;/*** 时间单位*/private TimeUnit timeUnit;public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);}/*** 执行任务** @param task*/public void execute(Runnable task) {// 当任务数 未超过 coreSize 直接交给 worker对象执行// 达到数量,加入队列synchronized (workers) {if (workers.size() < coreSize) {log.info("新增任务 {}", task);Worker worker = new Worker(task);workers.add(worker);worker.start();} else {log.info("加入任务队列 {}", task);taskQueue.put(task);}}}/*** 对线程进行包装*/class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1.当 task 不为 null 直接执行// 2.当 task执行完毕,从任务队列获取任务,执行// 测试不带超时 和 带超时的 区别,两种不同策略,一种是死等,一种是一段时间后放弃等待,线程销毁// while (task != null || (task = taskQueue.take()) != null) {while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.info("任务正在执行 {}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.info("任务执行完毕,被移除 {}", this);workers.remove(this);}}}}/*** 阻塞队列,存放线程未处理的任务** @param */
@Slf4j
class BlockingQueue {/*** 任务队列,使用双向链表,ArrayDeque比LinkedList性能更好*/private Deque queue = new ArrayDeque<>();/*** 锁,保护队列头尾元素*/private ReentrantLock lock = new ReentrantLock();/*** 队列最大容量*/private int capcity;/*** 生产者条件变量*/private Condition fullWaitSet = lock.newCondition();/*** 消费者条件变量*/private Condition emptyWaitSet = lock.newCondition();public BlockingQueue(int capcity) {this.capcity = capcity;}/*** 阻塞获取,不带超时时间* @return 任务*/public T take() {lock.lock();try {// 队列为空,则进入等待while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}// 拿到任务,从队列中移除,并返回该队列,同时通知 条件变量 fullWaitSetT t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}/*** 带超时的阻塞获取** @param timeout* @param unit* @return*/public T poll(long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {if (nanos <= 0) {return null;}try {// 有可能出现虚假唤醒,所以将返回的是剩余时间再次赋给 nanosnanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}// 取出元素,并将元素从队列中移除T task = queue.removeFirst();// 唤醒生产者线程可继续放入任务fullWaitSet.signal();return task;} finally {lock.unlock();}}public void put(T element) {lock.lock();try {// 如果当前任务队列容量已经满了,则让其进入等待状态while (queue.size() >= capcity) {try {log.info("任务队列已满,等待加入任务队列", element);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}// 队列有空闲空间,添加任务到队列尾部,并通知消费者线程开始处理log.info("任务加入到队列尾部 {}", element);queue.addLast(element);emptyWaitSet.signal();} finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}
}
  • ThreadPoolExeutor

    • 线程池的五种状态

      • 采用int高3位存储线程池状态,低29位标识线程数量
      • 存储在一个原子变量clt中的原因:状态与数量合二为一,一次CAS原子操作即可完成赋值
      • Running 、SHUTDOWN、STOP、TIDYING、TERMINATED
      • 了解SHUTDDOWN 与 STOP区别
    • 7个构造方法参数

      • corePoolSize -> 核心线程数
      • maximumPoolSize -> 允许的最大线程数
      • keepAliveTime -> 当线程数大于核心数时,多余的空闲线程在终止前等待新任务的最长时间
      • unit -> 时间单位
      • workQueue -> 执行任务之前保存任务的队列
      • threadFactory -> 创建新线程时使用的工厂
      • rejectedExecutionHandler -> 达到线程边界和队列容量而阻塞执行时要使用的处理程序
    • 核心线程与救急线程

      • 核心+救急=最大
      • 当核心线程都在忙,任务先进入队列
      • 阻塞队列中放不下时,就会让救急线程去处理
      • 高峰期过后,救急线程执行完任务,就会被销毁
      • 核心线程不会销毁
      • 核心线程与救急线程都繁忙,此时执行拒绝策略
      • 救急线程的前提是配合有界队列实现
    • JDK提供的4种拒绝策略

      • 抛出异常(默认)
      • 让调用者运行任务
      • 放弃任务
      • 让队列中最早放入队列的任务,本任务取而代之
    • 第三方拒绝策略扩展

      • Dubbo:异常前会dump线程信息,方便定位问题
      • Netty:创建新的线程来执行
      • ActiveMQ:带超时等等,再重试
      • PinPoint:逐一阐释策略链中各类拒绝策略
    • newFixedThreadPool

      • 创建固定大小线程池,核心=最大,没有救急
      • 有默认线程工厂实现:DefaultThreadFactory,也可自定义
      • 无界队列,可放任意数量任务
      • 适合线程数量很明确场景
    • newCachedThreadPool

      • 创建带缓冲的线程池
      • 创建的线程全部都是救急线程
      • 底层基于SynchronousQueue无容量队列实现
      • 线程数根据任务量数量增长而增长
      • 适合任务数量较密集但任务执行时间较短业务
    • newSingleThreadExecutor

      • 单线程池执行器,线程数固定为1
      • 适合希望多个任务排队执行业务
      • 比起我们自己创建一个线程执行任务更安全可靠
      • 对比固定线程池FTP传1,STE进行外层装饰,只暴露基本能力,使用更安全,严格
    • 提交任务

      • execute

      • 支持执行带有返回结果任务:submit

      • invokeAll

        • 执行提交的所有任务
      • invokeAny

        • 找到最先执行的任务,执行完即算完
    • 关闭线程池

      • shutdown

        • 状态变为:shutdown
        • 线程池会再接受新的任务
        • 已提交的任务会继续执行完
        • 不阻塞调用shutdown方法线程的后续逻辑执行
      • shutdownNow

        • 不会接受新的任务
        • 已提交任务会返回,可自行处理
        • 正在执行的线程会被打断
        • 可以立即终结
      • isShutdown

      • isTerminated

      • 阻塞等待一定时间:awaitTerminateion

  • 异步线程模式

    • 基本理解

      • 工作线程轮流移除处理无限多任务
      • 也可归类为分工模式,典型实现则是线程池,也是享元模式的体现
      • 原则:不同类型任务应该用不同线程池,可避免饥饿锁问题,进一步提升效率
      • 类比生活的中服务员与厨师,明显属于两类不同任务类型,应该分工协作效率更高
    • 饥饿问题(非死锁)

      • 思考生活案例:服务员与厨师
      • 若服务员和厨师两个人都能干点餐和炒菜的活
      • 如果客人较多,有可能会导致所有线程一直处于点餐,而没有人做菜
      • 解决方案:不同任务使用不同线程池
  • 线程池线程数量

    • 过小,容易导致饥饿问题
    • 过大会频繁上下文切换,效率低
    • CPU密集型运算,经验:CPU核数+1,+1是保证当前线程由于也缺失故障导致暂停,额外的线程能顶上去,保证CPU时钟周期不浪费
    • I/O密集型,如文件读写、RPC调用、数据库读写
    • IO密集型经验公式:线程数=核数*期望CPU利用率 * 总时间(CPU计算时间+等待时间)/CPU计算时间
    • 举例:4核CPU,计算时间占50%,其他等待50%,期望CPU被100%利用,则结果为8个线程比较合适
    • 时间占比一般可用监控工具观察,不需精确,取大致估计值即可
  • 任务调度线程池

    • 没有该线程池之前,可用Timer来实现

    • Timer特点

      • 背后就一个线程,任务不能并发执行
      • 前序任务耗时较长,将影响接续任务
      • 可靠性差,前序任务异常可能影响后续任务
      • 阿里巴巴规约已明显做提醒使用任务调度线程池代替
    • newScheduledThreadPool

      • 基本使用
      • 案例:每周四18:00执行任务,编写对应代码实现
/*** @author 钦尘* @date 2021/8/4 23:30* @description 线程池常用工具类*/
@Slf4j(topic = "test.TestThreadPoolExecutors")
public class TestThreadPoolExecutors {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);pool.execute(() -> {log.info("执行任务1");});pool.execute(() -> {log.info("执行任务2");});pool.execute(() -> {log.info("执行任务3");});// Executors.newCachedThreadPool();// Executors.newSingleThreadExecutor();// submit 方法TestSubmit(pool);TestInvokeAll(pool);TestInvokeAny(pool);TestSchedulePool();}/*** 任务调度线程池*/private static void TestSchedulePool() {/*** 多个任务之间异常,延时互不影响*/ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);scheduledPool.schedule(() -> {log.info("延迟1s, 执行task1");}, 1, TimeUnit.SECONDS);// 以某个速率周期性执行scheduledPool.scheduleAtFixedRate(() -> {log.info("定时每隔1s反复执行");// 如果执行任务耗时较长,超出间隔,则会在执行完成下,立即执行下次任务// 这种机制可防止任务重叠执行// Sleeper.sleep(2);}, 1, 1, TimeUnit.SECONDS);// 以某个速率周期性执行,注意与上面scheduleAtFixedRate的区别scheduledPool.scheduleWithFixedDelay(() -> {log.info("定时每隔1s反复执行");// 任务从上一次任务结束时间开始计算下个周期,这里则会间隔3秒执行Sleeper.sleep(2);}, 1, 1, TimeUnit.SECONDS);// 异常处理scheduledPool.schedule(() -> {log.info("异常信息模拟,默认不会抛出异常");int i = 1 / 0;// 以上异常不会抛出,方式1,可以自己手动捕获,方式2可以用submit,返回 Future,可以感知异常}, 1, TimeUnit.SECONDS);}private static void TestInvokeAny(ExecutorService pool) throws ExecutionException, InterruptedException {// 返回一个最先得到的结果String result = pool.invokeAny(Arrays.asList(() -> {log.info("begin1");Sleeper.sleep(1);return "1";},() -> {log.info("begin2");Sleeper.sleep(2);return "2";}, () -> {log.info("begin3");Sleeper.sleep(3);return "3";}));log.info("invokeAny() 执行的任务结果为 {}", result);}private static void TestInvokeAll(ExecutorService pool) throws InterruptedException, ExecutionException {List> futures = pool.invokeAll(Arrays.asList(() -> {log.info("begin");Sleeper.sleep(1);return "1";},() -> {log.info("begin");Sleeper.sleep(2);return "2";}, () -> {log.info("begin");Sleeper.sleep(3);return "3";}));for (Future future : futures) {log.info("多个线程执行结果 {}", future.get());}}private static void TestSubmit(ExecutorService pool) throws InterruptedException, ExecutionException {Future future = pool.submit(() -> {log.info("执行任务");Sleeper.sleep(1);return "ok";});log.info("任务执行结果 {}", future.get());}
}
 
  • Tomcat线程池

    • 连接器即用到了线程池实现
    • 具体参看tomcat-embed-core源码
    • Tomcat配置项了解
  • Fork/Join

    • JDK1.7后加入,体现的是分而治之思想
    • 大任务拆小任务,分配给不同线程同时处理
    • 最后将各线程执行结果进行归并
    • 代码案例
/*** @author 钦尘* @date 2021/8/5 23:47* @description 应用fork-join,实现求 1 - n的和*/
@Slf4j
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);Integer result1 = pool.invoke(new MyTask1(5));log.info("计算结果1:{}", result1);Integer result2 = pool.invoke(new MyTask2(1, 5));log.info("计算结果2:{}", result2);}}/*** 1-n之间整数求和,思考下面的实现的高级之处* 存在的问题:任务之间相互等待依赖,并行度较低*/
@Slf4j
class MyTask1 extends RecursiveTask {private int n;public MyTask1(int n) {this.n = n;}@Overrideprotected Integer compute() {if (n == 1) {return 1;}MyTask1 t1 = new MyTask1(n - 1);// 拆分,让一个线程去执行此任务t1.fork();log.info("fork {}, {}", n, t1);// 获取执行结果int result = n + t1.join();log.info("join {}, {}, {}", n, t1, result);return result;}
}/*** 这样按范围拆分计算,可以提升线程并发度* 说明:任务拆分要多思考,怎么样合理,效率高,这方面其实对一般开发者有难度* 所以,到 JDK8 stream 底层自己自动实现了*/
@Slf4j
class MyTask2 extends RecursiveTask {private int begin;private int end;public MyTask2(int begin, int end) {this.begin = begin;this.end = end;}@Overrideprotected Integer compute() {if (begin == end) {return begin;}// 可以减少一次任务的拆分if (end - begin == 1) {return end + begin;}// 1 -> 5 = 3int mid = (end + begin) / 2;// 1,3MyTask2 t1 = new MyTask2(begin, mid);t1.fork();// 4,5MyTask2 t2 = new MyTask2(mid + 1, end);t2.fork();int result = t1.join() + t2.join();return result;}
}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章