速读Java线程池
一、前言
线程池是开发中绕不开的一个知识点 。
对于移动开发而言,网络框架、图片加载、AsyncTask、RxJava, 都和线程池有关。
正因为线程池应用如此广泛,所以也成了面试的高频考点。
我们今天就来讲讲线程池的基本原理和周边知识。
先从线程的生命周期开始。
二、线程生命周期
线程是程序执行流的最小单元。
Java线程可分为五个阶段:
- 新建(New): 创建Thread对象,并且未调用start();
- 就绪(Runnable): 调用start()之后, 等待操作系统调度;
- 运行(Running): 获取CPU时间分片,执行 run()方法中的代码;
- 阻塞(Blocked): 线程让出CPU,进入等待(就绪);
- 终止(Terminated): 自然退出或者被终止。
线程的创建和销毁代价较高,当有大量的任务时,可复用线程,以提高执行任务的时间占比。
如上图,不断地 Runnable->Runing->Blocked->Runnable, 就可避免过多的线程创建和销毁。
此外,线程的上下文切换也是开销比较大的,若要使用线程池,需注意设置合理的参数,控制线程并发。
三、ThreadPoolExecutor
JDK提供了一个很好用的线程池的封装:ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心线程大小
maximumPoolSize:线程池最大容量(需大于等于corePoolSize,否则会抛异常)
keepAliveTime:线程执行任务结束之后的存活时间
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂
handler:拒绝策略
线程池中有两个任务容器:
private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
前者用于存储Worker,后者用于缓冲任务(Runnable)。
下面是execute方法的简要代码:
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
}
// 若workQueue已满,offer会返回false
if (isRunning(c) && workQueue.offer(command)) {
// ...
} else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize))
return false;
Worker w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
t.start();
}
一个任务到来,假设此时容器workers中Worker数的数量为c,则
- 1、当c < corePoolSize时,创建Worker来执行这个任务,并放入workers;
-
当c >= corePoolSize时,
- 2、若workQueue未满,则将任务放入workQueue
-
若workQueue已满,
- 3、若c < maximumPoolSize,创建Worker来执行这个任务,并放入workers;
- 4、若c >= maximumPoolSize, 执行拒绝策略。
很多人在讲线程池的时候,干脆把workers说成“线程池”,将Worker和线程混为一谈;
不过这也无妨,能帮助理解就好,就像看到一杯水,说“这是水”一样,很少人会说这是“杯子装着水”。
Worker和线程,好比汽车和引擎:汽车装着引擎,汽车行驶,其实是引擎在做功。
Worker本身实现了Runnable,然后有一个Thread和Runnable的成员;
构造函数中,将自身(this)委托给自己的成员thread;
当thread.start(), Worker的run()函数被回调,从而开启 “执行任务-获取任务”的轮回。
private final class Worker implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
task.run();
}
}
private Runnable getTask() {
for (;;) {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
}
}
当线程执行完任务(task.run()结束),会尝试去workQueue取下一个任务,
如果workQueue已经清空,则线程进入阻塞态:workQueue是阻塞队列,如果取不到元素会block当前线程。
此时,allowCoreThreadTimeOut为true, 或者 n > corePoolSize,workQueue等待keepAliveTime的时间,
如果时间到了还没有任务进来, 则退出循环, 线程销毁;
否则,一直等待,直到新的任务到来(或者线程池关闭)。
这就是线程池可以保留corePoolSize个线程存活的原理。
从线程的角度,要么执行任务,要么阻塞等待,或者销毁;
从任务的角度,要么马上被执行,要么进入队列等待被执行,或者被拒绝执行。
上图第2步,任务进入workQueue, 如果队列为空且有空闲的Worker的话,可马上得到执行。
关于workQueue,常用的有两个队列:
- LinkedBlockingQueue(capacity):
传入capacity(大于0), 则LinkedBlockingQueue的容量为capacity;
如果不传,默认为Integer.MAX_VALUE,相当于无限容量(不考虑内存因素),多少元素都装不满。
- SynchronousQueue
除非另一个线程试图获取元素,否则不能添加元素。
四、 ExecutorService
为了方便使用,JDK还封装了一些常用的ExecutorService:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
类型 | 最大并发 | 适用场景 |
---|---|---|
newFixedThreadPool | nThreads | 计算密集型任务 |
newSingleThreadExecutor | 1 | 串行执行的任务 |
newCachedThreadPool | Integer.MAX_VALUE | IO密集型任务 |
newScheduledThreadPool | Integer.MAX_VALUE | 定时任务,周期任务 |
newSingleThreadExecutor 其实是 newFixedThreadPool的特例 (nThreads=1),
写日志等任务,比较适合串行执行,一者不会占用太多资源,二者为保证日志有序与完整,同一时间一个线程写入即可。
众多方法中,newCachedThreadPool() 是比较特别的,
1、corePoolSize = 0,
2、maximumPoolSize = Integer.MAX_VALUE,
3、workQueue 为 SynchronousQueue。
结合上一节的分析:
当一个任务提交过来,由于corePoolSize = 0,任务会尝试放入workQueue;
如果没有线程在尝试从workQueue获取任务,offer()会返回false,然后会创建线程执行任务;
如果有空闲线程在等待任务,任务可以放进workQueue,但是放进去后马上就被等待任务的线程取走执行了。
总的来说,就是有空闲线程则交给空闲线程执行,没有则创建线程执行;
SynchronousQueue类型workQueue并不保存任务,只是一个传递者。
所以,最终效果为:所有任务立即调度,无容量限制,无并发限制。
这样的特点比较适合网络请求任务。
OkHttp的异步请求所用线程池与此类似(除了ThreadFactory ,其他参数一模一样)。
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
五、 任务并发的估算
一台设备上,给定一批任务,要想最快时间完成所有任务,并发量应该如何控制?
并发量太小,CPU利用率不高;
并发量太大,CPU 满负荷,但是花在线程切换的时间增加,用于执行任务的时间反而减少。
一些文章提到如下估算公式:
M:并发数;
C:任务占用CPU的时间;
I:等待IO完成的时间(为简化讨论,且只考虑IO);
N:CPU核心数。
代入特定参数验证这条公式:
1、比方说 I 接近于0,则M≈N,一个线程对应一个CPU,刚好满负荷且较少线程切换;
2、假如 I=C,则M = 2N,两个线程对应一个CPU,每个线程一半时间在等待IO,一半时间在计算,也是刚好。
遗憾的是,对于APP而言这条公式并不适用:
- 任务占用CPU时间和IO时间无法估算
APP上的异步任务通常是碎片化的,而不同的任务性质不一样,有的计算耗时多,有的IO耗时多;
然后同样是IO任务,比方说网络请求,IO时间也是不可估计的(受服务器和网速影响)。
- 可用CPU核心可能会变化
有的设备可能会考虑省电或者热量控制而关闭一些核心;
大家经常吐槽的“一核有难,九核围观”映射的就是这种现象。
虽然该公式不能直接套用来求解最大并发,但仍有一些指导意义:
**IO等待时间较多,则需要高的并发,来达到高的吞吐率;
CPU计算部分较多,则需要降低并发,来提高CPU的利用率。**
换言之,就是:
做计算密集型任务时控制并发小一点;
做IO密集型任务时控制并发大一点。
问题来了,小一点是多小,大一点又是多大呢?
说实话这个只能凭经验了,跟“多吃水果”,“加盐少许”一样,看实际情况而定。
比如RxJava就提供了Schedulers.computation()和Schedulers.io(),
前者默认情况下为最大并发为CPU核心数,后者最大并发为Integer.MAX_VALUE(相当于不限制并发)。
可能是作者也不知道多少才合适,所以干脆就不限制了。
这样其实很危险的,JVM对进程有最大线程数限制,超过则会抛OutOfMemoryError。
六、总结
回顾文章的内容,大概有这些点:
- 介绍了线程的生命周期;
- 从线程池的参数入手,分析这些参数是如何影响线程池的运作;
- 列举常用的ExecutorService,介绍其各自特点和适用场景;
- 对并发估算的一些理解。
文章没有对Java线程池做太过深入的探讨,而是从使用的角度讲述基本原理和周边知识;
第二节有结合关键代码作简要分析,也是点到为止,目的在于加深对线程池相关参数的理解,
以便在平时使用线程池的时候合理斟酌,在阅读涉及线程池的开源代码时也能“知其所以然”。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
马云也谈996,对开发者真的是一福利吗?
最近的一个996话题在互联网业界,可以说是非常的火热。身为互联网人,也在时时关注的,毕竟和我们的生活息息相关。在前几天的阿里内部交流活动上,我们的马总也与其员工讨论了996话题,包括自己创办阿里的成长过程。 有了996,才会有你现在更好的生活。马总的一番话点醒了一批互联网行业的我们。 马总说: 马云提到,能做996是一种巨大的福气,很多公司、很多人想996都没有机会。不知道大家如何认马总的一番话,但我觉得还是同有道理的,因为我也没有机会996~ 当然,很多人也对我们可爱的马总的观点表示有争议: 有网友表示大概有以下几点: 1、996对老板来说,值得奋斗因为是他的事业,对普通员工来说只是职业 2、谈996太理亏,所以我换个“奋斗”为名的制高点 3、只有背叛阶级的个人,没有背叛利益的阶级。 4、996?资本家剥削的套路而已。 5、不是每个人都像你这么有能力,只能靠出卖自己的身体换取财富,996是他们自己的选择 对于996,有很多成功的互联网大佬也非常的支持996,笔者认为,一个观点,总会有赞同他的一面,也会有争议的一面。那是因为自己所处的环境很可能决定了你的想法和观点 比如下面的一个网友的...
-
下一篇
7天教你玩转云服务器
教学课程:如何玩转云服务器云服务器是一种简单高效、安全可靠、处理能力可弹性伸缩的计算服务。其管理方式比物理服务器更简单高效。用户无需提前购买硬件,即可迅速创建或释放任意多台云服务器 。云服务器帮助您快速构建更稳定、安全的应用,降低开发运维的难度和整体IT成本,使您能够更专注于核心业务的创新。云计算服务器(又称云服务器或云主机)主要面向中小企业用户与高端用户提供基于互联网的基础设施服务,这一用户群体庞大,且对互联网主机应用的需求日益增加。该用户群体具备如下特征:业务以主机租用与虚拟专用服务器为主,部分采用托管服务,且规模较大;注重短期投资回报率,对产品的性价比要求较高;个性化需求强,倾向于全价值链、傻瓜型产品 。 用户在采用传统的服务器时,由于成本、运营商选择等诸多因素,不得不面对各种棘手的问题,而弹性的云计算服务器的推出,则有效的解决
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- MySQL数据库在高并发下的优化方案
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题