**本文关键字:** `线程`,`线程池`,`单线程`,`多线程`,`线程池的好处`,`线程回收`,`创建方式`,`核心参数`,`底层机制`,`拒绝策略`,`参数设置`,`动态监控`,`线程隔离` 线程和线程池相关的知识,是Java学习或者面试中一定会遇到的知识点,本篇我们会从线程和进程,并行与并发,单线程和多线程等,一直讲解到线程池,线程池的好处,创建方式,重要的核心参数,几个重要的方法,底层实现,拒绝策略,参数设置,动态调整,线程隔离等等。主要的大纲如下:  ## 线程池的好处 线程池,使用了池化思想来管理线程,池化技术就是为了最大化效益,最小化用户风险,将资源统一放在一起管理的思想。这种思想在很多地方都有使用到,不仅仅是计算机,比如金融,企业管理,设备管理等。 为什么要线程池?如果在并发的场景,编码人员根据需求来创建线程池,可能会有以下的问题: - 我们很难确定系统有多少线程在运行,如果使用就创建,不使用就销毁,那么创建和销毁线程的消耗也是比较大的 - 假设来了很多请求,可能是爬虫,疯狂创建线程,可能把系统资源耗尽。 实现线程池有什么好处呢? - 降低资源消耗:池化技术可以重复利用已经创建的线程,降低线程创建和销毁的损耗。 - 提高响应速度:利用已经存在的线程进行处理,少去了创建线程的时间 - 管理线程可控:线程是稀缺资源,不能无限创建,线程池可以做到统一分配和监控 - 拓展其他功能:比如定时线程池,可以定时执行任务 其实池化技术,用在比较多地方,比如: - 数据库连接池:数据库连接是稀缺资源,先创建好,提高响应速度,重复利用已有的连接 - 实例池:先创建好对象放到池子里面,循环利用,减少来回创建和销毁的消耗 ## 线程池相关的类 下面是与线程池相关的类的继承关系:  ### Executor `Executor` 是顶级接口,里面只有一个方法`execute(Runnable command)`,定义的是调度线程池来执行任务,它定义了线程池的基本规范,执行任务是它的天职。 ### ExecutorService `ExecutorService` 继承了`Executor`,但是它仍然是一个接口,它多了一些方法:  - `void shutdown()`:关闭线程池,会等待任务执行完。 - `List
shutdownNow()`:立刻关闭线程池,尝试停止所有正在积极执行的任务,停止等待任务的处理,并**返回一个正在等待执行的任务列表(还没有执行的)**。 - `boolean isShutdown()`:判断线程池是不是已经关闭,但是可能线程还在执行。 - `boolean isTerminated()`:在执行shutdown/shutdownNow之后,所有的任务已经完成,这个状态就是true。 - `boolean awaitTermination(long timeout, TimeUnit unit)`:执行shutdown之后,阻塞等到terminated状态,除非超时或者被打断。 - `
Future
submit(Callable
task)`: 提交一个有返回值的任务,并且返回该任务尚未有结果的Future,调用future.get()方法,可以返回任务完成的时候的结果。 - `
Future
submit(Runnable task, T result)`:提交一个任务,传入返回结果,这个result没有什么作用,只是指定类型和一个返回的结果。 - `Future submit(Runnable task)`: 提交任务,返回Future - `
List
> invokeAll(Collection> tasks)`:批量执行tasks,获取Future的list,可以批量提交任务。 - `
List
> invokeAll(Collection> tasks,long timeout, TimeUnit unit)`:批量提交任务,并指定超时时间 - `
T invokeAny(Collection> tasks)`: 阻塞,获取第一个完成任务的结果值, - `
T invokeAny(Collection> tasks,long timeout, TimeUnit unit)`:阻塞,获取第一个完成结果的值,指定超时时间 可能有同学对前面的`
Future
submit(Runnable task, T result)`有疑问,这个reuslt有什么作用? 其实它没有什么作用,只是持有它,任务完成后,还是调用 `future.get()`返回这个结果,用`result` new 了一个 `ftask`,其内部其实是使用了Runnable的包装类 `RunnableAdapter`,没有对result做特殊的处理,调用 `call()` 方法的时候,直接返回这个结果。(Executors 中具体的实现) ```java public
Future
submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task, result); execute(ftask); return ftask; } static final class RunnableAdapter
implements Callable
{ final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); // 返回传入的结果 return result; } } ``` 还有一个方法值得一提:`invokeAny()`: 在 `ThreadPoolExecutor`中使用`ExecutorService` 中的方法 `invokeAny()` 取得第一个完成的任务的结果,当第一个任务执行完成后,会调用 `interrupt()` 方法将其他任务中断。 注意,`ExecutorService`是接口,里面都是定义,并没有涉及实现,而前面的讲解都是基于它的名字(规定的规范)以及它的普遍实现来说的。 可以看到 `ExecutorService` 定义的是线程池的一些操作,包括关闭,判断是否关闭,是否停止,提交任务,批量提交任务等等。 ### AbstractExecutorService `AbstractExecutorService` 是一个抽象类,实现了 `ExecutorService`接口,这是大部分线程池的基本实现,定时的线程池先不关注,主要的方法如下:  不仅实现了`submit`,`invokeAll`,`invokeAny` 等方法,而且提供了一个 `newTaskFor` 方法用于构建 `RunnableFuture` 对象,那些能够获取到任务返回结果的对象都是通过 `newTaskFor` 来获取的。不展开里面所有的源码的介绍,仅以submit()方法为例: ```java public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); // 封装任务 RunnableFuture
ftask = newTaskFor(task, null); // 执行任务 execute(ftask); // 返回 RunnableFuture 对象 return ftask; } ``` 但是在 `AbstractExecutorService` 是没有对最最重要的方法进行实现的,也就是 `execute()` 方法。线程池具体是怎么执行的,这个不同的线程池可以有不同的实现,一般都是继承 `AbstractExecutorService` (定时任务有其他的接口),我们最最常用的就是`ThreadPoolExecutor`。  ### ThreadPoolExecutor **重点来了!!!** `ThreadPoolExecutor` 一般就是我们平时常用到的线程池类,所谓创建线程池,如果不是定时线程池,就是使用它。 先看`ThreadPoolExecutor`的内部结构(属性): ```java public class ThreadPoolExecutor extends AbstractExecutorService { // 状态控制,主要用来控制线程池的状态,是核心的遍历,使用的是原子类 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 用来表示线程数量的位数(使用的是位运算,一部分表示线程的数量,一部分表示线程池的状态) // SIZE = 32 表示32位,那么COUNT_BITS就是29位 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池的容量,也就是27位表示的最大值 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 状态量,存储在高位,32位中的前3位 // 111(第一位是符号位,1表示负数),线程池运行中 private static final int RUNNING = -1 << COUNT_BITS; // 000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 private static final int STOP = 1 << COUNT_BITS; // 010 private static final int TIDYING = 2 << COUNT_BITS; // 011 private static final int TERMINATED = 3 << COUNT_BITS; // 取出运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 取出线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } // 用运行状态和线程数获取ctl private static int ctlOf(int rs, int wc) { return rs | wc; } // 任务等待队列 private final BlockingQueue
workQueue; // 可重入主锁(保证一些操作的线程安全) private final ReentrantLock mainLock = new ReentrantLock(); // 线程的集合 private final HashSet
workers = new HashSet
(); // 在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(), // 传统线程的通信方式,Condition都可以实现,Condition和传统的线程通信没什么区别,Condition的强大之处在于它可以为多个线程间建立不同的Condition private final Condition termination = mainLock.newCondition(); // 最大线程池大小 private int largestPoolSize; // 完成的任务数量 private long completedTaskCount; // 线程工厂 private volatile ThreadFactory threadFactory; // 任务拒绝处理器 private volatile RejectedExecutionHandler handler; // 非核心线程的存活时间 private volatile long keepAliveTime; // 允许核心线程的超时时间 private volatile boolean allowCoreThreadTimeOut; // 核心线程数 private volatile int corePoolSize; // 工作线程最大容量 private volatile int maximumPoolSize; // 默认的拒绝处理器(丢弃任务) private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 运行时关闭许可 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); // 上下文 private final AccessControlContext acc; // 只有一个线程 private static final boolean ONLY_ONE = true; } ``` #### 线程池状态 从上面的代码可以看出,用一个32位的对象保存线程池的状态以及线程池的容量,高3位是线程池的状态,而剩下的29位,则是保存线程的数量: ```java // 状态量,存储在高位,32位中的前3位 // 111(第一位是符号位,1表示负数),线程池运行中 private static final int RUNNING = -1 << COUNT_BITS; // 000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 private static final int STOP = 1 << COUNT_BITS; // 010 private static final int TIDYING = 2 << COUNT_BITS; // 011 private static final int TERMINATED = 3 << COUNT_BITS; ``` 各种状态之间是不一样的,他们的状态之间变化如下:  - RUNNING:运行状态,可以接受任务,也可以处理任务 - SHUTDOWN:不可以接受任务,但是可以处理任务 - STOP:不可以接受任务,也不可以处理任务,中断当前任务 - TIDYING:所有线程停止 - TERMINATED:线程池的最后状态 #### Worker 实现 线程池,肯定得有池子,并且是放线程的地方,在 `ThreadPoolExecutor` 中表现为 `Worker`,这是内部类:  线程池其实就是 `Worker` (打工人,不断的领取任务,完成任务)的集合,这里使用的是 `HashSet`: ```java private final HashSet
workers = new HashSet
(); ``` `Worker` 怎么实现的呢? `Worker` 除了继承了 `AbstractQueuedSynchronizer`,也就是 `AQS` , `AQS` 本质上就是个队列锁,一个简单的互斥锁,一般是在中断或者修改 `worker` 状态的时候使用。 内部引入`AQS`,是为了线程安全,线程执行任务的时候,调用的是`runWorker(Worker w)`,这个方法不是worker的方法,而是 `ThreadPoolExecutor`的方法。从下面的代码可以看出,每次修改`Worke`r的状态的时候,都是线程安全的。`Worker`里面,持有了一个线程`Thread`,可以理解为是对线程的封装。 至于`runWorker(Worker w)`是怎么运行的?先保持这个疑问,后面详细讲解。 ```java // 实现 Runnable,封装了线程 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 序列化id private static final long serialVersionUID = 6138294804551838833L; // worker运行的线程 final Thread thread; // 初始化任务,有可能是空的,如果任务不为空的时候,其他进来的任务,可以直接运行,不在添加到任务队列 Runnable firstTask; // 线程任务计数器 volatile long completedTasks; // 指定一个任务让工人忙碌起来,这个任务可能是空的 Worker(Runnable firstTask) { // 初始化AQS队列锁的状态 setState(-1); // 禁止中断直到 runWorker this.firstTask = firstTask; // 从线程工厂,取出一个线程初始化 this.thread = getThreadFactory().newThread(this); } // 实际上运行调用的是runWorker public void run() { // 不断循环获取任务进行执行 runWorker(this); } // 0表示没有被锁 // 1表示被锁的状态 protected boolean isHeldExclusively() { return getState() != 0; } // 独占,尝试获取锁,如果成功返回true,失败返回false protected boolean tryAcquire(int unused) { // CAS 乐观锁 if (compareAndSetState(0, 1)) { // 成功,当前线程独占锁 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 独占方式,尝试释放锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 上锁,调用的是AQS的方法 public void lock() { acquire(1); } // 尝试上锁 public boolean tryLock() { return tryAcquire(1); } // 解锁 public void unlock() { release(1); } // 是否锁住 public boolean isLocked() { return isHeldExclusively(); } // 如果开始可就中断 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } ``` #### 任务队列 除了放线程池的地方,要是任务很多,没有那么多线程,肯定需要一个地方放任务,充当缓冲作用,也就是任务队列,在代码中表现为: ```java private final BlockingQueue
workQueue; ``` #### 拒绝策略和处理器 计算机的内存总是有限的,我们不可能一直往队列里面增加内容,所以线程池为我们提供了选择,可以选择多种队列。同时当任务实在太多,占满了线程,并且把任务队列也占满的时候,我们需要做出一定的反应,那就是拒绝还是抛出错误,丢掉任务?丢掉哪些任务,这些都是可能需要定制的内容。 ## 如何创建线程池 关于如何创建线程池,其实 `ThreadPoolExecutor`提供了构造方法,主要参数如下,不传的话会使用默认的: - 核心线程数:核心线程数,一般是指常驻的线程,没有任务的时候通常也不会销毁 - 最大线程数:线程池允许创建的最大的线程数量 - 非核心线程的存活时间:指的是没有任务的时候,非核心线程能够存活多久 - 时间的单位:存活时间的单位 - 存放任务的队列:用来存放任务 - 线程工厂 - 拒绝处理器:如果添加任务失败,将由该处理器处理 ```java // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列,线程池工厂 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列,拒绝任务处理器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 最后其实都是调用了这个方法 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... } ``` 其实,除了显示的指定上面的参数之外,JDK也封装了一些直接创建线程池的方法给我们,那就是`Executors`: ```java // 固定线程数量的线程池,***的队列 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
()); } // 单个线程的线程池,***的队列,按照任务提交的顺序,串行执行 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
(), threadFactory)); } // 动态调节,没有核心线程,全部都是普通线程,每个线程存活60s,使用容量为1的阻塞队列 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue
()); } // 定时任务线程池 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } ``` 但是一般是不推荐使用上面别人封装的线程池的哈!!! ## 线程池的底层参数以及核心方法 看完上面的创建参数大家可能会有点懵,但是没关系,一一为大家道来:  可以看出,当有任务进来的时候,先判断核心线程池是不是已经满了,如果还没有,将会继续创建线程。注意,如果一个任务进来,创建线程执行,执行完成,线程空闲下来,这时候再来一个任务,是会继续使用之前的线程,还是重新创建一个线程来执行呢? 答案是重新创建线程,这样线程池可以快速达到核心线程数的规模大小,以便快速响应后面的任务。 如果线程数量已经到达核心线程数,来了任务,线程池的线程又都不是空闲状态,那么就会判断队列是不是满的,倘若队列还有空间,那么就会把任务放进去队列中,等待线程领取执行。 如果任务队列已经满了,放不下任务,那么就会判断线程数是不是已经到最大线程数了,要是还没有到达,就会继续创建线程并执行任务,这个时候创建的是非核心部分线程。 如果已经到达最大线程数,那么就不能继续创建线程了,只能执行拒绝策略,默认的拒绝策略是丢弃任务,我们可以自定义拒绝策略。 值得注意的是,倘若之前任务比较多,创建出了一些非核心线程,那么任务少了之后,领取不到任务,过了一定时间,非核心线程就会销毁,只剩下核心线程池的数量的线程。这个时间就是前面说的`keepAliveTime`。 ### 提交任务 提交任务,我们看`execute()`,会先获取线程池的状态和个数,要是线程个数还没达到核心线程数,会直接添加线程,否则会放到任务队列,如果任务队列放不下,会继续增加线程,但是不是增加核心线程。 ```java public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取状态和个数 int c = ctl.get(); // 如果个数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 直接添加 if (addWorker(command, true)) return; // 添加失败则继续获取 c = ctl.get(); } // 判断线程池状态是不是运行中,任务放到队列中 if (isRunning(c) && workQueue.offer(command)) { // 再次检查 int recheck = ctl.get(); // 判断线程池是不是还在运行 if (! isRunning(recheck) && remove(command)) // 如果不是,那么就拒绝并移除任务 reject(command); else if (workerCountOf(recheck) == 0) // 如果线程数为0,并且还在运行,那么就直接添加 addWorker(null, false); }else if (!addWorker(command, false)) // 添加任务队列失败,拒绝 reject(command); } ``` 上面的源码中,调用了一个重要的方法:`addWorker(Runnable firstTask, boolean core)`,该方法主要是为了增加工作的线程,我们来看看它是如何执行的: ```java private boolean addWorker(Runnable firstTask, boolean core) { // 回到当前位置重试 retry: for (;;) { // 获取状态 int c = ctl.get(); int rs = runStateOf(c); // 大于SHUTDOWN说明线程池已经停止 // ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 表示三个条件至少有一个不满足 // 不等于SHUTDOWN说明是大于shutdown // firstTask != null 任务不是空的 // workQueue.isEmpty() 队列是空的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 工作线程数 int wc = workerCountOf(c); // 是否符合容量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 添加成功,跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // cas失败,重新尝试 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 前面线程计数增加成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建了一个worker,包装了任务 w = new Worker(firstTask); final Thread t = w.thread; // 线程创建成功 if (t != null) { // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次确认状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果线程已经启动,失败 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 新增线程到集合 workers.add(w); // 获取大小 int s = workers.size(); // 判断最大线程池数量 if (s > largestPoolSize) largestPoolSize = s; // 已经添加工作线程 workerAdded = true; } } finally { // 解锁 mainLock.unlock(); } // 如果已经添加 if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { // 如果没有启动 if (! workerStarted) // 失败处理 addWorkerFailed(w); } return workerStarted; } ``` ### 处理任务 前面在介绍`Worker`这个类的时候,我们讲解到其实它的`run()`方法调用的是外部的`runWorker()`方法,那么我们来看看`runWorkder()`方法: 首先,它会直接处理自己的firstTask,这个任务并没有在任务队列里面,而是它自己持有的: ```java final void runWorker(Worker w) { // 当前线程 Thread wt = Thread.currentThread(); // 第一个任务 Runnable task = w.firstTask; // 重置为null w.firstTask = null; // 允许打断 w.unlock(); boolean completedAbruptly = true; try { // 任务不为空,或者获取的任务不为空 while (task != null || (task = getTask()) != null) { // 加锁 w.lock(); //如果线程池停止,确保线程被中断; //如果不是,确保线程没有被中断。这 //在第二种情况下需要复查处理 // shutdown - now竞赛同时清除中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行之前回调方法(可以由我们自己实现) beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行之后回调方法 afterExecute(task, thrown); } } finally { // 置为null task = null; // 更新完成任务 w.completedTasks++; w.unlock(); } } // 完成 completedAbruptly = false; } finally { // 处理线程退出相关工作 processWorkerExit(w, completedAbruptly); } } ``` 上面可以看到如果当前的任务是null,会去获取一个task,我们看看`getTask()`,里面涉及到了两个参数,一个是是不是允许核心线程销毁,另外一个是线程数是不是大于核心线程数,如果满足条件,就从队列中取出任务,如果超时取不到,那就返回空,表示没有取到任务,没有取到任务,就不会执行前面的循环,就会触发线程销毁`processWorkerExit()`等工作。 ```java private Runnable getTask() { // 是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // SHUTDOWN状态继续处理队列中的任务,但是不接收新的任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 线程数 int wc = workerCountOf(c); // 是否允许核心线程超时或者线程数大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 减少线程成功,就返回null,后面由processWorkerExit()处理 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果允许核心线程关闭,或者超过了核心线程,就可以在超时的时间内获取任务,或者直接取出任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 如果能取到任务,那就肯定可以执行 if (r != null) return r; // 否则就获取不到任务,超时了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` ### 销毁线程 前面提到,如果线程当前任务为空,又允许核心线程销毁,或者线程超过了核心线程数,等待了一定时间,超时了却没有从任务队列获取到任务的话,就会跳出循环执行到后面的线程销毁(结束)程序。那销毁线程的时候怎么做呢? ```java private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果是突然结束的线程,那么之前的线程数是没有调整的,这里需要调整 if (completedAbruptly) decrementWorkerCount(); // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 完成的任务数 completedTaskCount += w.completedTasks; // 移除线程 workers.remove(w); } finally { // 解锁 mainLock.unlock(); } // 试图停止 tryTerminate(); // 获取状态 int c = ctl.get(); // 比stop小,至少是shutdown if (runStateLessThan(c, STOP)) { // 如果不是突然完成 if (!completedAbruptly) { // 最小值要么是0,要么是核心线程数,要是允许核心线程超时销毁,那么就是0 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果最小的是0或者队列不是空的,那么保留一个线程 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 只要大于等于最小的线程数,就结束当前线程 if (workerCountOf(c) >= min) return; // replacement not needed } // 否则的话,可能还需要新增工作线程 addWorker(null, false); } } ``` ### 如何停止线程池 停止线程池可以使用`shutdown()`或者`shutdownNow()`,`shutdown()`可以继续处理队列中的任务,而`shutdownNow()`会立即清理任务,并返回未执行的任务。 ```java public void shutdown() { // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查停止权限 checkShutdownAccess(); // 更新状态 advanceRunState(SHUTDOWN); // 中断所有线程 interruptIdleWorkers(); // 回调钩子 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } // 立刻停止 public List
shutdownNow() { List
tasks; // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查停止权限 checkShutdownAccess(); // 更新状态到stop advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 清理队列 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); // 返回任务列表(未完成) return tasks; } ``` ## execute()和submit()方法 - `execute() `方法可以提交不需要返回值的任务,无法判断任务是否被线程池执行是否成功 - `submit()`方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个对象,我们调用`get()`方法就可以**阻塞**,直到获取到线程执行完成的结果,同时我们也可以使用有超时时间的等待方法`get(long timeout,TimeUnit unit)`,这样不管线程有没有执行完成,如果到时间,也不会阻塞,直接返回null。返回的是`RunnableFuture`对象,继承了`Runnable, Future
`两个接口: ```java public interface RunnableFuture
extends Runnable, Future
{ /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); } ``` ## 线程池为什么使用阻塞队列? 阻塞队列,首先是一个队列,肯定具有先进先出的属性。 而阻塞,则是这个模型的演化,一般队列,可以用在生产消费者模型,也就是数据共享,有人往里面放任务,有人不断的往里面取出任务,这是一个理想的状态。 但是倘若不理想,产生任务和消费任务的速度不一样,要是任务放在队列里面比较多,消费比较慢,还可以慢慢消费,或者生产者得暂停一下产生任务(阻塞生产者线程)。可以使用 `offer(E o, long timeout, TimeUnit unit)`设定等待的时间,如果在指定的时间内,还不能往队列中加入`BlockingQueue`,则返回失败,也可以使用`put(Object)`,将对象放到阻塞队列里面,如果没有空间,那么这个方法会阻塞到有空间才会放进去。 如果消费速度快,生产者来不及生产,获取任务的时候,可以使用`poll(time)`,有数据则直接取出来,没数据则可以等待`time`时间后,返回`null`。也可以使用`take()`取出第一个任务,没有任务就会一直阻塞到队列有任务为止。 上面说了阻塞队列的属性,那么为啥要用呢? - 如果产生任务,来了就往队列里面放,资源很容易被耗尽。 - 创建线程需要获取锁,这个一个线程池的全局锁,如果各个线程不断的获取锁,解锁,线程上下文切换之类的开销也比较大,不如在队列为空的时候,然一个线程阻塞等待。 ### 常见的阻塞队列  - **ArrayBlockingQueue**:基于数组实现,内部有一个定长的数组,同时保存着队列头和尾部的位置。 - **LinkedBlockingQueue**:基于链表的阻塞对垒,生产者和消费者使用独立的锁,并行能力强,如果不指定容量,默认是无效容量,容易系统内存耗尽。 - **DelayQueue**:延迟队列,没有大小限制,生产数据不会被阻塞,消费数据会,只有指定的延迟时间到了,才能从队列中获取到该元素。 - **PriorityBlockingQueue**:基于优先级的阻塞队列,按照优先级进行消费,内部控制同步的是公平锁。 - **SynchronousQueue**:没有缓冲,生产者直接把任务交给消费者,少了中间的缓存区。 ## 线程池如何复用线程的?执行完成的线程怎么处理 前面的源码分析,其实已经讲解过这个问题了,线程池的线程调用的`run()`方法,其实调用的是`runWorker()`,里面是死循环,除非获取不到任务,如果没有了任务firstTask并且从任务队列中获取不到任务,超时的时候,会再判断是不是可以销毁核心线程,或者超过了核心线程数,满足条件的时候,才会让当前的线程结束。 否则,一直都在一个循环中,不会结束。 我们知道`start()`方法只能调用一次,因此调用到`run()`方法的时候,调用外面的`runWorker()`,让其在`runWorker()`的时候,不断的循环,获取任务。获取到任务,调用任务的`run()`方法。 执行完成的线程会调用`processWorkerExit()`,前面有分析,里面会获取锁,把线程数减少,从工作线程从集合中移除,移除掉之后,会判断线程是不是太少了,如果是,会再加回来,个人以为是一种补救。 ## 如何配置线程池参数? 一般而言,有个公式,如果是计算(CPU)密集型的任务,那么核心线程数设置为`处理器核数-1`,如果是io密集型(很多网络请求),那么就可以设置为`2*处理器核数`。但是这并不是一个银弹,一切要从实际出发,最好就是在测试环境进行压测,实践出真知,并且很多时候一台机器不止一个线程池或者还会有其他的线程,因此参数不可设置得太过饱满。 一般 8 核的机器,设置 10-12 个核心线程就差不多了,这一切必须按照业务具体值进行计算。设置过多的线程数,上下文切换,竞争激烈,设置过少,没有办法充分利用计算机的资源。 > 计算(CPU)密集型消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。 > > io密集型系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。 ## 为什么不推荐默认的线程池创建方式? 阿里的编程规范里面,不建议使用默认的方式来创建线程,是因为这样创建出来的线程很多时候参数都是默认的,可能创建者不太了解,很容易出问题,最好通过`new ThreadPoolExecutor()`来创建,方便控制参数。默认的方式创建的问题如下: - Executors.newFixedThreadPool():***队列,内存可能被打爆 - Executors.newSingleThreadExecutor():单个线程,效率低,串行。 - Executors.newCachedThreadPool():没有核心线程,最大线程数可能为无限大,内存可能还会爆掉。 使用具体的参数创建线程池,开发者必须了解每个参数的作用,不会胡乱设置参数,减少内存溢出等问题。 一般体现在几个问题: - 任务队列怎么设置? - 核心线程多少个? - 最大线程数多少? - 怎么拒绝任务? - 创建线程的时候没有名称,追溯问题不好找。 ## 线程池的拒绝策略 线程池一般有以下四种拒绝策略,其实我们可以从它的内部类看出来:  - AbortPolicy: 不执行新的任务,直接抛出异常,提示线程池已满 - DisCardPolicy:不执行新的任务,但是也不会抛出异常,默默的 - DisCardOldSetPolicy:丢弃消息队列中最老的任务,变成新进来的任务 - CallerRunsPolicy:直接调用当前的execute来执行任务 一般而言,上面的拒绝策略都不会特别理想,一般要是任务满了,首先需要做的就是看任务是不是必要的,如果非必要,非核心,可以考虑拒绝掉,并报错提醒,如果是必须的,必须把它保存起来,不管是使用mq消息,还是其他手段,不能丢任务。在这些过程中,日志是非常必要的。既要保护线程池,也要对业务负责。 ## 线程池监控与动态调整 线程池提供了一些API,可以动态获取线程池的状态,并且还可以设置线程池的参数,以及状态: 查看线程池的状态:  修改线程池的状态:  关于这一点,美团的线程池文章讲得很清楚,甚至做了一个实时调整线程池参数的平台,可以进行跟踪监控,线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等。这里我就不展开了,原文:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html ,这是我们可以参考的思路。 ## 线程池隔离 线程隔离,很多同学可能知道,就是不同的任务放在不同的线程里面运行,而线程池隔离,一般是按照业务类型来隔离,比如订单的处理线程放在一个线程池,会员相关的处理放在一个线程池。 也可以通过核心和非核心来隔离,核心处理流程放在一起,非核心放在一起,两个使用不一样的参数,不一样的拒绝策略,尽量保证多个线程池之间不影响,并且最大可能保住核心线程的运行,非核心线程可以忍受失败。 `Hystrix`里面运用到这个技术,`Hystrix`的线程隔离技术,来防止不同的网络请求之间的雪崩,即使依赖的一个服务的线程池满了,也不会影响到应用程序的其他部分。 ### 关于作者 秦怀,公众号【**秦怀杂货店**】作者,技术之路不在一时,山高水长,纵使缓慢,驰而不息。个人写作方向:Java源码解析,JDBC,Mybatis,Spring,redis,分布式,剑指Offer,LeetCode等,认真写好每一篇文章,不喜欢标题党,不喜欢花里胡哨,大多写系列文章,不能保证我写的都完全正确,但是我保证所写的均经过实践或者查找资料。遗漏或者错误之处,还望指正。 [2020年我写了什么?](http://aphysia.cn/archives/2020) [开源编程笔记](https://damaer.github.io/Coding/#/)
微信关注我们
原文链接:https://blog.51cto.com/u_13604316/2933602
转载内容版权归作者及来源网站所有!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
相关文章
发表评论
资源下载
更多资源优质分享Android(本站安卓app)
近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。
Oracle Database,又名Oracle RDBMS
Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。
Java Development Kit(Java开发工具)
JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。
Sublime Text 一个代码编辑器
Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。