Java并发编程笔记之 CountDownLatch闭锁的源码分析
JUC 中倒数计数器 CountDownLatch 的使用与原理分析,当需要等待多个线程执行完毕后在做一件事情时候 CountDownLatch 是比调用线程的 join 方法更好的选择,CountDownLatch 与 线程的 join 方法区别是什么?
日常开发中经常会遇到需要在主线程中开启多线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景,它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的初始值必须大于0。另外它还提供了一个countDown方法来操作计数器的值,每调用一次countDown方法计数器都会减1,直到计数器的值减为0时就代表条件已成熟,所有因调用await方法而阻塞的线程都会被唤醒。这就是CountDownLatch的内部机制,看起来很简单,无非就是阻塞一部分线程让其在达到某个条件之后再执行。但是CountDownLatch的应用场景却比较广泛,只要你脑洞够大利用它就可以玩出各种花样。最常见的一个应用场景是开启多个线程同时执行某个任务,等到所有任务都执行完再统计汇总结果。下图动态演示了闭锁阻塞线程的整个过程。
在CountDownLatch出现之前一般都是使用线程的join()方法来实现,但是join不够灵活,不能够满足不同场景的需求。接下来我们看看CountDownLatch的原理实现。
一.CountDownLatch原理探究
从CountDownLatch的名字可以猜测内部应该有个计数器,并且这个计数器是递减的,下面就通过源码看看JDK开发组是何时初始化计数器,何时递减的,计数器变为 0 的时候做了什么操作,多个线程是如何通过计时器值实现同步的,首先我们先看看CountDownLatch内部结构,类图如下:
从类图可以知道CountDownLatch内部还是使用AQS实现的,通过下面构造函数初始化计数器的值,可知实际上是把计数器的值赋值给了AQS的state,也就是这里AQS的状态值来表示计数器值。
构造函数源码如下:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
接下来主要看一下CountDownLatch中几个重要的方法内部是如何调用AQS来实现功能的。
1.void await()方法,当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:(1)当所有线程都调用了CountDownLatch对象的countDown方法后,
也就是说计时器值为 0 的时候。(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。接下来让我们看看await()方法内部是如何调用
AQS的方法的,源码如下:
//CountDownLatch的await()方法 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //AQS的获取共享资源时候可被中断的方法 public final void acquireSharedInterruptibly(int arg)throws InterruptedException { //如果线程被中断则抛异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试看当前是否计数值为0,为0则直接返回,否者进入AQS的队列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //sync类实现的AQS的接口 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
从上面代码可以看到await()方法委托sync调用了AQS的acquireSharedInterruptibly方法,该方法的特点是线程获取资源的时候可以被中断,并且获取到的资源是共享资源,这里为什么要调用AQS的这个方法,而不是调用独占锁的accquireInterruptibly方法呢?这是因为这里状态值需要的并不是非 0 即 1 的效果,而是和初始化时候指定的计数器值有关系,比如你初始化的时候计数器值为 8 ,那么state的值应该就有 0 到 8 的状态,而不是只有 0 和 1 的独占效果。
这里await()方法调用acquireSharedInterruptibly的时候传递的是 1 ,就是说明要获取一个资源,而这里计数器值是资源总数,也就是意味着是让总的资源数减 1 ,acquireSharedInterruptibly内部首先判断如果当前线程被中断了则抛出异常,否则调用sync实现的tryAcquireShared方法看当前状态值(计数器值)是否为 0 ,是则当前线程的await()方法直接返回,否则调用AQS的doAcquireSharedInterruptibly让当前线程阻塞。另外调用tryAcquireShared的方法仅仅是检查当前状态值是不是为 0 ,并没有调用CAS让当前状态值减去 1 。
2.boolean await(long timeout, TimeUnit unit),当线程调用了 CountDownLatch 对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回: (1)当所有线程都调用了 CountDownLatch 对象的 countDown 方法后,也就是计时器值为 0 的时候,这时候返回 true; (2) 设置的 timeout 时间到了,因为超时而返回 false; (3)其它线程调用了当前线程的 interrupt()方法中断了当前线程,当前线程会抛出 InterruptedException 异常后返回。源码如下:
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
3.void countDown() 当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为 0 则会唤醒所有调用await 方法而被阻塞的线程,否则什么都不做,接下来看一下countDown()方法内部是如何调用AQS的方法的,源码如下:
//CountDownLatch的countDown()方法 public void countDown() { //委托sync调用AQS的方法 sync.releaseShared(1); } //AQS的方法 public final boolean releaseShared(int arg) { //调用sync实现的tryReleaseShared if (tryReleaseShared(arg)) { //AQS的释放资源方法 doReleaseShared(); return true; } return false; }
如上面代码可以知道CountDownLatch的countDown()方法是委托sync调用了AQS的releaseShared方法,后者调用了sync 实现的AQS的tryReleaseShared,源码如下:
//syn的方法 protected boolean tryReleaseShared(int releases) { //循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一并更新到state for (;;) { int c = getState(); //如果当前状态值为0则直接返回(1) if (c == 0) return false; //CAS设置计数值减一(2) int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
如上代码可以看到首先获取当前状态值(计数器值),代码(1)如果当前状态值为 0 则直接返回 false ,则countDown()方法直接返回;否则执行代码(2)使用CAS设置计数器减一,CAS失败则循环重试,否则如果当前计数器为 0 则返回 true 。返回 true 后,说明当前线程是最后一个调用countDown()方法的线程,那么该线程除了让计数器减一外,还需要唤醒调用CountDownLatch的await 方法而被阻塞的线程。这里的代码(1)貌似是多余的,其实不然,之所以添加代码 (1) 是为了防止计数器值为 0 后,其他线程又调用了countDown方法,如果没有代码(1),状态值就会变成负数。
4.long getCount() 获取当前计数器的值,也就是 AQS 的 state 的值,一般在 debug 测试时候使用,源码如下:
public long getCount() { return sync.getCount(); } int getCount() { return getState(); }
如上代码可知内部还是调用了 AQS 的 getState 方法来获取 state 的值(计数器当前值)。
到目前为止原理理解的差不多了,接下来用一个例子进行讲解CountDownLatch的用法,例子如下:
package com.hjc; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * Created by cong on 2018/7/6. */ public class CountDownLatchTest { private static AtomicInteger id = new AtomicInteger(); // 创建一个CountDownLatch实例,管理计数为ThreadNum private static volatile CountDownLatch countDownLatch = new CountDownLatch(3); public static void main(String[] args) throws InterruptedException { Thread threadOne = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入场"); countDownLatch.countDown(); } }); Thread threadTwo = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入场"); countDownLatch.countDown(); } }); Thread threadThree = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入场"); countDownLatch.countDown(); } }); // 启动子线程 threadOne.start(); threadTwo.start(); threadThree.start(); System.out.println("等待斗地主玩家进场"); // 等待子线程执行完毕,返回 countDownLatch.await(); System.out.println("斗地主玩家已经满人,开始发牌....."); } }
运行结果如下:
如上代码,创建了一个 CountDownLatch 实例,因为有两个子线程所以构造函数参数传递为 3,主线程调用 countDownLatch.await()方法后会被阻塞。子线程执行完毕后调用 countDownLatch.countDown() 方法让 countDownLatch 内部的计数器减一,等所有子线程执行完毕调用 countDown()后计数器会变为 0,这时候主线程的 await()才会返回。
如果把上面的代码中Thread.sleep和countDownLatch.await()的代码注释掉,运行几遍,运行结果就可能会出现如下结果,如下图:
可以看到在注释掉latch.await()这行之后,就不能保证在所有玩家入场后才开始发牌了。
总结:CountDownLatch 与 join 方法的区别,一个区别是调用一个子线程的 join()方法后,该线程会一直被阻塞直到该线程运行完毕,而 CountDownLatch 则使用计数器允许子线程运行完毕或者运行中时候递减计数,也就是 CountDownLatch 可以在子线程运行任何时候让 await 方法返回而不一定必须等到线程结束;另外使用线程池来管理线程时候一般都是直接添加 Runable 到线程池这时候就没有办法在调用线程的 join 方法了,countDownLatch 相比 Join 方法让我们对线程同步有更灵活的控制。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
C#爬虫使用代理刷csdn文章浏览量
昨天写了一篇关于“ c#批量抓取免费代理并验证有效性”的文章,接着昨天的目标继续完成吧,最终实现的目的就是刷新csdn文章的浏览量(实际上很简单,之前博客园的文章也是可以使用代理ip来刷的,后来不行了),刷文章的浏览量本身是可耻的,没有任何意义,当然技术无罪。之前是在csdn写文章,自从csdn改版之后就主要在博客园写。 1.如何维护代理IP库? 想要使用代理IP,那就必须有一个一定数量、足够有效的代理ip库,在学习阶段,随便玩玩那就只能从免费的代理IP网站上抓取,没有一定数量的代理刷文章流浪量非常慢,首先就是要维护好自己的代理Ip库 之前用过的西刺代理、66ip比较靠谱,西祠好像有反扒,遇到过一次,不知道是西祠网站的问题还是反扒的策略,这两个网站每分钟抓取的能用的代理大概有2,3个,这已经算的上比较客观的了, data5u、快代理、ip3366网页端更新的非常少,而且有效性比较低,快代理抓取网页还必须设置Useragent,发现设置后获取的ip的端口和网页端不一致,很玩味是吧,没办法免费的就是这样,不然人家就收费了,当然付费的代理也不稳定,但肯定是比免费的好很多。 维护代理质量 从网...
- 下一篇
Python---如何实现千图成像:初级篇(从图片爬取到图片合成)
千图成像:用N张图片拼凑成一张图片。实现原理:先将所要成像的图片转化成马赛克图片,然后从图库中用对应颜色的图片替换相应色块。图库中的图片处理:标记图库中每张图片的混合颜色,用于替换目标色块,并记录每张图片的特征用于成像,增加成像质量。 0,起源 德莱文 图片局部 英雄联盟-微博 很久前在刷微博的时候看到了这条,被他给震撼到了,图片是由LOL近千张皮肤图片组合构成的(难道这是用ps做的,还是一张张拼的,应该不可能吧),就在昨天突然就想起了这个事,就决定也做一个,随即便展开了行动。搜到了这篇文章,看了下图片的构成,决定先取得所有皮肤的图片再说吧!便又开始了爬虫! 运行环境:Python3.6.5 , pycharm-2018-1-2 , win10 愣着干嘛,快往下看 1.爬虫思路 皮肤图片的来源问题,首先到官网去找了找,想到了道聚城皮肤专卖区,其中正好有所有我们需要的图片。 皮肤图片来源 通过F12锁定图片取得了第一张图片的URL,(https://game.gtimg.cn/images/daoju/app/lol/medium/2-122015-9.jpg) 取得URL 以此类推便可...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Mario游戏-低调大师作品