您现在的位置是:首页 > 文章详情

Java并发编程之深入理解线程池原理及实现

日期:2018-11-08点击:395

Java线程池在实际的应用开发中十分广泛。虽然Java1.5之后在JUC包中提供了内置线程池可以拿来就用,但是这之前仍有许多老的应用和系统是需要程序员自己开发的。因此,基于线程池的需求背景、技术要求了解线程池原理和实现,一方面可以更为深刻理解Java多线程开发,有助于解决业务系统中因为线程问题所产生的bug;另一方面也有利于对遗留代码的重构。

如果需要先行了解Java并发编程的基础知识,可参考以下随笔:

1. Java并发编程之线程创建和启动(Thread、Runnable、Callable和Future)

2. Java并发编程之线程生命周期、守护线程、优先级、关闭和join、sleep、yield、interrupt

3. Java并发编程之线程安全、线程通信

4. Java并发编程之ThreadGroup

线程池原理

所谓的线程池,跟JDBC连接池、代理池等一样,属于一种“池”的设计模式。在设计好的数据结构中存放一定数量的线程,并且根据任务需要自动调整线程数量的多少,直到峰值。具体说来,线程池需要满足若干条件:

1. 任务队列:用于缓存提交的任务

2. QueueSize:任务队列存放的Runnable任务实例的数量,需要有限制值防止内存溢出。

3. 线程数量管理:创建线程时初始的数量init;线程池自动扩充时最大的线程数量max;空闲时的活跃线程或核心线程数量core。三者满足init<=core<=max

4. 工作线程队列:用于存储工作线程,并统计工作线程的数量。

5. 任务拒绝策略:如果线程数量达到上限且任务队列已满,需要有拒绝策略通知任务提交者,这个在工程实践中非常重要。

6. 线程工厂:用于个性化定制线程,如设置守护线程、线程名称等。

7. Keepedalive时间:线程各个重要参数自动维护的时间间隔。

线程池原理图如下:

线程池实现

“模块设计,类图先行”。明确了线程池需要实现的功能之后,就可以画出线程池的草图了,核心接口及实现类如下:

ThreadPool接口

