java源码 - CyclicBarrier
开篇
CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
CyclicBarrier的内部实现逻辑基于ReentrantLock实现,可以理解为ReentrantLock的上层应用者,通过ReentrantLock的Condtion实现线程的休眠和唤醒。
CyclicBarrier用法demo
public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } } }
线程Thread-0正在写入数据... 线程Thread-3正在写入数据... 线程Thread-2正在写入数据... 线程Thread-1正在写入数据... 线程Thread-2写入数据完毕,等待其他线程写入完毕 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务...
CyclicBarrier类定义
- parties记录一共等待执行个数,count记录依然等待执行的个数。
- barrierCommand记录所有待执行的完成后由最后一个线程执行的完成的命令。
- Generation的代的概念来实现CyclicBarrier的复用。
- 构造函数负责初始化parties、count、barrierCommand的核心变量。
public class CyclicBarrier { // 代的类定义 private static class Generation { boolean broken = false; } // 内部通过ReentrantLock实现线程安全的等待 private final ReentrantLock lock = new ReentrantLock(); // 内部通过Lock的condition实现所有waiter的信号通知 private final Condition trip = lock.newCondition(); // 所有等待执行的个数 private final int parties; // 所有等待线程都完成任务后由最后一个线程执行的命令 private final Runnable barrierCommand; // 通过代的概念实现复用 private Generation generation = new Generation(); // 还在等待的个数 private int count; // 核心构造函数 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
CyclicBarrier工作原理
- CyclicBarrier通过ReentrantLock来保证线程休眠和唤醒的通信。
- 在执行过程中会对等待计数进行减一操作,值不为0当前线程进入休眠等待其他线程唤醒
- 在执行过程中会对等待计数进行减一操作,值为0当前线程直接执行barrierCommand并且通过nextGeneration方法唤醒其他等待线程
- 线程的休眠和唤醒都是基于ReentrantLock来实现的
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 通过lock来保证线程安全 final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; // 判断generation过期的情况 if (g.broken) throw new BrokenBarrierException(); // 判断线程中断情况 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 递减待执行的个数计数 int index = --count; // 所有待执行任务完成后执行barrierCommand命令 if (index == 0) { // tripped boolean ranAction = false; try { // barrierCommand命令不为null的时候执行该命令 final Runnable command = barrierCommand; if (command != null) command.run(); // 已经执行了barrierCommand ranAction = true; // 重置generation用以复用并且唤醒所有等待的线程 // private void nextGeneration() { // trip.signalAll(); // count = parties; // generation = new Generation(); // } nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 如果count的值不为0,那么当前线程就开始进入等待 // 外层通过lock占用锁,内层通过wait()进入休眠并释放锁 for (;;) { try { if (!timed) // private final Condition trip = lock.newCondition(); trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 各种后置处理逻辑 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
参考文章

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Node.js学习
《了不起的Node.js:将JavaScript进行到底》(电子工业出版社) 2009年ryan在JavaScript开发者大会宣布了一个名为node.js的新技术,运行在服务器端的JavaScript,“以后开发web应用就只需要一种语言了!!!” node.js快速高效的优点得益于事件轮询技术(event loop),以及google为chrome浏览器设计的V8(JavaScript解释器和虚拟机)。 node.js自带了很多有用的模块和一个名为NPM的简单包管理器(node package manager) Node.js的安装 官网地址:https://nodejs.org/en/ 找到对应的系统版本的安装包,下载,我的是win-64位系统 一路下一步就可 验证是否安装成功: 打开cmd,定位到nodejs的安装目录 查看node安装版本,输入:node -v 查看npm安装版本,输入:npm -v 显示版本号,就说明安装了。 运行node repl win+r打开运行框,输入node, 可以运行一些JavaScript的表达式:Object.keys(global) ...
- 下一篇
Python爬取猫眼「碟中谍」全部评论
实现目标 昨天晚上看完碟中谍后,有点小激动,然后就有了这片文章。 我们将猫眼上碟中谍的全部评论保存下来,用于后期分析~ 总共评论3W条左右。 逻辑梳理 猫眼PC网页只能查看热门评论,只有在手机端页面才能查看全部评论。我们用chrome手机模式打开碟中谍6的页面,然后找到了全部评论入口: 当我们将评论页面向上拖,后台请求中变看到了我们想要的接口地址:http://m.maoyan.com/mmdb/comments/movie/341737.json?_v_=yes&offset=15&startTime=2018-09-02%2013%3A33%3A14 请求地址中的参数: offset:偏移量 startTime:查询起始时间 341737:电影ID 还有一个V不知道啥意思,不过没啥影响 其实正常来说到这儿就差不多了,按照以往的套路循环传入offset参数就好了,不过当我爬到第67页的时候,就已经不返回值了,为啥是67,6715=1005,猫眼应该是控制了每个startTime只能往前取1000条评论,所以只能换个思路,将每页最早一条评论的时间作为startTime传...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Mario游戏-低调大师作品
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS关闭SELinux安全模块