您现在的位置是:首页 > 文章详情

线程池

日期:2020-05-12点击:480

线程池的基本使用

package com.shothook.test; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.time.Duration; import java.time.LocalDateTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j public class ThreadPoolDemo { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor(10, 1000, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10)); LocalDateTime start = LocalDateTime.now(); for (int i = 0; i < 1000; i++) { threadPool.submit(new Task(i)); } LocalDateTime end = LocalDateTime.now(); Duration duration = Duration.between(start,end); log.debug("submit task complete, takes time: {}", duration.getNano()); } @AllArgsConstructor @Slf4j public static class Task implements Runnable { private int i; @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("thread name: {}, to execute task: {}", Thread.currentThread().getName(), i); } } }

线程池的工作模式,简单来说如下所示:

image

主线程往缓冲队列中添加任务,线程池从队列中取任务并执行,当然,真实情况比这复杂的多,这个后面再详细分析,先有个概念就好。

创建线程池的各个参数的意义

线程池的构造函数:image

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

corePoolSize:核心线程数
maximumPoolSize:最大线程数,当往缓冲队列中增加任务,缓冲队列满时,会在corePoolSize的基础上继续新建线程,知道触发最大线程数
keepAliveTime:超过核心线程数的线程的存活时间,对超过存活时间的,线程池结束超时的空闲线程
unit:超过核心线程数的线程的存活时间对应的时间单位
workQueue:线程池的任务缓存队列
threadFactory:创建线程的工厂
handler:当到最大线程数,并且缓存队列满时,继续添加任务时的处理策略

线程池的状态

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; }//获取线程池的状态 private static int workerCountOf(int c) { return c & CAPACITY; }//获取线程池的线程数 private static int ctlOf(int rs, int wc) { return rs | wc; }//参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl

ctl : 高三位保存线程池状态,低29位保存任务数。

线程池的创建

/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行 * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理 * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution RejectedExecutionException是一个RuntimeException * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务 * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了 * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程 * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展) * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务 */ int c = ctl.get(); /** * 1、如果当前线程数少于corePoolSize(可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了) */ if (workerCountOf(c) < corePoolSize) { //addWorker()成功,返回 if (addWorker(command, true)) return; /** * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get()) * 失败的原因可能是: * 1、线程池已经shutdown,shutdown的线程池不再接收新任务 * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize */ c = ctl.get(); } /** * 2、如果线程池RUNNING状态,且入队列成功 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();//再次校验位 /** * 再次校验放入workerQueue中的任务是否能被执行 * 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务 * 2、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了) */ //如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command if (! isRunning(recheck) && remove(command)) reject(command); //如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null //为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢?? //只保证有一个worker线程可以从queue中获取任务执行就行了?? //因为只要还有活动的worker线程,就可以消费workerQueue中的任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask //第二个参数为true代表占用corePoolSize,false占用maxPoolSize } /** * 3、如果线程池不是running状态 或者 无法入队列 * 尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command */ else if (!addWorker(command, false)) reject(command); }

image

钩子

 protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }

线程池的类图

ScheduledThreadPoolExecutor

原文链接:https://yq.aliyun.com/articles/759971
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章