您现在的位置是:首页 > 文章详情

java线程池使用(二)------部分源码解析

日期:2020-01-13点击:351

前一篇博客介绍了构造参数分别是什么意思
现在介绍java线程池源码
1 init
线程池初始化,代码比较简单

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || //核心线程数校验 maximumPoolSize <= 0 || //非核心线程数校验 maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) //必传参数校验 throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 

2 线程池核心类Worker
线城池中,使用Worker来保证线程池线程的活跃,保证在没有任务执行时,也能保证线程资源不被释放掉
woker类实现了runable接口,本身就是一个线程

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //worker运行的线程,是线程池中实际的线程,通过ThreadFactory生成 final Thread thread; //线程池新增线程时的默认任务,可以为空 Runnable firstTask; //已经执行过的线程任务数 volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //将执行方法委托给ThreadPoolExecuter public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

3 线程池新增任务
image

下面看源码
核心方法1 ThreadPoolExecuter.execute()

 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //判断当前线程数量和最大核心线程数 if (workerCountOf(c) < corePoolSize) {//线程数<最大核心线程数 if (addWorker(command, true))//尝试创建新的worker,如果创建成功则直接返回 return; c = ctl.get();//获取最新的的数量 } if (isRunning(c) && workQueue.offer(command)) {//1 检查线程池状态 //2当前线程数>=最大核心线程数。 //3尝试将新增的线程放入blockingqueue int recheck = ctl.get(); //对线程池状态进行二次校验,防止其他线程在offer期间对线程池状态进行了修改 if (! isRunning(recheck) && remove(command)) //二次校验,失败后将新增线程从blockingqueue移除 //二次校验成功则不移除,利用java&&的特性 reject(command); //校验失败进入过载策略 else if (workerCountOf(recheck) == 0) //二次校验成功,判断当前线程池线程数量是否归零, //如果归零,则创建新的空worker,由worker去进行处理queue //中的任务 addWorker(null, false); } else if (!addWorker(command, false))//如果queue.offer返回false,证明queue已经满了, // 尝试创建非核心线程 reject(command); //创建失败则证明已经到达最大线程数,进去过载策略 }

2 核心方法二 ThreadPoolExecuter.addWorker()

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//获取当前线程池状态 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
原文链接:https://yq.aliyun.com/articles/742618
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章