您现在的位置是:首页 > 文章详情

java源码-ThreadPoolExecutor(3)

日期:2018-08-23点击:499

开篇

部门以前有一个很跳的程序员写了一个神奇的代码,通过不停创建ThreadPoolExecutor而不调用shutdown导致线程过多溢出,然后想想毕竟我还是一个不跳的程序员,所以还是研究研究比较合适。这篇文章就是说明线程池的退出过程
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)


shutdown源码解析

  • 1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock
  • 2、判断调用者是否有权限shutdown线程池
  • 3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务
  • 4、中断所有空闲线程 interruptIdleWorkers()
  • 5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理
  • 6、解锁
  • 7、尝试终止线程池 tryTerminate()
 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限检查 checkShutdownAccess(); // 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回 advanceRunState(SHUTDOWN); // 设置中断标志,中断所有等待任务的空闲线程 interruptIdleWorkers(); // ThreadPoolExecutor没什么操作 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试状态变为TERMINATED tryTerminate(); } private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); // 设置执行状态为SHUTDOWN if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } // onlyOne在这里传入为false private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历所有worker,通过判断能否获取锁来判定worker是否空闲 for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { // 针对能够获取到锁的线程,线程正阻塞在获取任务的过程中 // 通过中断线程然后迫使线程退出阻塞然后工作线程退出工作 // 然后工作线程就自然的被回收了 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } 


shutdownNow源码解析

shutdownNow() 和 shutdown()的大体流程相似,差别是:

  • 1、将线程池更新为stop状态
  • 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
  • 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
 public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限检查 checkShutdownAccess(); // 设置线程池状态为stop advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 移除所有待消费的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试状态变为TERMINATED tryTerminate(); return tasks; } 


awaitTermination源码解析

等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。

  • 等待过程中如果发现未超时那么通过for循环继续等待超时
 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } 


工作线程Worker退出逻辑

如果获取task为null就会退出while循环从而执行finally部分的逻辑,也就是processWorkerExit()方法执行清了工作.

 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果获取任务为null,那么就会退出当前工作线程 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 执行清了工作 processWorkerExit(w, completedAbruptly); } } 

getTask()方法(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))当中我们设置状态为SHUTDOWN,同时workQueue为空的情况下就会返回null,在外层循环中就会退出线程的工作,实现线程退出。

 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 


参考文章

Java中线程池ThreadPoolExecutor原理探究
Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

原文链接:https://yq.aliyun.com/articles/666319
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章