Java线程池详解及常用方法
Java线程池详解及常用方法
前言
最近被问到了线程池的相关问题。于是准备开始写一些多线程相关的文章。这篇将介绍一下线程池的基本使用。
Executors
Executors是concurrent包下的一个类,为我们提供了创建线程池的简便方法。
Executors可以创建我们常用的四种线程池:
(1)newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。不设上限,提交的任务将立即执行。
(2)newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
(3)newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
(4)newSingleThreadExecutor 创建一个单线程化的线程池执行任务。
Executors的坏处
正常来说,我们不应该使用这种方式创建线程池,应该使用ThreadPoolExecutor来创建线程池。Executors创建的线程池也是调用的ThreadPoolExcutor的构造函数。通过原来可以看出。
我们也看到了这里面的LinkedBlockingQueue并没有指定队列的大小是一个无界队列,这样可能会造成oom。所以我们要使用ThreadPoolExecutor这种方式。
ThreadPoolExecutor
通过源码看到ThreadPoolExecutor比较全的构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
分别解释一下参数的意义
corePoolSize:线程池长期维持的线程数,即使线程处于Idle状态,也不会回收。
maximumPoolSize:线程数的上限
keepAliveTime:空闲的时间,超过这个空闲时间,线程将被回收
unit:空闲时间的时间单位
workQueue:任务的排队队列,当线程都运行的时候,有空的线程将从队列汇总进行拿取
threadFactroy:当核心线程小于满线程的时候,又需要多加线程,则需要从工厂中获取线程
handler:拒绝策略,当线程过多的时候的策略
线程池针对于任务的执行顺序
首先任务过来之后,看看corePoolSize是否有空闲的,有的话就执行。没有的话,放入任务队列里面。然后任务队列会通知线程工厂,赶紧造几个线程,来执行。当任务超过了最大的线程数,就执行拒绝策略,拒绝执行。
submit方法
线程池建立完毕之后,我们就需要往线程池提交任务。通过线程池的submit方法即可。
submit方法接收两种Runable和Callable。
区别如下:
Runable是实现该接口的run方法,callable是实现接口的call方法。
callable允许使用返回值。
callable允许抛出异常。
提交任务的方式
Future submit(Callable task):这种方式可以拿到返回的结果。
void execute(Runnable command):这种方式拿不到。
Future<?> submit(Runnable task):这种方式可以get,但是永远是null。
blockqueue的限制
我们在创建线程池的时候,如果使用Executors。创建的是无界队列,容易造成oom。所以我们要自己执行queue的大小。
BlockingQueue queue = new ArrayBlockingQueue<>(512)
拒绝策略
当任务队列的queue满了的时候,在提交任务,就要触发拒绝策略。队列中默认的拒绝策略是 AbortPolicy。是直接抛出异常的一种策略。
如果是想实现自定义的策略,可以实现RejectedExecutionHandler 接口。
线程池提供了如下的几种策略供选择。
AbortPolicy:默认策略,抛出RejectedExecutionException
DiscardPolicy:忽略当前提交的任务
DiscardOldestPolicy:丢弃任务队列中最老的任务,给新任务腾出地方
CallerRunsPolicy:由提交任务者执行这个任务
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());
捕捉异常
如之前所说Callable接口的实现,可以获取到结果和异常。通过返回的Future的get方法即可拿到。
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future
@Override public Object call() throws Exception { throw new RuntimeException("exception");// 该异常会在调用Future.get()时传递给调用者 } });
try {
Object result = future.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
e.printStackTrace();
}
正确构造线程池的方式
int poolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
executorService = new ThreadPoolExecutor(poolSize, poolSize,
0, TimeUnit.SECONDS, queue, policy);
获取单个结果
通过submit提交一个任务后,可以获取到一个future,调用get方法会阻塞并等待执行结果。get(long timeout, TimeUnit unit)可以指定等待的超时时间。
获取多个结果
可以使用循环依次调用,也可以使用ExecutorCompletionService。该类的take方式,会阻塞等待某一任务完成。向CompletionService批量提交任务后,只需调用相同次数的CompletionService.take()方法,就能获取所有任务的执行结果,获取顺序是任意的,取决于任务的完成顺序。
void solve(Executor executor, Collection> solvers)
throws InterruptedException, ExecutionException {
CompletionService ecs = new ExecutorCompletionService(executor);// 构造器
for (Callable s : solvers)// 提交所有任务
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {// 获取每一个完成的任务
Result r = ecs.take().get(); if (r != null) use(r);
}
}
这个类是对线程池的一个包装,包装完后,听过他进行submit和take。
单个任务超时
Future.get(long timeout, TimeUnit unit)。方法可以指定等待的超时时间,超时未完成会抛出TimeoutException。
多个任务超时
等待多个任务完成,并设置最大等待时间,可以通过CountDownLatch完成:
public void testLatch(ExecutorService executorService, List tasks)
throws InterruptedException{ CountDownLatch latch = new CountDownLatch(tasks.size()); for(Runnable r : tasks){ executorService.submit(new Runnable() { @Override public void run() { try{ r.run(); }finally { latch.countDown();// countDown } } }); } latch.await(10, TimeUnit.SECONDS); // 指定超时时间
}
await是总的时间,即使100个任务,需要跑20分钟。我10s超时了 也停止了。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【简说Python WEB】Flask应用的文件结构
【简说Python WEB】Flask应用的文件结构 目录【简说Python WEB】Flask应用的文件结构1.文件结构的目录2.配置程序--config.py3.app应用包4.剥离出来的email.py5.蓝本(BLueprint)的应用6.main目录的error.py代码剥离: main目录的view.py代码剥离:8.主脚本 9.需要安装的依赖包10.应用启动附录系统环境:Ubuntu 18.04.1 LTS Python使用的是虚拟环境:virutalenv Python的版本:Python 3.6.9 【简说Python WEB】Flask应用的文件结构之前,我们应用了如下的组件: flask-wtfFlask-SQLAlchemyFlask-MomentJinja2模板Bootstrapflask-mailflask_migrate因为之前调试和应用组件,插件,导致app.py有一定的体量。对于大多数web应用来说,如果把代码都堆在以前,难以维护。而有一个不错的文件目录组织。可以很好的维护整个项目。通过自我学习,提炼出自己一套容易维护的文件结构。把之前的可以剥离出来的...
- 下一篇
JavaScript运行机制
JavaScript运行机制前言本文要讲的是,浏览器读一个script代码的流程是什么,遇到异步代码会如何处理,宏观任务和微观任务如何处理。 开始前先来看几个概念。 栈(后进先出)首先要说一个栈模型,函数的调用形成了栈帧。 function f1() { f2(); }function f2() {}f1();例如这段代码,调用f1时,创建第一帧;f1调用f2时,创建第二帧。第二帧压在第一帧之上,当f2运行完成,此时最上层第二帧弹出栈,当f1运行完成,此时最上层第一帧也弹出栈,栈就空了。也就是常说的后进先出。 这个栈也就是常说的执行栈,执行的是任务队列里的任务。 队列(先进先出)然后说一下队列。队列中放着任务。 如果有新的任务(例如用户触发了点击事件),会加入队列,排在后面。 队列里的任务会放在执行栈中执行。 队列有两种:宏观任务队列和微观任务队列。分别放着两种任务:宏观任务和微观任务。 宏观任务(Task,或者叫MacroTask)宿主发起的任务。 如 包含代码的script, setTimeout, setInterval, setImmediate, requestAnimati...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Red5直播服务器,属于Java语言的直播服务器
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS6,CentOS7官方镜像安装Oracle11G