您现在的位置是:首页 > 文章详情

线程池参数原理及应用

日期:2019-03-23点击:327

线程池原理

    Java创建一个线程很方便,只需new Thread()就可以, 但是当有多个任务需要进行进行处理时,频繁的进行创建和启用线程同样需要系统开销,也不利于管理,于是同mysql的连接池一样,自然有对线程的管理池即线程池。

    做个比喻,线程池好比一个公司,那么线程本身就是一个个的员工,来对线程的创建和销毁进行管理,最大化的进行资源的合理调度。

    Java的线程池创建也很简单,concurrent这个并发包下有Executors可以很方便的进行四种常用线程的创建:

    newFixedThreadPool:创建固定数量的线程的线程池,可以控制最大并发数,常用于知道具体任务的数量,需要进行多线程的操作,如批量插入数据库任务,需要进行10万条数据分页,每1万条数据一页,配置一个线程处理,一共配置10个线程,进行并行批量插入,就可以使用这个线程池来进行,大大减少响应时间

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

    newCachedThreadPool: 创建可一段时间内重复利用的线程池,常用于不知道具体的任务数量,但是还需要进行并行处理的情况,如springboot @Aysnc就可以指定使用这个线程池,来进行一些埋点等的各种业务的异步处理

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

    newSingleThreadExecutor: 创建单个线程的线程池,这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

    newScheduledThreadPool: 创建一个可以定时和重复执行的线程池,常用于定时任务和延时任务的执行线程池

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }

    当然线程池还可以自定义,Java只是提供了几种常用的静态线程池的创建方法,以上也已经将4种线程池的创建源码显示出来了,可以发现线程池的创建都是通过new ThreadPoolExecutor()来实现的,现在主要介绍下几个重要的参数和接口:

    首先ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,AbstractExecutorService又实现了ExecutorService接口,ExecutorService接口继承了只有一个方法execute的Executor。

  下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,分别代表一种时间的单位,秒,分,小时等:
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:  
ArrayBlockingQueue; 有界阻塞队列,由数组实现,需要指定数组大小 LinkedBlockingQueue; 无界阻塞队列,由链表实现,最大值是Integer的最大值 SynchronousQueue; 这个队列不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    其中注意这几个参数都是volatile修饰的,用来保证多线程下的可见性,我们也可以根据这些参数的不同配置,来产生我们需要的线程池。

    有了线程池后,我们需要关注几个线程池的状态:

        

    下图表明几个状态之间的转化关系:

        

    接下来就是举个栗子来表明如何使用:

    ExecutorService executorService = Executors.newFixedThreadPool(15);

    在执行完上述代码后,我们其实就创建了一个有15个核心线程数量,最大也是15个线程数量,空闲线程保存时间为1分钟,采用无限阻塞队列,任务拒绝采用AbortPolicy:丢弃任务并抛出RejectedExecutionException异常的线程池。在创建后,并没有进行活跃的线程工人产生,可用线程数为0,比如接下来有10个任务进来,就会创建10个线程工人来进行工作,并且工作完不会销毁,之后又来了10个任务,之前的10个线程还没有处理完他们自己的任务,这个时候就又会创建5个线程工人来进行任务的处理,有小伙伴有疑问了,那剩下的5个任务怎么办呢,对了,还有阻塞队列,这些没有工人处理的任务会进入待办事项般的阻塞队列,先进先出,待15个工人将手头的活办完之后进行依次处理,因为阻塞队列是无界阻塞队列,因此,任务会不断的丢到这个队列中,所以,并不会创建因为队列太小,而不得已创建几个个临时工来处理,这个几个数量即在最大线程和核心线程之间的差值数量,这些临时线程的有效时间只有keepAliveTime的时间,此外在来了多个任务之后,如果队列是有界的,且任务数超过了最大能够创建的线程数,即工人不能再招了,待办事项列表也满了,这个时候公司旧不干了,抛出异常,任务拒绝策略。

    接下了是实战,结合CompletableFuture进行展示:

    简单介绍下CompletableFuture:CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法,结合线程池可以达到并发编程的目的    

package cn.chinotan; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.List; import java.util.concurrent.*; /** * @program: test * @description: 多线程测试 * @author: xingcheng * @create: 2019-03-23 17:27 **/ @Slf4j public class ExecutorTest { @Test public void test() { ExecutorService executorService = Executors.newFixedThreadPool(15); CompletableFuture[] completableFutures = new CompletableFuture[15]; List<Integer> integers = new CopyOnWriteArrayList<>(); for (int i = 0; i < 15; i++) { int finalI = i; CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> costMethod(finalI), executorService) .whenComplete((r, e) -> { if (null != e) { e.printStackTrace(); } else { integers.add(r); } }); completableFutures[i] = integerCompletableFuture; } CompletableFuture.allOf(completableFutures).join(); long count = integers.stream().count(); log.info("一共处理成功:{}", count); } /** * 耗时的操作 * * @param i * @return */ public int costMethod(int i) { try { TimeUnit.SECONDS.sleep(5); log.info("耗时的操作 {}", i); return 1; } catch (InterruptedException e) { e.printStackTrace(); return 0; } } } 

    运行结果:

    可以看到15个耗时的操作很快就并行执行完成,并且还能返回执行的成功结果数

    以上就是我对线程池的理解和应用,欢迎大家关注和浏览提问,谢谢大家

原文链接:https://my.oschina.net/u/3266761/blog/3026786
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章