并发编程系列学习笔记07(线程池)
线程池
-
问题背景
- 线程资源很宝贵
- 线程不是越多越好
- 频繁创建线程影响性能
-
自定义线程池
- 消费者线程池:Thread Pool
- 生产者任务队列:Blocking Queue
/*** @author 钦尘* @date 2021/8/3 23:10* @description 手写自定义线程池*/
@Slf4j
public class TestPool {/*** 测试** @param args*/public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 10);for (int i = 0; i < 5; i++) {int j = i;threadPool.execute(() -> {log.info("执行任务逻辑 {}", j);});}}}@Slf4j
class ThreadPool {/*** 任务队列*/private BlockingQueue taskQueue;/*** 线程集合*/private HashSet workers = new HashSet();/*** 核心线程数*/private int coreSize;/*** 获取任务的超时时间*/private long timeout;/*** 时间单位*/private TimeUnit timeUnit;public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);}/*** 执行任务** @param task*/public void execute(Runnable task) {// 当任务数 未超过 coreSize 直接交给 worker对象执行// 达到数量,加入队列synchronized (workers) {if (workers.size() < coreSize) {log.info("新增任务 {}", task);Worker worker = new Worker(task);workers.add(worker);worker.start();} else {log.info("加入任务队列 {}", task);taskQueue.put(task);}}}/*** 对线程进行包装*/class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1.当 task 不为 null 直接执行// 2.当 task执行完毕,从任务队列获取任务,执行// 测试不带超时 和 带超时的 区别,两种不同策略,一种是死等,一种是一段时间后放弃等待,线程销毁// while (task != null || (task = taskQueue.take()) != null) {while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.info("任务正在执行 {}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.info("任务执行完毕,被移除 {}", this);workers.remove(this);}}}}/*** 阻塞队列,存放线程未处理的任务** @param */
@Slf4j
class BlockingQueue {/*** 任务队列,使用双向链表,ArrayDeque比LinkedList性能更好*/private Deque queue = new ArrayDeque<>();/*** 锁,保护队列头尾元素*/private ReentrantLock lock = new ReentrantLock();/*** 队列最大容量*/private int capcity;/*** 生产者条件变量*/private Condition fullWaitSet = lock.newCondition();/*** 消费者条件变量*/private Condition emptyWaitSet = lock.newCondition();public BlockingQueue(int capcity) {this.capcity = capcity;}/*** 阻塞获取,不带超时时间* @return 任务*/public T take() {lock.lock();try {// 队列为空,则进入等待while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}// 拿到任务,从队列中移除,并返回该队列,同时通知 条件变量 fullWaitSetT t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}/*** 带超时的阻塞获取** @param timeout* @param unit* @return*/public T poll(long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {if (nanos <= 0) {return null;}try {// 有可能出现虚假唤醒,所以将返回的是剩余时间再次赋给 nanosnanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}// 取出元素,并将元素从队列中移除T task = queue.removeFirst();// 唤醒生产者线程可继续放入任务fullWaitSet.signal();return task;} finally {lock.unlock();}}public void put(T element) {lock.lock();try {// 如果当前任务队列容量已经满了,则让其进入等待状态while (queue.size() >= capcity) {try {log.info("任务队列已满,等待加入任务队列", element);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}// 队列有空闲空间,添加任务到队列尾部,并通知消费者线程开始处理log.info("任务加入到队列尾部 {}", element);queue.addLast(element);emptyWaitSet.signal();} finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}
}
-
ThreadPoolExeutor
-
线程池的五种状态
- 采用int高3位存储线程池状态,低29位标识线程数量
- 存储在一个原子变量clt中的原因:状态与数量合二为一,一次CAS原子操作即可完成赋值
- Running 、SHUTDOWN、STOP、TIDYING、TERMINATED
- 了解SHUTDDOWN 与 STOP区别
-
7个构造方法参数
- corePoolSize -> 核心线程数
- maximumPoolSize -> 允许的最大线程数
- keepAliveTime -> 当线程数大于核心数时,多余的空闲线程在终止前等待新任务的最长时间
- unit -> 时间单位
- workQueue -> 执行任务之前保存任务的队列
- threadFactory -> 创建新线程时使用的工厂
- rejectedExecutionHandler -> 达到线程边界和队列容量而阻塞执行时要使用的处理程序
-
核心线程与救急线程
- 核心+救急=最大
- 当核心线程都在忙,任务先进入队列
- 阻塞队列中放不下时,就会让救急线程去处理
- 高峰期过后,救急线程执行完任务,就会被销毁
- 核心线程不会销毁
- 核心线程与救急线程都繁忙,此时执行拒绝策略
- 救急线程的前提是配合有界队列实现
-
JDK提供的4种拒绝策略
- 抛出异常(默认)
- 让调用者运行任务
- 放弃任务
- 让队列中最早放入队列的任务,本任务取而代之
-
第三方拒绝策略扩展
- Dubbo:异常前会dump线程信息,方便定位问题
- Netty:创建新的线程来执行
- ActiveMQ:带超时等等,再重试
- PinPoint:逐一阐释策略链中各类拒绝策略
-
newFixedThreadPool
- 创建固定大小线程池,核心=最大,没有救急
- 有默认线程工厂实现:DefaultThreadFactory,也可自定义
- 无界队列,可放任意数量任务
- 适合线程数量很明确场景
-
newCachedThreadPool
- 创建带缓冲的线程池
- 创建的线程全部都是救急线程
- 底层基于SynchronousQueue无容量队列实现
- 线程数根据任务量数量增长而增长
- 适合任务数量较密集但任务执行时间较短业务
-
newSingleThreadExecutor
- 单线程池执行器,线程数固定为1
- 适合希望多个任务排队执行业务
- 比起我们自己创建一个线程执行任务更安全可靠
- 对比固定线程池FTP传1,STE进行外层装饰,只暴露基本能力,使用更安全,严格
-
提交任务
-
execute
-
支持执行带有返回结果任务:submit
-
invokeAll
- 执行提交的所有任务
-
invokeAny
- 找到最先执行的任务,执行完即算完
-
-
关闭线程池
-
shutdown
- 状态变为:shutdown
- 线程池会再接受新的任务
- 已提交的任务会继续执行完
- 不阻塞调用shutdown方法线程的后续逻辑执行
-
shutdownNow
- 不会接受新的任务
- 已提交任务会返回,可自行处理
- 正在执行的线程会被打断
- 可以立即终结
-
isShutdown
-
isTerminated
-
阻塞等待一定时间:awaitTerminateion
-
-
-
异步线程模式
-
基本理解
- 工作线程轮流移除处理无限多任务
- 也可归类为分工模式,典型实现则是线程池,也是享元模式的体现
- 原则:不同类型任务应该用不同线程池,可避免饥饿锁问题,进一步提升效率
- 类比生活的中服务员与厨师,明显属于两类不同任务类型,应该分工协作效率更高
-
饥饿问题(非死锁)
- 思考生活案例:服务员与厨师
- 若服务员和厨师两个人都能干点餐和炒菜的活
- 如果客人较多,有可能会导致所有线程一直处于点餐,而没有人做菜
- 解决方案:不同任务使用不同线程池
-
-
线程池线程数量
- 过小,容易导致饥饿问题
- 过大会频繁上下文切换,效率低
- CPU密集型运算,经验:CPU核数+1,+1是保证当前线程由于也缺失故障导致暂停,额外的线程能顶上去,保证CPU时钟周期不浪费
- I/O密集型,如文件读写、RPC调用、数据库读写
- IO密集型经验公式:线程数=核数*期望CPU利用率 * 总时间(CPU计算时间+等待时间)/CPU计算时间
- 举例:4核CPU,计算时间占50%,其他等待50%,期望CPU被100%利用,则结果为8个线程比较合适
- 时间占比一般可用监控工具观察,不需精确,取大致估计值即可
-
任务调度线程池
-
没有该线程池之前,可用Timer来实现
-
Timer特点
- 背后就一个线程,任务不能并发执行
- 前序任务耗时较长,将影响接续任务
- 可靠性差,前序任务异常可能影响后续任务
- 阿里巴巴规约已明显做提醒使用任务调度线程池代替
-
newScheduledThreadPool
- 基本使用
- 案例:每周四18:00执行任务,编写对应代码实现
-
/*** @author 钦尘* @date 2021/8/4 23:30* @description 线程池常用工具类*/
@Slf4j(topic = "test.TestThreadPoolExecutors")
public class TestThreadPoolExecutors {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);pool.execute(() -> {log.info("执行任务1");});pool.execute(() -> {log.info("执行任务2");});pool.execute(() -> {log.info("执行任务3");});// Executors.newCachedThreadPool();// Executors.newSingleThreadExecutor();// submit 方法TestSubmit(pool);TestInvokeAll(pool);TestInvokeAny(pool);TestSchedulePool();}/*** 任务调度线程池*/private static void TestSchedulePool() {/*** 多个任务之间异常,延时互不影响*/ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);scheduledPool.schedule(() -> {log.info("延迟1s, 执行task1");}, 1, TimeUnit.SECONDS);// 以某个速率周期性执行scheduledPool.scheduleAtFixedRate(() -> {log.info("定时每隔1s反复执行");// 如果执行任务耗时较长,超出间隔,则会在执行完成下,立即执行下次任务// 这种机制可防止任务重叠执行// Sleeper.sleep(2);}, 1, 1, TimeUnit.SECONDS);// 以某个速率周期性执行,注意与上面scheduleAtFixedRate的区别scheduledPool.scheduleWithFixedDelay(() -> {log.info("定时每隔1s反复执行");// 任务从上一次任务结束时间开始计算下个周期,这里则会间隔3秒执行Sleeper.sleep(2);}, 1, 1, TimeUnit.SECONDS);// 异常处理scheduledPool.schedule(() -> {log.info("异常信息模拟,默认不会抛出异常");int i = 1 / 0;// 以上异常不会抛出,方式1,可以自己手动捕获,方式2可以用submit,返回 Future,可以感知异常}, 1, TimeUnit.SECONDS);}private static void TestInvokeAny(ExecutorService pool) throws ExecutionException, InterruptedException {// 返回一个最先得到的结果String result = pool.invokeAny(Arrays.asList(() -> {log.info("begin1");Sleeper.sleep(1);return "1";},() -> {log.info("begin2");Sleeper.sleep(2);return "2";}, () -> {log.info("begin3");Sleeper.sleep(3);return "3";}));log.info("invokeAny() 执行的任务结果为 {}", result);}private static void TestInvokeAll(ExecutorService pool) throws InterruptedException, ExecutionException {List> futures = pool.invokeAll(Arrays.asList(() -> {log.info("begin");Sleeper.sleep(1);return "1";},() -> {log.info("begin");Sleeper.sleep(2);return "2";}, () -> {log.info("begin");Sleeper.sleep(3);return "3";}));for (Future -
Tomcat线程池
- 连接器即用到了线程池实现
- 具体参看tomcat-embed-core源码
- Tomcat配置项了解
-
Fork/Join
- JDK1.7后加入,体现的是分而治之思想
- 大任务拆小任务,分配给不同线程同时处理
- 最后将各线程执行结果进行归并
- 代码案例
/*** @author 钦尘* @date 2021/8/5 23:47* @description 应用fork-join,实现求 1 - n的和*/
@Slf4j
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);Integer result1 = pool.invoke(new MyTask1(5));log.info("计算结果1:{}", result1);Integer result2 = pool.invoke(new MyTask2(1, 5));log.info("计算结果2:{}", result2);}}/*** 1-n之间整数求和,思考下面的实现的高级之处* 存在的问题:任务之间相互等待依赖,并行度较低*/
@Slf4j
class MyTask1 extends RecursiveTask {private int n;public MyTask1(int n) {this.n = n;}@Overrideprotected Integer compute() {if (n == 1) {return 1;}MyTask1 t1 = new MyTask1(n - 1);// 拆分,让一个线程去执行此任务t1.fork();log.info("fork {}, {}", n, t1);// 获取执行结果int result = n + t1.join();log.info("join {}, {}, {}", n, t1, result);return result;}
}/*** 这样按范围拆分计算,可以提升线程并发度* 说明:任务拆分要多思考,怎么样合理,效率高,这方面其实对一般开发者有难度* 所以,到 JDK8 stream 底层自己自动实现了*/
@Slf4j
class MyTask2 extends RecursiveTask {private int begin;private int end;public MyTask2(int begin, int end) {this.begin = begin;this.end = end;}@Overrideprotected Integer compute() {if (begin == end) {return begin;}// 可以减少一次任务的拆分if (end - begin == 1) {return end + begin;}// 1 -> 5 = 3int mid = (end + begin) / 2;// 1,3MyTask2 t1 = new MyTask2(begin, mid);t1.fork();// 4,5MyTask2 t2 = new MyTask2(mid + 1, end);t2.fork();int result = t1.join() + t2.join();return result;}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
