多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟?
1、学习切入点
百度翻译大概意思就是:
一种同步辅助程序,允许一组线程相互等待到达一个公共的屏障点。CyclicBarrier在涉及固定大小的线程方的程序中非常有用,这些线程方有时必须相互等待。这个屏障被称为循环屏障,因为它可以在等待的线程被释放后重新使用。
CyclicBarrier支持可选的Runnable命令,该命令在参与方中的最后一个线程到达后,但在释放任何线程之前,每个屏障点运行一次。此屏障操作有助于在任何参与方继续之前更新共享状态。
动图演示:
在上文中我们分析完了 CountDownLatch源码,可以理解为减法计数器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 来说,要简单很多,它类似于加法计数器,在源码中使用 ReentrantLock 和 Condition 的组合来使用。
2、案例演示 CyclicBarrier
//加法计数器 public class CyclicBarrierDemo { public static void main(String[] args) { /** * 集齐5名队员,开始游戏 */ // 开始战斗的线程 CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{ System.out.println("欢迎来到王者荣耀,敌军还有五秒到达战场!全军出击!"); }); for (int i = 1; i <=5 ; i++) { final int temp = i; // lambda能操作到 i 吗 new Thread(()->{ System.out.println(Thread.currentThread().getName()+"第"+temp+"个进入游戏!"); try { cyclicBarrier.await(); // 等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
3、入手构造器
//构造器1 /** 创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,并在触发屏障时执行给定的屏障操作,由最后一个进入屏障的线程执行 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //构造器2 /** 创建一个新的CyclicBarrier,当给定数量的参与方(线程)在等待它时,它将跳闸,并且在屏障跳闸时不执行预定义的操作 */ public CyclicBarrier(int parties) { this(parties, null); }
其中构造器1为核心构造器,在这里你可以指定 parties 本局游戏的参与者的数量(要拦截的线程数)以及 barrierAction 本局游戏结束时要执行的任务。
3、入手成员变量
/** 同步操作锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 线程拦截器 Condition维护了一个阻塞队列*/ private final Condition trip = lock.newCondition(); /** 每次拦截的线程数 */ private final int parties; /* 换代前执行的任务 */ private final Runnable barrierCommand; /** 表示栅栏的当前代 类似代表本局游戏*/ private Generation generation = new Generation(); /** 计数器 */ private int count; /** 静态内部类Generation */ private static class Generation { boolean broken = false; }
3、入手核心方法
3.1、【await】方法源码分析
下面分析这两个方法,分别为【非定时等待】和【定时等待】!
//非定时等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //定时等待 public int await(long timeout, TimeUnit unit)throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
可以看到,最终两个方法都走【dowait】 方法,只不过参数不同。下面我们重点看看这个方法到底做了哪些事情。
//核心等待方法 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//加锁操作 try { final Generation g = generation; //检查当前栅栏是否被打翻 if (g.broken) throw new BrokenBarrierException(); //检查当前线程是否被中断 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //每次都将计数器的值-1 int index = --count; //计数器的值减为0,则需要唤醒所有线程并转换到下一代 if (index == 0) { // tripped boolean ranAction = false; try { //唤醒所有线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //唤醒所有线程并转换到下一代 nextGeneration(); return 0; } finally { //确保在任务未成功执行时能将所有线程唤醒 if (!ranAction) breakBarrier(); } } //如果计数器不为0 则执行此循环 // loop until tripped, broken, interrupted, or timed out for (;;) { try { //根据传入的参数来觉得是定时等待还是非定时等待 if (!timed) //如果没有时间限制,则直接等待,直到被唤醒 trip.await(); else if (nanos > 0L) //如果有时间限制,则等待指定时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //若当前线程在等待期间被中断则打翻栅栏唤醒其它线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 若在捕获中断异常前已经完成在栅栏上的等待,则直接调用中断操作 Thread.currentThread().interrupt(); } } //如果线程因为打翻栅栏操作而被唤醒则抛出异常 if (g.broken) throw new BrokenBarrierException(); //如果线程因为换代操作而被唤醒则返回计数器的值 if (g != generation) return index; //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock();//最终解锁 } }
分两步分析,首先计数器的值减为0的情况,和计数器不为0的情况,首先第一种情况下:
第二种情况,计数器不为0,则进入自旋for(;;):
多线程同时并发访问,如何阻塞当前线程?
我们翻看源码,这里就看一下没有时间限制的【trip.await】方法:
整个await的过程:
1、将当前线程加入到Condition锁队列中。特别主要要区分AQS的等待队列,这里进入的是Condition的FIFO队列
2、释放锁。这里可以看到【fullyRelease】将锁释放了,否则【acquireQueued(node, savedState)】别的线程就无法拿到锁而发生死锁。
3、自旋(while)挂起,直到被唤醒或者超时或者CACELLED等。
4、获取锁【acquireQueued】方法,并将自己从Condition的FIFO队列中释放,表面自己不再需要锁(我已经有锁了)
3.2、Condition 队列与AQS等待队列 补充
AQS等待队列与Condition队列是两个相互独立的队列,【await】就是在当前线程持有锁的基础上释放锁资源,并新建Condition节点加入到Condition队列尾部,阻塞当前线程。【signal】就是将当前Condition的头结点移动到AQS等待队列节点尾部,让其等待再次获取锁。下面画图演示区别:
节点1执行Condition.await()->(1)将head后移 ->(2)释放节点1的锁并从AQS等待队列中移除->(3)将节点1加入到Condition的等待队列中->(4)更新lastWrite为节点1
节点2执行signal()操作->(1)将firstWrite后移->(2)将节点4移出Condition队列->(3)将节点4加入到AQS的等待队列中去->(4)更新AQS等待队列的tail
3.3、总结:
一、Condition的数据结构:
我们知道一个Condition可以在多个地方被await(),那么就需要一个FIFO的结构将这些Condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition内部就需要一个FIFO的队列。
private transient Node firstWaiter;
private transient Node lastWaiter;
上面的两个节点就是描述一个FIFO的队列。我们再结合前面提到的节点(Node)数据结构。我们就发现Node.nextWaiter就派上用场了!nextWaiter就是将一系列的Condition.await 串联起来组成一个FIFO的队列。
二、线程何时阻塞和释放
阻塞:await()方法中,在线程释放锁资源之后,如果节点不在AQS等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁
释放:signal()后,节点会从condition队列移动到AQS等待队列,则进入正常锁的获取流程。
3.4、【signalAll】signalAll源码分析
【signalAll】方法,唤醒所有在Condition阻塞队列中的线程
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll();//唤醒Condition中等待的线程 } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** 这个方法相当于把Condition队列中的所有Node全部取出插入到等待队列中去 */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
/** 将节点从条件队列传输到同步队列AQS的等待队列中 */ final boolean transferForSignal(Node node) { //核心添加节点到AQS队列方法 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
/** 使用CAS+自旋方式插入节点到等待队列,如果队列为空,则初始化队列 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
3.5、【reset】方法源码分析
最后,我们来看看怎么重置一个栅栏:
将屏障重置为初始状态。如果任何一方目前在隔离墙等候,他们将带着BrokenBarrierException返回。 请注意,由于其他原因发生中断后的重置可能很复杂;线程需要以其他方式重新同步,并选择一种方式执行重置。 最好是创建一个新的屏障供以后使用
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
测试reset代码:
首先,打破栅栏,那意味着所有等待的线程(5个等待的线程)会唤醒,【await 】方法会通过抛出【BrokenBarrierException】异常返回。然后开启新一代,重置了 count 和 generation,相当于一切归0了。
4、CyclicBarrier 与 CountDownLatch 的区别
相同点:
1、都可以实现一组线程在到达某个条件之前进行等待
2、它们内部都有一个计数器,当计数器的值不断减为0的时候,所有阻塞的线程都会被唤醒!
不同点:
1、CyclicBarrier 的计数器是由它自己来控制,而CountDownLatch 的计数器则是由使用则来控制
2、在CyclicBarrier 中线程调用 await方法不仅会将自己阻塞,还会将计数器减1,而在CountDownLatch中线程调用 await方法只是将自己阻塞而不会减少计数器的值。
3、另外,CountDownLatch 只能拦截一轮,而CyclicBarrier 可以实现循环拦截。一般来说CyclicBarrier 可以实现 CountDownLatch的功能,而反之不能。
5、总结:
当调用【cyclicBarrier.await】方法时,最终都会执行【dowait】方法,使用了ReentrantLock去上锁,每次讲计数器count值-1,当计数器值-1为0的时候,会先执行指定任务,调用Condition的【trip.signalAll()】唤醒所有线程并进入下一代
如果当前计数器值-1不为0的时候,进入自旋,执行Condition的【await()】方法,将当前线程添加到Condition的条件队列中等待,执行【fullyRelease】调用【tryRelease】将count值-1,再判断count值是否为0,为0 则会先执行指定任务,调用Condition的【trip.signalAll()】唤醒所有线程并进入下一代,再判断是否在AQS等待队列中,如果不在的话就park当前线程进入AQS等待队列中,否则自旋直到被唤醒在Condition中的等待队列被signalAll进入AQS等待队列中获取锁
推荐阅读:
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
多线程进阶——JUC并发编程之CountDownLatch源码一探究竟?
1、学习切入点 JDK的并发包中提供了几个非常有用的并发工具类。 CountDownLatch、 CyclicBarrier和 Semaphore工具类提供了一种并发流程控制的手段。本文将介绍CountDownLatch(闭锁)的实现原理。在了解闭锁之前需要先了解AQS,因为CountDownLatch的实现需要依赖于AQS共享锁的实现机制。 官方文档: https://docs.oracle.com/javase/8/docs/api/ 百度翻译如下: 一种同步辅助程序,允许一个或多个线程等待在其它线程中执行的一组操作完成。使用给定的计数初始化CountDownLatch。由于调用了countDown()方法,await方法阻塞直到当前计数为零,之后释放所有等待线程,并立即返回await的任何后续调用。这是一个一次性现象——计数不能重置。如果需要重置计数的版本,请考虑使用CyclicBarrier。倒计时锁存器是一种通用的同步工具,可用于多种目的。使用计数1初始化的倒计时锁存器用作简单的开/关锁存器或门:调用倒计时()的线程打开它之前,调用它的所有线程都在门处等待。初始化为N的倒计时...
- 下一篇
多线程进阶——JUC并发编程之Semaphore源码一探究竟?
1、学习切入点 百度翻译如下: 计数信号量。从概念上讲,信号量维护一组许可。如果需要,每个acquire()都会阻塞,直到有许可证可用,然后获取它。每个release()添加一个许可,可能释放一个阻塞的收单机构。但是,并没有使用实际的许可对象;信号量只是保持一个可用数量的计数,并相应地进行操作。 2、案例引入 public class SemaphoreDemo { public static void main(String[] args) { // 线程数量:停车位! 限流! Semaphore semaphore = new Semaphore(3); for (int i = 1; i <=6 ; i++) { new Thread(()->{ // acquire() 得到 try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Th...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7安装Docker,走上虚拟化容器引擎之路