Tomcat实现ThreadPoolExecutor和JDK线程池区别

1.1 tomcat线程池和juc线程池流程

jdk 线程池策略:

  • 当线程池中线程数量小于 corePoolSize,每来一个任务,就会创建一个线程执行这个任务
  • 当前线程池线程数量大于等于 corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列中;若是添加成功,则该任务会等待线程将其取出去执行;若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行
  • 当前线程池线程数量等于 maximumPoolSize,则会采取任务拒绝策略进行处理

tomcat 线程池策略:

  • 当前线程数小于 corePoolSize,则去创建工作线程
  • 当前线程数大于 corePoolSize,但小于 maximumPoolSize,则去创建工作线程
  • 当前线程数大于 maximumPoolSize,则将任务放入到阻塞队列中,当阻塞队列满了之后,则调用拒绝策略丢弃任务

1.2 tomcat线程池和juc线程池的区别

tomcat 线程池是在 juc线程池的基础上,修改少量代码来实现,tomcat 线程池执行流程和 juc的线程池执行流程有着很大的区别,那么tomcat为什么要这样设计?

使用线程池的任务有两种:

  • IO 密集型任务(如调用接口、查询数据库)

  • CPU 密集型任务

  • JDK 的线程池 ThreadPoolExecutor 主要目的解决的便是CPU密集型任务的并发处理

  • Tomcat 若使用原生的JDK线程池,一旦接收的请求数量大于线程池的核心线程数,这些请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体处理速度,所以 Tomcat并没有使用JDK原生线程池的策略 (例如:10个核心线程,20个最大线程,15个请求过来的时候,有5个请求放入队列进行等待处理,这也是为什么 tomcat 要使 work 线程优先 queue的原因)

  • JDK线程池:当线程数达到 corePoolSize 后,任务首先被放到 queue,发挥CPU多核的并行优势,减少多个线程导致的上下文切换,适合的场景是:CPU密集型任务

  • Tomcat线程池:当大量请求达到时,接收的请求数量大于核心线程池的 corePoolSize 时,会继续创建 worker 线程去处理请求,而后续请求量变少时,只会销毁 maximumPoolSize 线程数,适合的场景是:IO密集型


1.3 IO密集型和CPU密集型任务核心参数的设置

  • 场景:JDK线程池执行IO密集型的任务,解决方案:可以提高corePoolSize的大小,引入问题:系统中线程数过多

  • 线程数过多问题的解决方案:指定 ThreadPoolExecutor 的 allowCoreThreadTimeout=true,那么核心线程若处于闲置状态的话,超过一定的时间(KeepAliveTime),就会销毁掉

  • 引入问题:当核心线程数被销毁时,而有大量请求到达时系统重新创建 worker 线程,会使得前期接口响应时间变长(这也是为什么系统上线需要预热)

  • 所以JDK线程池无法完美的去处理IO密集型的任务,这也就是为什么Tomcat需要重写JDK线程池的原因


Tomcat线程池执行流程

  • Tomcat 重写 JDK 线程池(即改动的是少量的源码)实现的功能的增强

  • 思想:主要流程还是JDK线程池流程,即先开启 corePoolSize 线程,然后在 queue,最后在开启 maximumPoolSize 线程,Tomcat 重点改造的是 queue 的 offer(),即在向 queue 放入任务时,若发现未达到最大线程数,那么 offer() 返回 false,即放入队列失败,此时,便继续开启 maximumPoolSize 线程

自定义队列

Tomcat 主要是通过实现自定义队列来完成逻辑的改造。

