您现在的位置是:首页 > 文章详情

理解Java线程池ThreadPoolExecutor

日期:2018-05-24点击:270

在Java的线程池的使用会有比较多的地方,有比较多的应用场景,介绍一下Java线程池ThreadPoolExecutor。

线程是一个操作系统概念。操作系统负责这个线程的创建、挂起、运行、阻塞和终结操作。而操作系统创建线程、切换线程状态、终结线程都要进行CPU调度----这是一个耗费时间和系统资源的事情。

大多数实际场景中是这样的:处理某一次请求的时间是非常短暂的,但是请求数量是巨大的。这种背景下,如果我们为每一个请求都单独创建一个线程,那么物理机的所有资源基本上都被操作系统创建线程、切换线程状态、销毁线程这些操作所占用,用于业务请求处理的资源反而减少了。所以最理想的处理方式是,将处理请求的线程数量控制在一个范围,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。

1.ThreadPoolExecutor类

二话不多说,来看一下ThreadPoolExecutor类的具体实现源码。

继承抽象类AbstractExecutorService,它实现了ExecutorService 接口

public class ThreadPoolExecutor extends AbstractExecutorService{····}
public abstract class AbstractExecutorService implements ExecutorService {····}
public interface ExecutorService extends Executor {····}
public interface Executor {····}

ThreadPoolExecutor类中提供了四个构造方法:

 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

构造函数中需要传入的参数包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory和handler。

corePoolSize:线程池主要用于执行任务的是核心线程数量。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。


非核心线程:设置的大于corePoolSize参数小于maximumPoolSize参数的部分,就是线程池可以临时创建的“非核心线程”的最大数量。这种情况下如果某个线程没有运行任何任务,在等待keepAliveTime时间后,这个线程将会被销毁,直到线程池的线程数量重新达到corePoolSize。


maximumPoolSize:当前线程池允许创建的最大线程数量。那么如果设置的corePoolSize参数和设置的maximumPoolSize参数一致时,线程池在任何情况下都不会回收空闲线程。keepAliveTime和timeUnit也就失去了意义。


keepAliveTime:线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。


timeUnit:参数keepAliveTime的时间单位。


workQueue:一个阻塞队列,用来存储等待执行的任务。一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。


threadFactory:线程工厂,主要用来创建线程。


handler:表示当拒绝处理任务时的策略。有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

2.线程池实现原理

(1)任务的执行

先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务 private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小,runState等)的改变都要使用这个锁 private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集 private volatile long keepAliveTime; //线程存活时间 private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间 private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; //线程池最大能容忍的线程数 private volatile int poolSize; //线程池中当前的线程数 private volatile RejectedExecutionHandler handler; //任务拒绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程 private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数 private long completedTaskCount; //用来记录已经执行完毕的任务个数

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

·····························································································

<未完待续>



原文链接:https://yq.aliyun.com/articles/596900
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章