java源码-ThreadPoolExecutor(3)
开篇
部门以前有一个很跳的程序员写了一个神奇的代码,通过不停创建ThreadPoolExecutor而不调用shutdown导致线程过多溢出,然后想想毕竟我还是一个不跳的程序员,所以还是研究研究比较合适。这篇文章就是说明线程池的退出过程。
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)
shutdown源码解析
- 1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock
- 2、判断调用者是否有权限shutdown线程池
- 3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务
- 4、中断所有空闲线程 interruptIdleWorkers()
- 5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理
- 6、解锁
- 7、尝试终止线程池 tryTerminate()
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限检查 checkShutdownAccess(); // 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回 advanceRunState(SHUTDOWN); // 设置中断标志,中断所有等待任务的空闲线程 interruptIdleWorkers(); // ThreadPoolExecutor没什么操作 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试状态变为TERMINATED tryTerminate(); } private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); // 设置执行状态为SHUTDOWN if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } // onlyOne在这里传入为false private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历所有worker,通过判断能否获取锁来判定worker是否空闲 for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { // 针对能够获取到锁的线程,线程正阻塞在获取任务的过程中 // 通过中断线程然后迫使线程退出阻塞然后工作线程退出工作 // 然后工作线程就自然的被回收了 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
shutdownNow源码解析
shutdownNow() 和 shutdown()的大体流程相似,差别是:
- 1、将线程池更新为stop状态
- 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
- 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限检查 checkShutdownAccess(); // 设置线程池状态为stop advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 移除所有待消费的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试状态变为TERMINATED tryTerminate(); return tasks; }
awaitTermination源码解析
等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。
- 等待过程中如果发现未超时那么通过for循环继续等待超时
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
工作线程Worker退出逻辑
如果获取task为null就会退出while循环从而执行finally部分的逻辑,也就是processWorkerExit()方法执行清了工作.
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果获取任务为null,那么就会退出当前工作线程 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 执行清了工作 processWorkerExit(w, completedAbruptly); } }
getTask()方法(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))当中我们设置状态为SHUTDOWN,同时workQueue为空的情况下就会返回null,在外层循环中就会退出线程的工作,实现线程退出。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
参考文章
Java中线程池ThreadPoolExecutor原理探究
Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Python全栈 正则表达式(re模块正则接口全方位详解)
re模块是Python的标准库模块 模块正则接口的整体模式 re.compile 返回regetx对象 finditer fullmatch match search 返回 match对象 match.属性|方法 re模块的使用: regex = re.compile(pattern,flags = 0) 功能 : 生成正则表达式对象 参数 : pattern 正则表达式 flags 功能标志位,丰富正则表达式的匹配 返回值: 返回一个正则表达式对象 re.findall(pattern,string,flags = 0) 功能 : 根据正则表达式
- 下一篇
【java】简单实现数据库连接池
一直在想java事务是怎么实现的,在原声jdbc的时候级别下,我们可以通过关掉autocommit 然后再手动commit。但是项目开发中基本上是看不见conection的。所以自己决定简单实现框架的一点皮毛功能。首先就是数据库连接池了 1. 先定义一个接口 import java.sql.Connection; public interface IConnectionPool { /** * 获取一个连接 * @return */ Connection getConnection(); /** * 用完后调用,把连接放回池中,实现复用 */ void freeLocalConnection(); /** * 销毁连接池 */ void destroy(); //测试用 void status(); } 2. 实现数据库连接池的代码, 为了线程安全,简单粗暴地用synchronized关键字 实现事务的关键是,我们执行一个事务的Connection是同一个,我们可以在事务控制的时候用AOP,在事务开始的时候 调用setAutoCommit(false) 然后在事务代码之后调用commi...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2整合Redis,开启缓存,提高访问速度