/*** 实现Tomcat特有逻辑的自定义队列*/
public class TaskQueue extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = 1L;private transient volatile ThreadPoolExecutor parent = null;private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;/*** 强制遗留的容量*/private int forcedRemainingCapacity = -1;/*** 队列的构建方法*/public TaskQueue() {}public TaskQueue(int capacity) {super(capacity);}public TaskQueue(Collection<? extends Runnable> c) {super(c);}/*** 设置核心变量*/public void setParent(ThreadPoolExecutor parent) {this.parent = parent;}/*** put:向阻塞队列填充元素,当阻塞队列满了之后,put时会被阻塞。* offer:向阻塞队列填充元素,当阻塞队列满了之后,offer会返回false。** @param o 当任务被拒绝后,继续强制的放入到线程池中* @return 向阻塞队列塞任务,当阻塞队列满了之后,offer会返回false。*/public boolean force(Runnable o) {if (parent == null || parent.isShutdown()) {throw new RejectedExecutionException("taskQueue.notRunning");}return super.offer(o);}/*** 带有阻塞时间的塞任务*/@Deprecatedpublic boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if (parent == null || parent.isShutdown()) {throw new RejectedExecutionException("taskQueue.notRunning");}return super.offer(o, timeout, unit); //forces the item onto the queue, to be used if the task is rejected}/*** 当线程真正不够用时,优先是开启线程(直至最大线程),其次才是向队列填充任务。** @param runnable 任务* @return false 表示向队列中添加任务失败,*/@Overridepublic boolean offer(Runnable runnable) {if (parent == null) {return super.offer(runnable);}//若是达到最大线程数,进队列。if (parent.getPoolSize() == parent.getMaximumPoolSize()) {return super.offer(runnable);}//当前活跃线程为10个,但是只有8个任务在执行,于是,直接进队列。if (parent.getSubmittedCount() < (parent.getPoolSize())) {return super.offer(runnable);}//当前线程数小于最大线程数,那么直接返回false,去创建最大线程if (parent.getPoolSize() < parent.getMaximumPoolSize()) {return false;}//否则的话,将任务放入到队列中return super.offer(runnable);}/*** 获取任务*/@Overridepublic Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {Runnable runnable = super.poll(timeout, unit);//取任务超时,会停止当前线程,来避免内存泄露if (runnable == null && parent != null) {parent.stopCurrentThreadIfNeeded();}return runnable;}/*** 阻塞式的获取任务,可能返回null。*/@Overridepublic Runnable take() throws InterruptedException {//当前线程应当被终止的情况下:if (parent != null && parent.currentThreadShouldBeStopped()) {long keepAliveTime = parent.getKeepAliveTime(TimeUnit.MILLISECONDS);return poll(keepAliveTime, TimeUnit.MILLISECONDS);}return super.take();}/*** 返回队列的剩余容量*/@Overridepublic int remainingCapacity() {if (forcedRemainingCapacity > DEFAULT_FORCED_REMAINING_CAPACITY) {return forcedRemainingCapacity;}return super.remainingCapacity();}/*** 强制设置剩余容量*/public void setForcedRemainingCapacity(int forcedRemainingCapacity) {this.forcedRemainingCapacity = forcedRemainingCapacity;}/*** 重置剩余容量*/void resetForcedRemainingCapacity() {this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;}
}

2.2 自定义线程池ThreadPoolExecutor

Tomcat 线程池 ThreadPoolExecutor 是继承的 AbstractExecutorService 类,但是很多代码依旧使用的是 JDK 的 ThreadPoolExecutor,只是稍微改造了一部分

    public void execute(Runnable command, long timeout, TimeUnit unit) {/*** 提交任务的数量+1*/submittedCount.incrementAndGet();try {/*** 线程池内部方法,真正执行的方法。就是JDK线程池原生的方法。** 因为重写了阻塞队列,才完成Tomcat特有逻辑的实现。** 1. 重写队列方法;达到核心线程数,然后向阻塞队列中放置,阻塞队列直接返回false* 2. 返回false,则开启maximum pool size;* 3. maximum pool size到达极限时,会抛出RejectedExecutionException方法。**/executeInternal(command);/*** 任务被拒绝*/} catch (RejectedExecutionException rx) {/*** 在将被拒绝的任务放入到队列中。*/if (getQueue() instanceof TaskQueue) {//如果Executor接近最大线程数,应该将任务添加到队列中,而不是拒绝。final TaskQueue queue = (TaskQueue) getQueue();try {//强制的将任务放入到阻塞队列中if (!queue.force(command, timeout, unit)) {//放入失败,则继续抛出异常submittedCount.decrementAndGet();throw new RejectedExecutionException("threadPoolExecutor.queueFull");}} catch (InterruptedException x) {//被中断也抛出异常submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {//不是这种队列,那么当任务满了之后,直接抛出去。submittedCount.decrementAndGet();throw rx;}}}/*** JDK线程池的任务执行的逻辑*/private void executeInternal(Runnable command) {if (command == null) {throw new NullPointerException();}int c = ctl.get();//未达到corePoolSize数量,则去开启线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) {return;}c = ctl.get();}//开启到corePoolSize数量的工作线程,则将任务放入队列。//但是Tomcat重写了阻塞队列。//当放入workQueue.offer(command)返回false,则继续开启线程数if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (!isRunning(recheck) && remove(command)) {reject(command);} else if (workerCountOf(recheck) == 0) {addWorker(null, false);}} else if (!addWorker(command, false)) {//开启最大线程失败,则任务被拒绝。reject(command);}}

注意事项

在这里插入图片描述


tomcat 的优化

    private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) {decrementWorkerCount();}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {//获取剩余的最小线程数,若配置了允许最小线程数关闭的参数,min=0int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//当配置了允许最小线程关闭参数,且队列不为空的情况下,允许保留的最小线程数为1/*** 这是关闭空闲核心线程数时的判断。*/if (min == 0 && !workQueue.isEmpty()) {min = 1;}// https://bz.apache.org/bugzilla/show_bug.cgi?id=65454// If the work queue is not empty, it is likely that a task was// added to the work queue between this thread timing out and// the worker count being decremented a few lines above this// comment. In this case, create a replacement worker so that// the task isn't held in the queue waiting for one of the other// workers to finish./*** 若当前工作数量>允许的最小线程数,那么关闭改线程。* Tomcat对此处进行了优化。*/if (workerCountOf(c) >= min && workQueue.isEmpty()) {return; // replacement not needed}}/*** 没有终止线程,又重新开启线程*/addWorker(null, false);}}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部