java线程池使用(二)------部分源码解析
前一篇博客介绍了构造参数分别是什么意思
现在介绍java线程池源码
1 init
线程池初始化,代码比较简单
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || //核心线程数校验 maximumPoolSize <= 0 || //非核心线程数校验 maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) //必传参数校验 throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
2 线程池核心类Worker
线城池中,使用Worker来保证线程池线程的活跃,保证在没有任务执行时,也能保证线程资源不被释放掉
woker类实现了runable接口,本身就是一个线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //worker运行的线程,是线程池中实际的线程,通过ThreadFactory生成 final Thread thread; //线程池新增线程时的默认任务,可以为空 Runnable firstTask; //已经执行过的线程任务数 volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //将执行方法委托给ThreadPoolExecuter public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
3 线程池新增任务
下面看源码
核心方法1 ThreadPoolExecuter.execute()
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //判断当前线程数量和最大核心线程数 if (workerCountOf(c) < corePoolSize) {//线程数<最大核心线程数 if (addWorker(command, true))//尝试创建新的worker,如果创建成功则直接返回 return; c = ctl.get();//获取最新的的数量 } if (isRunning(c) && workQueue.offer(command)) {//1 检查线程池状态 //2当前线程数>=最大核心线程数。 //3尝试将新增的线程放入blockingqueue int recheck = ctl.get(); //对线程池状态进行二次校验,防止其他线程在offer期间对线程池状态进行了修改 if (! isRunning(recheck) && remove(command)) //二次校验,失败后将新增线程从blockingqueue移除 //二次校验成功则不移除,利用java&&的特性 reject(command); //校验失败进入过载策略 else if (workerCountOf(recheck) == 0) //二次校验成功,判断当前线程池线程数量是否归零, //如果归零,则创建新的空worker,由worker去进行处理queue //中的任务 addWorker(null, false); } else if (!addWorker(command, false))//如果queue.offer返回false,证明queue已经满了, // 尝试创建非核心线程 reject(command); //创建失败则证明已经到达最大线程数,进去过载策略 }
2 核心方法二 ThreadPoolExecuter.addWorker()
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//获取当前线程池状态 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Serverless 微服务实践-移动应用包分发服务
背景 阿里云函数计算是事件驱动的全托管计算服务。通过函数计算,您无需管理服务器等基础设施,只需编写代码并上传。函数计算会为您准备好计算资源,以弹性、可靠的方式运行您的代码,并提供日志查询、性能监控、报警等功能。借助于函数计算,您可以快速构建任何类型的应用和服务,无需管理和运维。而且,您只需要为代码实际运行所消耗的资源付费,代码未运行则不产生费用。移动应用的打包和分发呈现明显的峰谷效用,用户常常需要短时间内准备大量资源保障分发的实时性,完成分发后又需要及时释放资源,降低成本。这里我们提供一个 fun 模板,帮助我们更快地搭建一个基于函数计算构建 Serverless 架构的包分发服务,在开发运维效率,性能和成本间取得良好的平衡。 在分包过程中,下载/修改/上传是一个比较消耗资源的任务,需要消耗大量的计算/网络资源。并且分包任务只在应用发
- 下一篇
spring 扩展功能心得(一)
spring作为现在最火的开发框架,熟练的使用spring和扩展功能,能大大提高开发效率首先从spring的启动来看,spring启动的核心方法是 refresh 方法,该方法定义在AbstractApplicationContext.class下面上代码 基于spring4.3.18,基于ClassPathXmlApplicationContext public void refresh() throws BeansException, IllegalStateException { Object var1 = this.startupShutdownMonitor; synchronized(this.startupShutdownMonitor) { //前置刷新工作,可以进行一些环境变量的校验 this.prepareRefresh(); //生成beanFactory 在生成过程中加载bean定义文件(spring.xml和spring注解等) ConfigurableListableBeanFactory beanFactory = this.obtainFreshBean...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Red5直播服务器,属于Java语言的直播服务器
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS关闭SELinux安全模块
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7设置SWAP分区,小内存服务器的救世主