java源码-ThreadPoolExecutor(1)
开篇
ThreadPoolExecutor还是很值得仔细看一看的,所以准备分开两篇文章来说明,这篇文章用来说明ThreadPoolExecutor的构造函数、拒绝策略、以及一些额外的线程池工厂的一些细节。下一篇文章会着重说明ThreadPoolExecutor的执行。
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)
ThreadPoolExecutor类结构
从ThreadPoolExecutor的类继承关系来看:
- 接口Executor和ExecutorService都是接口定义。
- 接口Executor只有一个execute()方法。
- 接口ExecutorService增加了一些状态判断和任务提交接口。
- 类AbstractExecutorService实现了定义中的部分接口包括submit方法且新增doInvokeAny相关方法。
- 类ThreadPoolExecutor继承AbstractExecutorService实现了其他在Executor和ExecutorService中定义的接口
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
// 省略相关代码
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
// 省略相关代码
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
// 省略相关代码
}
}
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}
ThreadPoolExecutor的类定义
ThreadPoolExecutor的构造函数
ThreadPoolExecutor的核心构造函数其实只有一个,见下面的代码注释。然后需要解释的是构造函数的几个参数:
corePoolSize
核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。maximumPoolSize
线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。keepAliveTime
非核心线程的闲置超时时间,超过这个时间就会被回收。unit
指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。workQueue
线程池中的任务队列.
常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。threadFactory
线程工厂,提供创建新线程的功能。RejectedExecutionHandler
当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法。
public class ThreadPoolExecutor extends AbstractExecutorService {
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
// 使用了Executors.defaultThreadFactory()创建了线程池工厂
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 核心构造函数,核心构造函数,核心构造函数,重要事情说三遍!!!
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
在ThreadPoolExecutor源码中额外的收获就是线程池工厂的实现方法了,其实我们有时候在使用Executors的多线程的时候一定要传入线程池工厂的实现方式,这样当我们通过jstack等方法定位问题的时候方便鉴别出线程归属。
Executors的DefaultThreadFactory方法内部通过静态变量tatic final AtomicInteger poolNumber来保证DefaultThreadFactory的实例能够有唯一的id标识,至于线程池工厂创建的线程会自带线程池的前缀以及自增id。
这也算额外的收获吧。
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
RejectedExecutionHandler是一个接口,里面只有一个方法。当要创建的线程数量大于线程池的最大线程数的时候,就会调用这个接口里的这个方法实现任务的拒绝。可以自定义这个接口,实现对这些超出数量的任务的处理。
ThreadPoolExecutor自己已经提供了四个拒绝策略,分别是
- CallerRunsPolicy:CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务
- AbortPolicy:ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy,直接抛出异常。
- DiscardPolicy:让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。
- DiscardOldestPolicy:抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
参考文章
Java多线程-线程池ThreadPoolExecutor构造方法和规则
java多线程-ThreadPoolExecutor的拒绝策略RejectedExecutionHandler

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Python进阶-算法-递归
版权声明:如需转载,请注明转载地址。 https://blog.csdn.net/oJohnny123/article/details/81911889 1、递归就是自己调自己 2、在使用递归策略时,必须有一个递归出口,也就是得有一个明确的递归结束条件。 3、递归算法效率并不是很高,而且容易栈溢出。 4、递归算法写的程序都会很简洁。 代码: def fun1(x): if x > 0 : print(x) fun1(x - 1) def fun2(x): if x > 0 : fun2(x - 1) print(x) fun1(5) print('='*100) fun2(5) print('='*100) 执行结果: /Users/liaoyangyang/crc/codes-python/LearnPython/venv/bin/python /Users/liaoyangyang/crc/codes-python/LearnPython/test.py 5 4 3 2 1 ===============================================...
-
下一篇
vue移动端预览pdf功能(可以多页)以及第三方电子签章不能正常展示解决方案(多种)
随着网络的发展,PC端的网站已不能满足人们的需求,人们更喜欢采用移动端进行业务操作。最近公司要求把PC端网站的订单合同签署功能移植到微信端,而不再局限于PC端操作。 对于这样的要求,我们需要了解的是订单合同,协议书之类的一般都属于不可以任意修改的文件(PDF),这样的文件,现在的浏览器基本都支持直接访问的。但是遗憾的是,移动端并不支持直接访问,这样我们需要对PDF文件进行解析处理。首先我们考虑到通过服务器访问到PDF文件,传递到前端,再由前端进行解析处理。 这里前端框架采用vue,vue中有整合到第三方pdf解析库pdfjs-dist 1、安装 pdfjs-dist npm install pdfjs-dist --save 2、创建pdf组件 pdf-component.vue <template> <div> <canvas v-for="page in pages" :id="'the-canvas'+page" :key="page"></canvas> </div> </template> <scr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- MySQL数据库在高并发下的优化方案
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8编译安装MySQL8.0.19