深入浅出线程池 | 京东云技术团队
一、线程
1、什么是线程
线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际 运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线 程并行执行不同的任务。
2、如何创建线程
2.1、JAVA中创建线程
/** * 继承Thread类,重写run方法 */ class MyThread extends Thread { @Override public void run() { System.out.println("myThread..." + Thread.currentThread().getName()); } } /** * 实现Runnable接口,实现run方法 */ class MyRunnable implements Runnable { @Override public void run() { System.out.println("MyRunnable..." + Thread.currentThread().getName()); } } /** * 实现Callable接口,指定返回类型,实现call方法 */ class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "MyCallable..." + Thread.currentThread().getName(); } }
2.2、测试一下
public static void main(String[] args) throws Exception { MyThread thread = new MyThread(); thread.run(); //myThread...main thread.start(); //myThread...Thread-0 MyRunnable myRunnable = new MyRunnable(); Thread thread1 = new Thread(myRunnable); myRunnable.run(); //MyRunnable...main thread1.start(); //MyRunnable...Thread-1 MyCallable myCallable = new MyCallable(); FutureTask<String> futureTask = new FutureTask<>(myCallable); Thread thread2 = new Thread(futureTask); thread2.start(); System.out.println(myCallable.call()); //MyCallable...main System.out.println(futureTask.get()); //MyCallable...Thread-2 }
2.3、问题
既然我们创建了线程,那为何我们直接调用方法和我们调用start()方法的结果不同?new Thread() 是否真实创建了线程?
2.4、问题分析
我们直接调用方法,可以看到是执行的主线程,而调用start()方法就是开启了新线程,那说明new Thread()并没有创建线程,而是在start()中创建了线程。
那我们看下Thread类start()方法:
class Thread implements Runnable { //Thread类实现了Runnalbe接口,实现了run()方法 private Runnable target; public synchronized void start() { ... boolean started = false; try { start0(); //可以看到,start()方法真实的调用时start0()方法 started = true; } finally { ... } } private native void start0(); //start0()是一个native方法,由JVM调用底层操作系统,开启一个线程,由操作系统过统一调度 @Override public void run() { if (target != null) { target.run(); //操作系统在执行新开启的线程时,回调Runnable接口的run()方法,执行我们预设的线程任务 } } }
2.5、总结
二、多线程
1、什么是多线程
多线程(multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。同一个线程只 能处理完一个任务在处理下一个任务,有时我们需要多个任务同时处理,这时,我们就需要创建多 个线程来同时处理任务。
2、多线程有什么好处
2.1、串行处理
public static void main(String[] args) throws Exception { System.out.println("start..."); long start = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { Thread.sleep(2000); //每个任务执行2秒 System.out.println("task done..."); //处理执行结果 } long end = System.currentTimeMillis(); System.out.println("end...,time = " + (end - start)); } //执行结果 start... task done... task done... task done... task done... task done... end...,time = 10043
2.2、并行处理
public static void main(String[] args) throws Exception { System.out.println("start..."); long start = System.currentTimeMillis(); List<Future> list = new ArrayList<>(); for (int i = 0; i < 5; i++) { Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); //每个任务执行2秒 return "task done..."; } }; FutureTask task = new FutureTask(callable); list.add(task); new Thread(task).start(); } list.forEach(future -> { try { System.out.println(future.get()); //处理执行结果 } catch (Exception e) { } }); long end = System.currentTimeMillis(); System.out.println("end...,time = " + (end - start)); } //执行结果 start... task done... task done... task done... task done... task done... end...,time = 2005
2.3、总结
2.4、多线程的问题
上面示例中我们可以看到,如果每来一个任务,我们就创建一个线程,有很多任务的情况下,我们 会创建大量的线程,可能会导致系统资源的耗尽。同时,我们知道线程的执行是需要抢占CPU资源 的,那如果有太多的线程,就会导致大量时间用在线程切换的开销上。
再有,每来一个任务都需要创建一个线程,而创建一个线程需要调用操作系统底层方法,开销较 大,而线程执行完成后就被回收了。在需要大量线程的时候,创建线程的时间就花费不少了。
三、线程池
1、如何设计一个线程池
由于多线程的开发存在上述的一些问题,那我们是否可以设计一个东西来避免这些问题呢?当然可以! 线程池就是为了解决这些问题而生的。那我们该如何设计一个线程池来解决这些问题呢?或者说,一个线程池该具备什么样的功能?
1.1、线程池基本功能
1.2、线程池面临问题
1.3、创新源于生活
先假设一个场景:假设我们是一个物流公司的管理人员,要配送的货物就是我们的任务,货车就是 我们配送工具,我们当然不能有多少货物就准备多少货车。那当顾客源源不断的将货物交给我们配 送,我们该如何管理才能让公司经营的最好呢?
1.4、技术源于创新
基于上述场景,物流公司就是我们的线程池、货物就是我们的线程任务、货车就是我们的线程。我 们如何设计公司的管理货车的流程,就应该如何设计线程池管理线程的流程。
2、线程池具体分析
上文中,我们讲了该如何设计一个线程池,下面我们看看大神是如何设计的;
2.1、 JAVA中的线程池是如何设计的
2.1.1、 线程池设计
看下线程池中的属性,了解线程池的设计。
public class ThreadPoolExecutor extends AbstractExecutorService { //线程池的打包控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //值为29,用来表示偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; //线程池的最大容量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程池的运行状态,总共有5个状态,用高3位来表示 private static final int RUNNING = -1 << COUNT_BITS; //接受新任务并处理阻塞队列中的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; //不接受新任务但会处理阻塞队列中的任务 private static final int STOP = 1 << COUNT_BITS; //不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务 private static final int TIDYING = 2 << COUNT_BITS; //所有任务都已终止, 工作线程数量为0,即将要执行terminated()钩子方法 private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法已经执行结束 //任务缓存队列,用来存放等待执行的任务 private final BlockingQueue<Runnable> workQueue; //全局锁,对线程池状态等属性修改时需要使用这个锁 private final ReentrantLock mainLock = new ReentrantLock(); //线程池中工作线程的集合,访问和修改需要持有全局锁 private final HashSet<Worker> workers = new HashSet<Worker>(); // 终止条件 private final Condition termination = mainLock.newCondition(); //线程池中曾经出现过的最大线程数 private int largestPoolSize; //已完成任务的数量 private long completedTaskCount; //线程工厂 private volatile ThreadFactory threadFactory; //任务拒绝策略 private volatile RejectedExecutionHandler handler; //线程存活时间 private volatile long keepAliveTime; //是否允许核心线程超时 private volatile boolean allowCoreThreadTimeOut; //核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0 private volatile int corePoolSize; //最大池大小,不得超过CAPACITY private volatile int maximumPoolSize; //默认的任务拒绝策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //运行权限相关 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); ... }
小结一下:以上线程池的设计可以看出,线程池的功能还是很完善的。
2.1.2、线程池构造函数
//构造函数 public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大允许线程数 long keepAliveTime, //线程存活时间 TimeUnit unit, //存活时间单位 BlockingQueue<Runnable> workQueue, //任务缓存队列 ThreadFactory threadFactory, //线程工厂 RejectedExecutionHandler handler) { //拒绝策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
小结一下:
2.1.3、线程池执行
2.1.3.1、提交任务方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
可以看到submit方法的底层调用的也是execute方法,所以我们这里只分析execute方法;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //第一步:创建核心线程 if (workerCountOf(c) < corePoolSize) { //worker数量小于corePoolSize if (addWorker(command, true)) //创建worker return; c = ctl.get(); } //第二步:加入缓存队列 if (isRunning(c) && workQueue.offer(command)) { //线程池处于RUNNING状态,将任务加入workQueue任务缓存队列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //双重检查,若线程池状态关闭了,移除任务 reject(command); else if (workerCountOf(recheck) == 0) //线程池状态正常,但是没有线程了,创建worker addWorker(null, false); } //第三步:创建临时线程 else if (!addWorker(command, false)) reject(command); }
小结一下:execute()方法主要功能:
2.1.3.2、创建线程
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //等价于:rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) //线程池已关闭,并且无需执行缓存队列中的任务,则不创建 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //CAS增加线程数 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //上面的流程走完,就可以真实开始创建线程了 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //这里创建了线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //这里将线程加入到线程池中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); //添加成功,启动线程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); //添加线程失败操作 } return workerStarted; }
小结:addWorker()方法主要功能;
2.1.3.3、工作线程的实现
private final class Worker //Worker类是ThreadPoolExecutor的内部类 extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; //持有实际线程 Runnable firstTask; //worker所对应的第一个任务,可能为空 volatile long completedTasks; //记录执行任务数 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); //当前线程调用ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用 } ...继承AQS,实现了不可重入锁... }
小结:工作线程Worker类主要功能;
2.1.3.4、线程的复用
final void runWorker(Worker w) { //ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; //标识线程是否异常终止 try { while (task != null || (task = getTask()) != null) { //这里会不断从任务队列获取任务并执行 w.lock(); //线程是否需要中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); //执行任务前的Hook方法,可自定义 Throwable thrown = null; try { task.run(); //执行实际的任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //执行任务后的Hook方法,可自定义 } } finally { task = null; //执行完成后,将当前线程中的任务制空,准备执行下一个任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); //线程执行完成后的清理工作 } }
小结:runWorker()方法主要功能;
2.1.3.5、队列中获取待执行任务
private Runnable getTask() { boolean timedOut = false; //标识当前线程是否超时未能获取到task对象 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) //若线程存活时间超时,则CAS减去线程数量 return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //允许超时回收则阻塞等待 workQueue.take(); //不允许则直接获取,没有就返回null if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
小结:getTask()方法主要功能;
2.1.3.6、清理工作
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); //移除执行完成的线程 } finally { mainLock.unlock(); } tryTerminate(); //每次回收完一个线程后都尝试终止线程池 int c = ctl.get(); if (runStateLessThan(c, STOP)) { //到这里说明线程池没有终止 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); //异常终止线程的话,需要在常见一个线程 } }
小结:processWorkerExit()方法主要功能;
2.1.3.7、尝试终止线程池
final void tryTerminate() { for (;;) { int c = ctl.get(); //若线程池正在执行、线程池已终止、线程池还需要执行缓存队列中的任务时,返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //执行到这里,线程池为SHUTDOWN且无待执行任务 或 STOP 状态 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); //只中断一个线程 return; } //执行到这里,线程池已经没有可用线程了,可以终止了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS设置线程池终止 try { terminated(); //执行钩子方法 } finally { ctl.set(ctlOf(TERMINATED, 0)); //这里将线程池设为终态 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
小结:tryTerminate()方法主要功能;
2.2、JAVA线程池总结
以上通过对JAVA线程池的具体分析我们可以看出,虽然流程看似复杂,但其实有很多内容都是状态重复校验、线程安全的保证等内容,其主要的功能与我们前面所提出的设计功能一致,只是额外增加了一些扩展,下面我们简单整理下线程池的功能;
2.2.1、主要功能
2.2.2、扩展功能
2.2.3、流程总结
以上通过对JAVA线程池任务提交流程的分析我们可以看出,线程池执行的简单流程如下图所示;
2.3、JAVA线程池使用
线程池基本使用验证上述流程:
public static void main(String[] args) throws Exception { //创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(5)); //加入4个任务,小于核心线程,应该只有4个核心线程,队列为0 for (int i = 0; i < 4; i++) { threadPoolExecutor.submit(new MyRunnable()); } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 4 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0 //再加4个任务,超过核心线程,但是没有超过核心线程 + 缓存队列容量,应该5个核心线程,队列为3 for (int i = 0; i < 4; i++) { threadPoolExecutor.submit(new MyRunnable()); } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 5 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 3 //再加4个任务,队列满了,应该5个热核心线程,队列5个,非核心线程2个 for (int i = 0; i < 4; i++) { threadPoolExecutor.submit(new MyRunnable()); } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 7 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5 //再加4个任务,核心线程满了,应该5个热核心线程,队列5个,非核心线程5个,最后一个拒绝 for (int i = 0; i < 4; i++) { try { threadPoolExecutor.submit(new MyRunnable()); } catch (Exception e) { e.printStackTrace(); //java.util.concurrent.RejectedExecutionException } } System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 10 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 5 System.out.println(threadPoolExecutor.getTaskCount()); //共执行15个任务 //执行完成,休眠15秒,非核心线程释放,应该5个核心线程,队列为0 Thread.sleep(1500); System.out.println("worker count = " + threadPoolExecutor.getPoolSize()); //worker count = 5 System.out.println("queue size = " + threadPoolExecutor.getQueue().size()); //queue size = 0 //关闭线程池 threadPoolExecutor.shutdown(); }
作者:京东零售 秦浩然
来源:京东云开发者社区 转载请注明来源

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
使用 Databend 加速 Hive 查询
作者:尚卓燃(PsiACE) 澳门科技大学在读硕士,Databend 研发工程师实习生 Apache OpenDAL(Incubating) Committer https://github.com/PsiACE 随着架构的不断迭代和更新,大数据系统的查询目标也从大吞吐量查询逐步转移转向快速的交互式查询,对于查询的及时响应提出了更高要求。许多企业的数仓/数据湖中都有 PB 级的数据,其中绝大多数都属于旧有系统中的历史数据,很少更新,移动起来很麻烦,重新组织元数据也需要花费大量的时间。需要解决的问题是:如何在保证现有的数据和元数据不变的情况下加速查询。 上图是一个典型的使用 Databend 加速 Hive 查询的架构。用户使用 trino 、Spark 等引擎将数据纳入 Hive 进行管理,数据的存放位置则位于 S3 、GCS 、HDFS 等存储服务之中。引入 Databend 可以带来更好的查询性能。 和 trino 以及大多数支持 Hive Catalog / Connector 的查询引擎一样,Databend 可以复用 Hive 除了运行时(查询引擎)之外的其他组件,包括用于管...
- 下一篇
OpenHarmony应用开发—ArkUI组件集合
介绍 本示例为ArkUI中组件、通用、动画、全局方法的集合。 效果预览 绑定 通用 动画 全局方法 <center>Button 按钮</center> <center>onClick 点击事件</center> <center>元素共享转场</center> <center>警告弹窗</center> 使用说明: 1.点击组件、通用、动画、全局方法四个按钮或左右滑动切换不同视图。 2.点击二级导航(如通用属性、通用事件等),若存在三级导航则展开三级导航(如Border 边框、点击事件等);若不存在三级导航,则跳转至详情页面。 3.若存在三级导航(如Border 边框、点击事件等),点击跳转至详情页面。 工程目录 entry/src/main/ets/ |---component | |---AttributeModificationTool.ets // 组件 | |---IntroductionTitle.ets // 首页介绍标题组件 | |---TabContentNavigatio...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程