# 前言 控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。 控制并发流程的工具类主要有:  # **简介** # **背景** - CountDownLatch是在Java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier、Semaphore、ConcurrenthashMap和BlockingQueue。 - 在java.util.cucurrent包下。 # **概念**  - CountDownLatch是一个同步计数器,他允许一个或者多个线程在另外一组线程执行完成之前一直等待,基于AQS共享模式实现的。 - 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作来。 [Java并发编程实战笔记](https://mp.weixin.qq.com/s/B6YzNvAopKkFxKf-1Q_NcA),感兴趣的可以补补! # **应用场景** > Zookeeper分布式锁,Jmeter模拟高并发等 # **场景1 让多个线程等待:模拟并发,让并发线程一起执行** 为了模拟高并发,让一组线程在指定时刻(秒杀时间)执行抢购,这些线程在准备就绪后,进行等待(CountDownLatch.await()),直到秒杀时刻的到来,然后一拥而上。这也是本地测试接口并发的一个简易实现。 在这个场景中,CountDownLatch充当的是一个发令枪的角色;就像田径赛跑时,运动员会在起跑线做准备动作,等到发令枪一声响,运动员就会奋力奔跑。和上面的秒杀场景类似。 **代码实现如下**: ``` package com.niuh.tools; import java.util.concurrent.CountDownLatch; /** *
* CountDownLatch示例 * 场景1 让多个线程等待:模拟并发,让并发线程一起执行 *
*/ public class CountDownLatchRunner1 { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 5; i++) { new Thread(() -> { try { //准备完毕……运动员都阻塞在这,等待号令 countDownLatch.await(); String parter = "【" + Thread.currentThread().getName() + "】"; System.out.println(parter + "开始执行……"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } Thread.sleep(2000);// 裁判准备发令 countDownLatch.countDown();// 发令枪:执行发令 } } ``` **运行结果**: ``` 【Thread-2】开始执行…… 【Thread-4】开始执行…… 【Thread-3】开始执行…… 【Thread-0】开始执行…… 【Thread-1】开始执行…… ``` 我们通过CountDownLatch.await(),让多个参与者线程启动后阻塞等待,然后在主线程 调用CountDownLatch.countdown(1) 将计数减为0,让所有线程一起往下执行;以此实现了多个线程在同一时刻并发执行,来模拟并发请求的目的。 # **场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并** 很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check;这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。 **代码实现如下**: ``` package com.niuh.tools; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; /** *
* CountDownLatch示例 * 场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并 *
*/ public class CountDownLatchRunner2 { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { final int index = i; new Thread(() -> { try { Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000)); System.out.println("finish" + index + Thread.currentThread().getName()); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } countDownLatch.await();// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。 System.out.println("主线程:在所有任务运行完成后,进行结果汇总"); } } ``` **运行结果**: ``` finish3Thread-3 finish0Thread-0 finish1Thread-1 finish4Thread-4 finish2Thread-2 主线程:在所有任务运行完成后,进行结果汇总 ``` 在每个线程(任务) 完成的最后一行加上CountDownLatch.countDown(),让计数器-1;当所有线程完成-1,计数器减到0后,主线程往下执行汇总任务。 # **源码分析** > 本文基于JDK1.8 CountDownLatch 类图  从图中可以看出CountDownLatch是基于Sync类实现的,而Sync继承AQS,使用的是AQS共享模式。 其内部主要变量和方法如下:  在我们方法中调用 awit() 和 countDown() 的时候,发生了几个关键的调用关系,如下图所示:  其与AQS交互原理如下:  # **构造函数** CountDownLatch类中只提供了一个构造器,参数count为计数器的大小 ``` public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } ``` 这里需要注意,设置state的数量只有在初始化CountDownLatch的时候,如果该state被减成了0,就无法继续使用这个CountDownLatch了,需要重新new一个,这就是这个类不可重用的原因,有另一个类也实现了类似的功能,但是可以重用,就是CyclicBarrier。 # **内部同步器** ``` private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //初始化,设置资源个数 Sync(int count) { setState(count); } //获取共享资源个数 int getCount() { return getState(); } //尝试获取共享锁,只有当共享资源个数为0的时候,才会返回1,否则为-1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //释放共享资源,通过CAS每次对state减1 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } ``` # **主要方法** 类中有三个方法是最重要的 ``` // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //和await()方法类似,只不过等待一定的时间后count值还没变为0的化就会继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //将count值减1 public void countDown() { sync.releaseShared(1); } ``` # **await()方法** ``` // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } ``` 进入 AbstractQueuedSynchronizer #acquireSharedInterruptibly()方法. ``` public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //等待过程不可中断 if (Thread.interrupted()) throw new InterruptedException(); //这里的tryAcquireShared在AbstractQueuedSynchronizer中没有实现,在上面介绍的Sync中实现的 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } ``` 在上面介绍Sync类的时候#tryAcquireShared(),当AQS的state = 0的时候才会返回1,否则一直返回-1,如果返回-1,要执行# doAcquireSharedInterruptibly(),进入该方法 ``` private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //这里就把主线程加入队列,队列中有两个节点,第一个是虚拟节点,第二个就是主线程节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //总共只有两个节点,主线程前一个就是首节点 final Node p = node.predecessor(); if (p == head) { //这里又执行到CountDownLatch中Sync类中实现的方法,判断state是否为0 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //如果state不为0,这里会把主线程挂起阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } ``` 这里使用AQS很神奇,在阻塞队列中就只加入了一个主线程,但是呢,只要其他线程没有执行完,那state就不为0,那主线程就在这里阻塞着,那问题了,谁来唤醒这个主线程呢?就是 countDown() 这个方法。 # **await(long timeout, TimeUnit unit)方法** 该方法就是指定等待时间,如果在规定的等待时间中没有完成,就直接返回false,在主线程中可以根据这个状态进行后续的处理。 ``` //和await()方法类似,只不过等待一定的时间后count值还没变为0的化就会继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } ``` # **countDown() 方法** ``` //将count值减1 public void countDown() { sync.releaseShared(1); } ``` 进入 AbstractQueuedSynchronizer #releaseShared方法 ``` public final boolean releaseShared(int arg) { //该方法同样在AbstractQueuedSynchronizer中没有实现,在CountDownLatch中实现 if (tryReleaseShared(arg)) { //唤醒主线程 doReleaseShared(); return true; } return false; } ``` 在分析Sync类的时候,介绍了tryReleaseShared(),该方法会把AQS的state减1,如果减1操作成功,执行唤醒主线程操作,进入 AbstractQueuedSynchronizer#tryReleaseShared()方法 ``` private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //首节点状态为SIGNAL = -1 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒主线程,也就是队列中的第二个节点,如果线程没有执行完成,主线程被唤醒之后,发现state依然不为零,会再次阻塞 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } ``` # **总结** CountDownLatch 和 Semaphore 一样都是共享模式下资源问题,这些源码实现AQS的模版方法,然后使用CAS+循环重试实现自己的功能。在RT多个资源调用,或者执行某种操作依赖其他操作完成下可以发挥这个计数器的作用,小编这里也总结了一些互联网大厂经常面试的[Java并发编程面试真题 123道](https://mp.weixin.qq.com/s/WHqMd85Nipx31gr2aowobA),感兴趣的可以来实战一下!