ThreadPool接口主要定义一个线程池的基本属性,如任务提交、初始容量、最大容量、核心容量等。实现代码如下:

 1 public interface ThreadPool {  2  3 //submit tasks to thread pool  4 void execute(Runnable runnable);  5 //close pool  6 void shutdown();  7 //get the initial size of pool  8 int getInitSize();  9 //get the max size of pool 10 int getMaxSize(); 11 //get the core size of pool 12 int getCoreSize(); 13 //get the cache tasks queue of pool 14 int getQueueSize(); 15 //get the active thread volume of pool 16 int getActiveCount(); 17 //check if pool has been shutdown 18 boolean isShutdown(); 19 }

RunnableQueue接口

这个接口的作用与BlockingQueue接口一样,用于存储提交的Runnable实现类任务。

1 public interface RunnableQueue { 2 //提交任务到队列 3 void offer(Runnable runnable); 4 //从队列中获取任务 5 Runnable take() throws InterruptedException; 6 //返回队列中任务数 7 int size(); 8 }

ThreadFactory接口

定义了个性化创建线程的工厂方法

1 @FunctionalInterface 2 public interface ThreadFactory { 3 4  Thread createThread(Runnable runnable); 5 6 }

DenyPolicy接口

定义了线程池的拒绝策略,即当任务队列达到上限时,采取何种措施拒绝。注意接口内定义了内部类作为外围接口的实现类(该类自动为public和static,像这种嵌套类的实现,可查询《Java编程思想》)。

 1 @FunctionalInterface  2 public interface DenyPolicy {  3  4 void reject(Runnable runnable, ThreadPool threadPool);  5 //定义嵌套类作为拒绝策略的实现类  6 //1.拒绝并丢弃任务  7 class DiscardDenyPolicy implements DenyPolicy{  8  9  @Override 10 public void reject(Runnable runnable, ThreadPool threadPool) { 11 12  } 13  } 14 15 //2.拒绝并抛出自定义异常 16 class AbortDenyPolicy implements DenyPolicy{ 17 18  @Override 19 public void reject(Runnable runnable, ThreadPool threadPool) { 20 throw new RunnableDenyException("The runnable " + runnable + " will abort."); 21  } 22  } 23 24 //3.拒绝, 使用提交者所在线程来完成线程任务. 25 class RunnerDenyPolicy implements DenyPolicy{ 26 27  @Override 28 public void reject(Runnable runnable, ThreadPool threadPool) { 29 30 if(!threadPool.isShutdown()) { 31  runnable.run(); 32  } 33  } 34  } 35 }

其实实现了自定义异常类RunnableDenyException:

1 public class RunnableDenyException extends RuntimeException { 2 3 private static final long serialVersionUID = 112311231231412L; 4 5 public RunnableDenyException(String message) { 6 super(message); 7  } 8 }

InternalTask实现类

Runnable的实现类,会使用到RunnableQueue,它的作用其实是封装了一个任务实例,把Runnable任务的run方法封装到自己的Run方法实现中,并且提供了一个stop方法,用于在线程池销毁或数量维护时停止当前线程。

 1 public class InternalTask implements Runnable {  2 //组合一个RunnableQueue的引用  3 private final RunnableQueue runnableQueue;  4 //使用volatile关键字修饰开关变量  5 private volatile boolean running = true;  6  7 public InternalTask(RunnableQueue runnableQueue) {  8 this.runnableQueue = runnableQueue;  9  } 10  @Override 11 public void run() { 12 // if current task match "both running and isInterrupt" are true 13 // continue to take runnable from queue and run 14 while(running && !Thread.currentThread().isInterrupted()) { 15 try { 16 Runnable task = runnableQueue.take(); 17  task.run(); 18 } catch (Exception e) { 19 running = false; 20 break; 21  } 22  } 23 24  } 25 //停止线程的开关方法 26 public void stop() { 27 this.running = false; 28  } 29 }

到这里,一个基本线程池的骨架就搭好了,接下来主要是实现各接口,实现具体的方法。

1. 队列的实现类LinkedRunnableQueue

 1 public class LinkedRunnableQueue implements RunnableQueue {  2 //设置队列上限  3 private final int limit;  4 //设置拒绝策略的引用  5 private final DenyPolicy denyPolicy;  6 //使用LinkedList作为队列的具体实现类  7 private final LinkedList<Runnable> runnableList = new LinkedList<>();  8 //设置线程池的引用  9 private final ThreadPool threadPool; 10 //构造方法时赋初始值 11 public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) { 12 this.limit = limit; 13 this.denyPolicy = denyPolicy; 14 this.threadPool = threadPool; 15  } 16 17  @Override 18 public void offer(Runnable runnable) { 19 //使用同步锁, 确保入队的线程安全 20 synchronized (runnableList) { 21 //当达到队列上限, 调用拒绝策略;否则加入队尾, 并唤醒阻塞中的线程. 22 if(runnableList.size() >= limit) { 23  denyPolicy.reject(runnable, threadPool); 24 }else { 25  runnableList.addLast(runnable); 26  runnableList.notifyAll(); 27  } 28  } 29  } 30 31  @Override 32 public Runnable take() throws InterruptedException { 33 synchronized (runnableList) { 34 35 while(runnableList.isEmpty()) { 36 try { 37 //如果队列中没有可执行任务, 线程挂起, 进入runnableList关联的monitor waitset中等待被唤醒 38  runnableList.wait(); 39 } catch (InterruptedException e) { 40 //如果被中断, 需要抛出异常 41 throw e; 42  } 43  } 44 return runnableList.removeFirst(); 45  } 46  } 47 48  @Override 49 public int size() { 50 synchronized (runnableList) { 51 //返回队列中的任务数量 52 return runnableList.size(); 53  } 54  } 55 }

2. 线程工厂的实现

 1 public class DefaultThreadFactory implements ThreadFactory {  2 //定义原子类的Integer作为线程组的计数  3 private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);  4 //定义线程组对象  5 private static final ThreadGroup group = new ThreadGroup("MyThreadPool-"+ GROUP_COUNTER.getAndDecrement());  6 //定义生产的线程计数  7 private static final AtomicInteger COUNTER = new AtomicInteger(0);  8  9  @Override 10 public Thread createThread(Runnable runnable) { 11 return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement()); 12  } 13 }

3. 线程池的实现

线程池的实现相对比较复杂, 运用了多种设计模式的思想,核心的要点包括:

1. 使用私有内部类的方式来复用Thread类,防止向外暴露Thread类的方法;

2. 核心组成部分主要是LinkedList实现的任务队列和ArrayDeque实现的工作线程队列,构成了主要的存储主体。

3. 核心的扩容机制需要RunnableQueue + InternalTask + ThreadFactory的结合, 简单说来就是通过判定任务数是否达到阈值,然后增加工作线程的数量。

 1 public class BasicThreadPool implements ThreadPool {  2 //为了不暴露Thread类的方法, 使用私有内部类WorkThread来继承Thread类  3 private WorkThread workThread;  4 //线程池的基本属性  5 private final int initSize;  6 private final int maxSize;  7 private final int coreSize;  8 private int activeCount;  9 //线程工厂引用  10 private final ThreadFactory threadFactory;  11 //队列引用  12 private final RunnableQueue runnableQueue;  13 //线程池销毁标识  14 private volatile boolean isShutdown = false;  15 //工作线程的队列, 使用ArrayDeque实现  16 private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();  17 //定义了一个默认的拒绝策略  18 private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();  19 //定义了一个默认的工厂对象  20 private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();  21  22 private final long keepAliveTime;  23 private final TimeUnit timeUnit;  24 //默认的构造器, 只需要传入初始容量, 最大容量, 核心容量和队列上限  25 public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {  26 this(initSize, maxSize, coreSize, queueSize, DEFAULT_THREAD_FACTORY,  27 DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);  28  }  29 //完整构造器  30 public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize, ThreadFactory threadFactory,  31 DenyPolicy denyPolicy,long keepAliveTime, TimeUnit timeUnit) {  32 this.workThread = new WorkThread();  33 this.initSize = initSize;  34 this.maxSize = maxSize;  35 this.coreSize = coreSize;  36 this.threadFactory = threadFactory;  37 this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);  38 this.keepAliveTime = keepAliveTime;  39 this.timeUnit = timeUnit;  40 this.init();  41  }  42 //线程池的初始化方法, 在构造器中被调用, 用于启动工作线程  43 private void init() {  44  workThread.start();  45 for(int i = 0; i < initSize; i++) {  46  newThread();  47  }  48  }  49 //封装了工作线程的启动方法:  50 //1. 使用InternalTask封装RunnableQueue对象  51 //2. 通过工厂方法制造工作线程并启动  52 //3. 工作线程入队, 工作线程队列计数器+1  53 private void newThread() {  54 InternalTask internalTask = new InternalTask(runnableQueue);  55 Thread thread = this.threadFactory.createThread(internalTask);  56 ThreadTask threadTask = new ThreadTask(thread, internalTask);  57  threadQueue.offer(threadTask);  58 this.activeCount++;  59  thread.start();  60  }  61 //工作线程出队的方法  62 private void removeThread() {  63 ThreadTask threadTask = threadQueue.remove();  64  threadTask.internalTask.stop();  65 this.activeCount--;  66  }  67 //核心:通过内部类继承Thread方法, 设计了自动扩容的机制.  68 //为了防止过快增加到Max容量, 使用continue来退出循环  69 private class WorkThread extends Thread{  70  @Override  71 public void run() {  72 while(!isShutdown && !isInterrupted()) {  73 try {  74  timeUnit.sleep(keepAliveTime);  75 } catch (InterruptedException e) {  76 isShutdown = true;  77 break;  78  }  79 synchronized (this) {  80 if(isShutdown) {  81 break;  82  }  83 if(runnableQueue.size() > 0 && activeCount < coreSize) {  84 for(int i = initSize; i<coreSize;i++) {  85  newThread();  86  }  87 continue;  88  }  89 if(runnableQueue.size() > 0 && activeCount < maxSize) {  90 for(int i = coreSize; i<maxSize;i++) {  91  newThread();  92  }  93  }  94 if(runnableQueue.size()==0 && activeCount > coreSize) {  95 for(int i = coreSize; i < activeCount; i++) {  96  removeThread();  97  }  98  }  99 100  } 101  } 102  } 103  } 104 105  @Override 106 public void execute(Runnable runnable) { 107 //如果线程池已经销毁, 将抛出异常 108 if(this.isShutdown) { 109 throw new IllegalStateException("the thread pool is destoried"); 110  } 111 this.runnableQueue.offer(runnable); 112  } 113 114  @Override 115 public void shutdown() { 116 synchronized(this) { 117 //防止重复销毁 118 if(isShutdown) { 119 return; 120  } 121 //重置关闭标识 122 isShutdown = true; 123 //关闭任务工作线程 124 threadQueue.forEach(threadTask -> { 125  threadTask.internalTask.stop(); 126  threadTask.thread.interrupt(); 127  }); 128 //关闭线程池的工作线程 129 this.workThread.interrupt(); 130  } 131  } 132 133  @Override 134 public int getInitSize() { 135 if(isShutdown) { 136 throw new IllegalStateException("The thread pool is destroy"); 137  } 138 return this.initSize; 139  } 140 141  @Override 142 public int getMaxSize() { 143 if(isShutdown) { 144 throw new IllegalStateException("The thread pool is destroy"); 145  } 146 return this.maxSize; 147  } 148 149  @Override 150 public int getCoreSize() { 151 if(isShutdown) { 152 throw new IllegalStateException("The thread pool is destroy"); 153  } 154 return this.coreSize; 155  } 156 157  @Override 158 public int getQueueSize() { 159 if(isShutdown) { 160 throw new IllegalStateException("The thread pool is destroy"); 161  } 162 return runnableQueue.size(); 163  } 164 165  @Override 166 public int getActiveCount() { 167 synchronized(this) { 168 return this.activeCount; 169  } 170  } 171 172  @Override 173 public boolean isShutdown() { 174 return this.isShutdown; 175  } 176 }

线程池的测试

编写一个简单的测试类,同时启动20个任务,测试线程池的活动状态:

 1 public class ThreadPoolTest {  2  3 public static void main(String[] args) throws InterruptedException {  4  5 final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);  6  7 for(int i = 0; i < 20; i++) {  8 threadPool.execute(() -> {  9 try { 10 TimeUnit.SECONDS.sleep(10); 11 System.out.println(Thread.currentThread().getName() + "is Running and done"); 12 } catch (InterruptedException e) { 13  e.printStackTrace(); 14  } 15  }); 16  } 17 while(true) { 18 System.out.println("getActiveCount: " + threadPool.getActiveCount()); 19 System.out.println("getQueueSize: " + threadPool.getQueueSize()); 20 System.out.println("getCoreSize: " + threadPool.getCoreSize()); 21 System.out.println("getMaxSize: " + threadPool.getMaxSize()); 22 System.out.println("================================================"); 23 TimeUnit.SECONDS.sleep(5); 24  } 25  } 26 }

输出结果如下

thread-pool--1is Running and done thread-pool-0is Running and done getActiveCount: 4 getQueueSize: 14 getCoreSize: 4 getMaxSize: 6 ================================================ getActiveCount: 4 getQueueSize: 14 getCoreSize: 4 getMaxSize: 6 ================================================ thread-pool--3is Running and done thread-pool--2is Running and done thread-pool--1is Running and done thread-pool-0is Running and done getActiveCount: 6 getQueueSize: 8 getCoreSize: 4 getMaxSize: 6

 

原文链接:https://yq.aliyun.com/articles/669539
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章