JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解
概述
CountDownLatch
和CyclicBarrier
有着相似之处,并且也常常有人将他们拿出来进行比较,这次,笔者试着从源码的角度分别解析这两个类,并且从源码的角度出发,看看两个类的不同之处。
CountDownLatch
CountDownLatch
从字面上来看是一个计数工具类,实际上这个类是用来进行多线程计数的JAVA方法。
CountDownLatch
内部的实现主要是依靠AQS
的共享模式。当一个线程把CountDownLatch
初始化了一个count
之后,其他的线程调用await
就会阻塞住,直到其他的线程一个一个调用countDown
方法进行release
操作,把count
的值减到0,即把同步锁释放掉,await
才会进行下去。
Sync
内部主要还是实现了一个继承自AQS
的同步器Sync
。Sync
源码如下:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 构造方法,参数是count的数值 Sync(int count) { // 内部使用state来存储count setState(count); } // 获取count的值 int getCount() { return getState(); } // 尝试获取分享模式同步器 protected int tryAcquireShared(int acquires) { // 判断state的值,如果为0则获取成功,否则获取失败 // 继承自AQS,根据AQS中的注释我们可以知道如果返回结果 // 大于0则说明获取成功,如果小于0则说明获取失败 // 此处不会返回0,因为没有意义 return (getState() == 0) ? 1 : -1; } // 释放同步器 protected boolean tryReleaseShared(int releases) { // 自选操作 for (;;) { // 获取state int c = getState(); // 如果state为0,直接返回false if (c == 0) return false; // 计算state-1的结果 int nextc = c-1; // CAS操作将这个值同步到state上 if (compareAndSetState(c, nextc)) // 如果同步成功,则判断是否此时state为0 return nextc == 0; } } }
Sync
是继承自AQS
的同步器,这段代码中值得拿出来讨论的有以下几点:
-
为什么用state来存储count的数值?
因为state和count其实上是一个概念,当state为0的时候说明资源是空闲的,当count为0时,说明所有的
CountDownLatch
线程都已经完成,所以两者虽然说不是同样的意义,但是在代码实现层面的表现是完全一致的,因此可以将count记录在state中。 -
为什么
tryAcquireShared
不会返回0?首先需要解释下
tryAcquireShared
在AQS中可能的返回值:负数说明是不可以获取共享锁,0说明是可以获取共享锁,但是当前线程获取后已经把所有的共享锁资源占完了,接下来的线程将不会再有多余资源可以获取了,正数则说明了你可以获取共享锁,并且之后还有余量可以给其他线程提供共享锁。然后我们回过来看CountDownLatch
内部的tryAcquireShared
,我们在实现上完全不关注后续线程,后续的资源占用状况,我只要当前状态,那么这个0的返回值实际上是没有必要的。 -
为什么
tryReleaseShared
中的参数不被使用到?根据这个类的实现方式,我们可以知道
tryReleaseShared
的参数一定是1,因为线程的完成一定是一个一个倒数完成的。实际上我们去看countDown
方法内部调用到了sync.releaseShared
方法的时候可以发现他写死了参数为1,所以实际上tryReleaseShared
中的参数不被使用到的原因是因为参数值固定为1.
构造函数和方法
// 构造方法 public CountDownLatch(int count) { // count必须大于0 if (count < 0) throw new IllegalArgumentException("count < 0"); // 初始化Sync this.sync = new Sync(count); } // 等待获取锁(可被打断) public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 等待获取锁(延迟) public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // 计数器降低(释放同步器) // 每次调用减少1 public void countDown() { sync.releaseShared(1); } // 获取count public long getCount() { return sync.getCount(); } // toString public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }
CyclicBarrier
CyclicBarrier
从字面上看是循环栅栏,在JAVA中的作用是让所有的线程完成后进行等待,直到所有的线程全部完成,再进行接下来的操作。
CyclicBarrier
并没有直接继承AQS实现同步,而是借助了可重入锁ReentrantLock
以及Condition
来完成自己的内部逻辑。
成员变量
// 锁 private final ReentrantLock lock = new ReentrantLock(); // 条件 private final Condition trip = lock.newCondition(); // 线程数 private final int parties; // 执行完所有线程后执行的Runnable方法,可以为空 private final Runnable barrierCommand; // 分组 private Generation generation = new Generation(); // 未完成的线程数 private int count; private static class Generation { boolean broken = false; }
我们可以看到成员变量中有一个很陌生的类Generation
,这个是CyclicBarrier
内部声明的一个static
类,作用是帮助区分线程的分组分代,使得CyclicBarrier
可以被复用,如果这个简单的解释不能够让你很好地理解的话可以看接下来的源码解析,通过实现来理解它的用途。
构造函数
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; public CyclicBarrier(int parties) { this(parties, null); }
很常规的构造函数,只是简单的初始化成员变量,没有特别的地方。
核心方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
await
是CyclicBarrier
的核心方法,就是靠着这个方法来实现线程的统一规划的,其中调用的是内部实现的doWait
,我们来看下代码:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 常规的加锁操作,至于为什么要用本地变量操作, // 可以去看下我写的另一篇ArrayBlockingQueue的相关文章 final ReentrantLock lock = this.lock; lock.lock(); try { // 获取Generation类 final Generation g = generation; // 查看generation是否是broken,如果是broken的, // 那说明之前可能因为某些线程中断或者是一些意外状态导致没有办法 // 完成所有线程到达终点(tripped)的目标而只能报错 if (g.broken) throw new BrokenBarrierException(); // 如果线程被外部中断需要报错,并且在内部需要将 // generation的broken置为true来让其他线程能够感知到中断 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 将线程未完成数减1 int index = --count; // 如果此时剩余线程数为0,则说明所有的线程均已完成,即到达tripped状态 if (index == 0) { boolean ranAction = false; try { // 如果有预设完成后执行的方法,则执行 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 此时由于这一个轮回的线程已经全部完成, // 所以调用nextGeneration方法开启一个新的轮回 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 如果此时还有其他的线程未完成,则当前线程开启自旋模式 for (;;) { try { if (!timed) // 如果timed为false,trip则阻塞住直到被唤醒 trip.await(); else if (nanos > 0L) // 如果timed为true,则调用awaitNanos设定时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 查看generation是否是broken,如果是broken的抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果g != generation意味着generation // 已经被赋予了一个新的对象,这说明要么是所有线程已经完成任务开启下一个轮回, // 要么是已经失败了,然后开启的下一个轮回,无论是哪一种情况,都return if (g != generation) return index; // 如果已经超时,则强制打断 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
看完这段核心代码之后我们回头再来反思Generation
的意义,我们已经可以大致的给出使用Generation
的理由了:
不同于CountDownLatch
的实现,CyclicBarrier
采取了更加复杂的方式,原因便是因为内部涉及到了多线程之间的干预与通信,CountDownLatch
不关心线程的实现与进程,他只是一个计数器,而CyclicBarrier
则需要知道线程是否正常的完结,是否被中断,如果用其他的方式代价会比较高,因此,CyclicBarrier
的作者通过静态内部类的方式将整个分代的状态共享于多个线程之间,保证每个线程能够获取到栅栏的状态以及能够将自身的状态更好的反馈回去。同时,这种方式便于重置,也使得CyclicBarrier
可以高效的重用。至于为什么broken
没有用volatile
修饰,因为类的方法内部全部都上了锁,所以不会出现数据不同步的问题。
总结
CountDownLatch
和CyclicBarrier
从使用上来说可能会有一些相似之处,但是在我们看完源码之后我们会发现两者可以说是天差地别,实现原理,实现方式,应用场景均不相同,总结下来有以下几点:
CountDownLatch
实现直接依托于AQS
;CyclicBarrier
则是借助了ReentrantLock
以及Condition
CountDownLatch
是作为计数器存在的,因此采取了讨巧的设计,源码结构清晰并且简单,同样功能也较为简单;CyclicBarrier
则为了实现多线程的掌控,采用了比较复杂的设计,在代码实现上也显得比较弯弯绕绕。- 由于
CyclicBarrier
采用的实现方式,相比一次性的CountDownLatch
,CyclicBarrier
可以多次重复使用 - 计数方式的不同:
CountDownLatch
采用累加计数,CyclicBarrier
则使用倒数计数
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Service Mesh 最火项目 Istio 分层架构,你真的了解吗?
作者 | 王夕宁 阿里巴巴高级技术专家 参与“阿里巴巴云原生”公众号文末留言互动,即有机会获得赠书福利! 本文摘自于由阿里云高级技术专家王夕宁撰写的《Istio服务网格技术解析与实践》一书,文章从基础概念入手,介绍了什么是服务网格及Istio,针对 2020服务网格的三大发展趋势,体系化、全方位地介绍了 Istio 服务网格的相关知识。你只需开心参与文末互动,我们负责买单!技术人必备书籍《Istio 服务网格技术解析与实践》免费领~ Istio是一个开源的服务网格,可为分布式微服务架构提供所需的基础运行和管理要素。随着各组织越来越多地采用云平台,开发者必须使用微服务设计架构以实现可移植性,而运维人员必须管理包含混合云部署和多云部署的大型分布式应用。Istio采用一种一致的方式来保护、连接和监控微服务,降低了管理微服务部署的复杂性。 从架构设计上来看,Istio服务网格在逻辑上分为控制平面和数据平面两部分。其中,控制平面Pilot负责管理和配置代理来路由流量,并配置Mixer以实施策略和收集遥测数据;数据平面由一组以Sidecar方式部署的智能代理(Envoy)组成,这些代理可以调节和控...
- 下一篇
一文搞懂蓝绿部署和金丝雀发布
本文来自Rancher Labs 在之前关于CI/CD的文章中,我们简单讨论了蓝绿部署和金丝雀发布以及它们在持续交付中所扮演的角色。这些都是十分有效的方法,能够大大降低与应用程序部署相关的风险。所以,这篇文章我们来深入介绍蓝绿部署和金丝雀发布。 蓝绿部署和金丝雀发布通过让IT人员可以在发布过程中发生问题时能够还原到先前版本来减轻应用程序部署的风险。这两个方法让版本之间来回切换就像轻按开关一样容易,并且可以自动执行,从而最大程度减少了用户暴露在错误代码的时间。在我们更进一步讨论这两种方法之前,让我们先区分部署和发布。 如何将部署与发布解耦 虽然这两个词经常混淆使用,但实际上部署和发布是两个独立的过程。部署是指在特定环境(包括生产环境)安装指定软件版本的过程,更多是一种技术行为。它不一定必须与发布相关联。而发布则是指向客户群提供新功能,是一种业务决策。 传统过程中,会在发布日期前一天部署好更新或是新功能,该更新或功能发布后可能会在媒体中广泛传播。众所周知,在部署过程中可能会出错,而因为发布时间与部署时间十分相近,因此几乎没有解决问题的空间。而如果将部署和发布解耦,那么在整个功能开发过程中频...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker快速安装Oracle11G,搭建oracle11g学习环境