深入详解Java线程池——ThreadPoolExecutor
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
1.线程池实现原理
1.1线程池处理流程
首先,我们需要了解当提交一个任务给线程池之后,线程池的处理流程:
- 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
- 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
- 线程池判断线程池的线程数量是否为最大线程数量且每个线程都处于工作状态。如果不是,则创建一个新的工作线程来执行任务。如果是,则交给饱和策略来处理这个任务。
以ThreadPoolExecutor执行execute()方法的设计思路为例:
依据图片ThreadPoolExecutor执行execute()方法有四种情况:
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤 需要获取全局锁)。
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执 行这一步骤需要获取全局锁)。
- 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
源码分析:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
2. 线程池的使用
2.1 创建线程池
Java中线程池用ThreadPoolExecutor来创建。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
其中的参数为:
- int corePoolSize:必需参数,规定了线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线 程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。
- int maximumPoolSize:必需参数,线程池中允许创建线程的最大数量,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如 果使用了无界的任务队列这个参数就没什么效果。
- long keepAliveTime:必需参数,线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
- TimeUnit unit:必需参数,线程活动保持时间的单位,可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒 (NANOSECONDS,千分之一微秒)。
- BlockingQueue<Runnable> workQueue:必需参数,任务队列,用于保存等待执行的任务的阻塞队列,关于阻塞队列可以参考https://my.oschina.net/u/3352298/blog/1807780
- ThreadFactory threadFactory:非必须参数,不设置此参数会采用内置默认参数。用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
- RejectedExecutionHandler handler:非必须参数,不设置此参数会采用内置默认参数,设置饱和策略,当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。RejectedExecutionHandler 的实现类在ThreadPoolExecutor中有四个静态内部类,这个策略默认情况下是AbortPolicy,表示无法 处理新任务时抛出异常。共有4中策略,包括AbortPolicy(直接抛出异常)。CallerRunsPolicy(只用调用者所在线程来运行任务)。DiscardOldestPolicy(丢弃队列里最近的一个任务,并执行当前任务)。DiscardPolicy(不处理,丢弃掉)。也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务。
public class TestThreadPool { public static void main(String[] args) { int corePoolSize=10; int maximumPoolSize=20; long keepAliveTime=1; TimeUnit unit=TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue=new LinkedBlockingQueue<Runnable>(); Factory f=new Factory(); RejectedExecutionHandler handler1=new ThreadPoolExecutor.CallerRunsPolicy(); ThreadPoolExecutor pool1 = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); ThreadPoolExecutor pool2 = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,f); } } class Factory implements ThreadFactory{ private int i; @Override public Thread newThread(Runnable r) { Thread t=new Thread(r, "线程x"+(i+1)); return t; } } class Handler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //处理该任务 } }
2.2 向线程池提交任务
有两种方法:execute()和submit()。
1.execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
2.submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个 future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方 法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线 程一段时间后立即返回,这时候有可能任务没有执行完。
public class TestThreadPool { public static void main(String[] args) { int corePoolSize=10; int maximumPoolSize=20; long keepAliveTime=1; TimeUnit unit=TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue=new LinkedBlockingQueue<Runnable>(); Factory f=new Factory(); RejectedExecutionHandler handler1=new ThreadPoolExecutor.CallerRunsPolicy(); ThreadPoolExecutor pool1 = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); ThreadPoolExecutor pool2 = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,f); pool1.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); pool2.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } } class Factory implements ThreadFactory{ private int i; @Override public Thread newThread(Runnable r) { Thread t=new Thread(r, "线程x"+(i+1)); return t; } } class Handler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //处理该任务 } }
2.3 关闭线程池
有两种方法:shutdown或shutdownNow方法,它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。且调用之后不能再向线程池提交任务。
1.shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。会立即停止所有任务,无论是否执行完毕。
2.shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。直到所有任务执行完成或者退出才能关闭线程池。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务 都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪 一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
public class TestThreadPool { public static void main(String[] args) { int corePoolSize=10; int maximumPoolSize=20; long keepAliveTime=1; TimeUnit unit=TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue=new LinkedBlockingQueue<Runnable>(); Factory f=new Factory(); RejectedExecutionHandler handler1=new ThreadPoolExecutor.CallerRunsPolicy(); ThreadPoolExecutor pool1 = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); ThreadPoolExecutor pool2 = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,f); pool1.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); pool2.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); pool1.shutdown(); pool2.shutdown(); } } class Factory implements ThreadFactory{ private int i; @Override public Thread newThread(Runnable r) { Thread t=new Thread(r, "线程x"+(i+1)); return t; } } class Handler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //处理该任务 } }
2.4 合理配置线程池
使用线程池时,我们应合理的配置构造器中的几个参数,以更好地实现目的或取得更高的效率,可以从以下几个角度分析如何配置:
- 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的 线程,如配置N(cpu)+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*N(cpu)。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
- 任务的优先级:高、中和低。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
- 任务的执行时间:长、中和短。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让 执行时间短的任务先执行。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越 长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点 儿,比如几千。如果设置成无界队列,那么由于操作数据库的线程可能会阻塞,引起线程池的队列就会越来越多, 有可能会撑满内存,导致整个系统不可用。
2.5 线程池监控
使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。
可以通过线程池提供的参数进行监控。比如:
- taskCount:线程池需要执行的任务数量。
- completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
- largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
- getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。
- getActiveCount:获取活动的线程数。
还可以通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。 这几个方法在线程池里是空方法。
总结:在前面我们有了解过几个内置的线程池类,如newFixedThreadPool等,但是通常我们应该通过ThreadPoolExecutor类来写我们自己的线程池类,因为那些内置的线程池内总会不满足我们的各种需求且可能会引起性能问题,所以我们应认真分析任务特点后进行专门的配置线程池。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
并发容器与框架——Fork/Join框架
1. Fork/Join框架概念 Fork/Join框架是Java提供的一个用于并行执行任务的框架,它会将一个大任务分成多个小任务,并且将每个小任务的最终结果汇总得到大任务结果的框架。比如对1+2+3+····+100求和,可以分成十个子任务分别对10个数求和,最后再汇总这十个子任务的结果。 2.工作窃取算法 工作窃取算法是指某个线程从其他任务队列里窃取任务来执行。 假如我们可以将一个总任务分割成多个互不相干的子任务,为了减少线程的竞争,我们会将这些子任务放在不同的队列中,并为每个队列都建造一个线程执行该队列的任务,线程和队列一一对应。但是有些线程可能会很早的执行完自己队列中的所有任务,而其他线程还会处理自己拥有的队列中的任务,此时已处理完任务的线程与其等待其他线程执行任务,不如帮助其他线程一起执行剩余任务。这时他们会从其他线程的队列里窃取一个线程执行任务,所以为了避免因为工作窃取引起的两个线程之间的竞争,通常任务队列会使用双端队列。任务队列线程从头部取任务,窃取线程从尾部取任务。 优点是充分利用线程进行并行运算,并减少了竞争,缺点就是还是存在竞争情况,比如队列中任务数为1时,...
- 下一篇
Java 多线程 之 stop停止线程实例
http://www.verejava.com/?id=16992927898270 package com.stop; /** 题目: 人们在火车站的售票窗口排队买火车票 1. 北京西站开门 2. 打开售票窗口 3. 北京西站有10张去长沙的票 4. 打开2个售票窗口, 5 假设每个售票窗口每隔1秒钟买完一张票 1. 根据 名词 找类 人们(Person), 火车站(Station),火车票(Ticket) , 售票窗口e 是火车站的线程 */ import java.util.*; public class TestTrain { public static void main(String[] args) { //火车西站开门 Station station = new Station("火车西站"); //开窗口 //Thread win1=station.openWindow("普通窗口"); Thread win2 = station.openWindow("学生窗口"); //窗口1 开始售票 //设置线程优先级 setPriority() //win2.setPrior...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS8安装Docker,最新的服务器搭配容器使用
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Docker安装Oracle12C,快速搭建Oracle学习环境
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2全家桶,快速入门学习开发网站教程