java源码-ThreadPoolExecutor(1)
开篇
ThreadPoolExecutor还是很值得仔细看一看的,所以准备分开两篇文章来说明,这篇文章用来说明ThreadPoolExecutor的构造函数、拒绝策略、以及一些额外的线程池工厂的一些细节。下一篇文章会着重说明ThreadPoolExecutor的执行。
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)
ThreadPoolExecutor类结构
从ThreadPoolExecutor的类继承关系来看:
- 接口Executor和ExecutorService都是接口定义。
- 接口Executor只有一个execute()方法。
- 接口ExecutorService增加了一些状态判断和任务提交接口。
- 类AbstractExecutorService实现了定义中的部分接口包括submit方法且新增doInvokeAny相关方法。
- 类ThreadPoolExecutor继承AbstractExecutorService实现了其他在Executor和ExecutorService中定义的接口
public interface Executor { void execute(Runnable command); }
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { // 省略相关代码 } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { // 省略相关代码 } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { // 省略相关代码 } }
public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } }
ThreadPoolExecutor的类定义
ThreadPoolExecutor的构造函数
ThreadPoolExecutor的核心构造函数其实只有一个,见下面的代码注释。然后需要解释的是构造函数的几个参数:
corePoolSize
核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。maximumPoolSize
线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。keepAliveTime
非核心线程的闲置超时时间,超过这个时间就会被回收。unit
指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。workQueue
线程池中的任务队列.
常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。threadFactory
线程工厂,提供创建新线程的功能。RejectedExecutionHandler
当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法。
public class ThreadPoolExecutor extends AbstractExecutorService { private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { // 使用了Executors.defaultThreadFactory()创建了线程池工厂 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 核心构造函数,核心构造函数,核心构造函数,重要事情说三遍!!! public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
在ThreadPoolExecutor源码中额外的收获就是线程池工厂的实现方法了,其实我们有时候在使用Executors的多线程的时候一定要传入线程池工厂的实现方式,这样当我们通过jstack等方法定位问题的时候方便鉴别出线程归属。
Executors的DefaultThreadFactory方法内部通过静态变量tatic final AtomicInteger poolNumber来保证DefaultThreadFactory的实例能够有唯一的id标识,至于线程池工厂创建的线程会自带线程池的前缀以及自增id。
这也算额外的收获吧。
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } }
RejectedExecutionHandler是一个接口,里面只有一个方法。当要创建的线程数量大于线程池的最大线程数的时候,就会调用这个接口里的这个方法实现任务的拒绝。可以自定义这个接口,实现对这些超出数量的任务的处理。
ThreadPoolExecutor自己已经提供了四个拒绝策略,分别是
- CallerRunsPolicy:CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务
- AbortPolicy:ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy,直接抛出异常。
- DiscardPolicy:让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。
- DiscardOldestPolicy:抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
参考文章
Java多线程-线程池ThreadPoolExecutor构造方法和规则
java多线程-ThreadPoolExecutor的拒绝策略RejectedExecutionHandler
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Python进阶-算法-递归
版权声明:如需转载,请注明转载地址。 https://blog.csdn.net/oJohnny123/article/details/81911889 1、递归就是自己调自己 2、在使用递归策略时,必须有一个递归出口,也就是得有一个明确的递归结束条件。 3、递归算法效率并不是很高,而且容易栈溢出。 4、递归算法写的程序都会很简洁。 代码: def fun1(x): if x > 0 : print(x) fun1(x - 1) def fun2(x): if x > 0 : fun2(x - 1) print(x) fun1(5) print('='*100) fun2(5) print('='*100) 执行结果: /Users/liaoyangyang/crc/codes-python/LearnPython/venv/bin/python /Users/liaoyangyang/crc/codes-python/LearnPython/test.py 5 4 3 2 1 ===============================================...
- 下一篇
vue移动端预览pdf功能(可以多页)以及第三方电子签章不能正常展示解决方案(多种)
随着网络的发展,PC端的网站已不能满足人们的需求,人们更喜欢采用移动端进行业务操作。最近公司要求把PC端网站的订单合同签署功能移植到微信端,而不再局限于PC端操作。 对于这样的要求,我们需要了解的是订单合同,协议书之类的一般都属于不可以任意修改的文件(PDF),这样的文件,现在的浏览器基本都支持直接访问的。但是遗憾的是,移动端并不支持直接访问,这样我们需要对PDF文件进行解析处理。首先我们考虑到通过服务器访问到PDF文件,传递到前端,再由前端进行解析处理。 这里前端框架采用vue,vue中有整合到第三方pdf解析库pdfjs-dist 1、安装 pdfjs-dist npm install pdfjs-dist --save 2、创建pdf组件 pdf-component.vue <template> <div> <canvas v-for="page in pages" :id="'the-canvas'+page" :key="page"></canvas> </div> </template> <scr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装