线程池
线程池的基本使用
package com.shothook.test; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.time.Duration; import java.time.LocalDateTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j public class ThreadPoolDemo { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor(10, 1000, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10)); LocalDateTime start = LocalDateTime.now(); for (int i = 0; i < 1000; i++) { threadPool.submit(new Task(i)); } LocalDateTime end = LocalDateTime.now(); Duration duration = Duration.between(start,end); log.debug("submit task complete, takes time: {}", duration.getNano()); } @AllArgsConstructor @Slf4j public static class Task implements Runnable { private int i; @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("thread name: {}, to execute task: {}", Thread.currentThread().getName(), i); } } }
线程池的工作模式,简单来说如下所示:
主线程往缓冲队列中添加任务,线程池从队列中取任务并执行,当然,真实情况比这复杂的多,这个后面再详细分析,先有个概念就好。
创建线程池的各个参数的意义
线程池的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize:核心线程数
maximumPoolSize:最大线程数,当往缓冲队列中增加任务,缓冲队列满时,会在corePoolSize的基础上继续新建线程,知道触发最大线程数
keepAliveTime:超过核心线程数的线程的存活时间,对超过存活时间的,线程池结束超时的空闲线程
unit:超过核心线程数的线程的存活时间对应的时间单位
workQueue:线程池的任务缓存队列
threadFactory:创建线程的工厂
handler:当到最大线程数,并且缓存队列满时,继续添加任务时的处理策略
线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; }//获取线程池的状态 private static int workerCountOf(int c) { return c & CAPACITY; }//获取线程池的线程数 private static int ctlOf(int rs, int wc) { return rs | wc; }//参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl
ctl : 高三位保存线程池状态,低29位保存任务数。
线程池的创建
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行 * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理 * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution RejectedExecutionException是一个RuntimeException * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务 * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了 * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程 * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展) * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务 */ int c = ctl.get(); /** * 1、如果当前线程数少于corePoolSize(可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了) */ if (workerCountOf(c) < corePoolSize) { //addWorker()成功,返回 if (addWorker(command, true)) return; /** * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get()) * 失败的原因可能是: * 1、线程池已经shutdown,shutdown的线程池不再接收新任务 * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize */ c = ctl.get(); } /** * 2、如果线程池RUNNING状态,且入队列成功 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();//再次校验位 /** * 再次校验放入workerQueue中的任务是否能被执行 * 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务 * 2、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了) */ //如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command if (! isRunning(recheck) && remove(command)) reject(command); //如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null //为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢?? //只保证有一个worker线程可以从queue中获取任务执行就行了?? //因为只要还有活动的worker线程,就可以消费workerQueue中的任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask //第二个参数为true代表占用corePoolSize,false占用maxPoolSize } /** * 3、如果线程池不是running状态 或者 无法入队列 * 尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command */ else if (!addWorker(command, false)) reject(command); }
钩子
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }
线程池的类图
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java并发编程实战 04死锁了怎么办?
Java并发编程实战 04死锁了怎么办? Java并发编程文章系列#Java并发编程实战 01并发编程的Bug源头Java并发编程实战 02Java如何解决可见性和有序性问题Java并发编程实战 03互斥锁 解决原子性问题 前提#在第三篇文章最后的例子当中,需要获取到两个账户的锁后进行转账操作,这种情况有可能会发生死锁,我把上一章的代码片段放到下面: Copypublic class Account { // 余额 private Long money; public synchronized void transfer(Account target, Long money) { synchronized(this) { (1) synchronized (target) { (2) this.money -= money; if (this.money < 0) { // throw exception } target.money += money; } } } }若账户A转账给账户B100元,账户B同时也转账给账户A100元,当账户A转帐的线程A执行到了代码(1)处时,获取...
- 下一篇
Electron:PC 端多端融合方案
每天都要写第二天的 todoList。有一天在写的时候突然想到,为了让自己清楚知道自己需要做啥、做了多少、还剩多少没做,想写一个电脑端程序,在技术选型的时候就选了 electron。 本篇文章的目的不是讲解 API 如何使用,想知道这些可以直接看官方文档。本文目的旨在讲明如何技术如何选择、如何快速上手、如何调试、Electron 底层原理、工程体系方面的总结。 一、 方案选型 3天时间写了个 PC 端应用程序。先看看结果吧 为什么要选 electron 作为 pc 端开发方案? 史前时代,以 MFC 为代表的技术栈,开发效率较低,维护成本高。 后来使用 QT 技术,特点是使用 DirectUI + 面向对象 + XML 定义 UI,适用于小型软件、性能要求、包大小、UI 复杂度叫高的需求。 再到后来,以 QT Quick 为代表的技术,特点是框架本身提供子控件,基于子控件组合来创建新的控件。类似于 ActionScript 的脚本化界面逻辑代码。 新时代主要是以 electron 和 Cef 为 代表。特点是界面开发以 Web 技术为主,部分逻辑需要 Native 代码实现。大家都熟悉...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS6,CentOS7官方镜像安装Oracle11G
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- CentOS6,7,8上安装Nginx,支持https2.0的开启