java源码 - CyclicBarrier
开篇
CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
CyclicBarrier的内部实现逻辑基于ReentrantLock实现,可以理解为ReentrantLock的上层应用者,通过ReentrantLock的Condtion实现线程的休眠和唤醒。
CyclicBarrier用法demo
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
CyclicBarrier类定义
- parties记录一共等待执行个数,count记录依然等待执行的个数。
- barrierCommand记录所有待执行的完成后由最后一个线程执行的完成的命令。
- Generation的代的概念来实现CyclicBarrier的复用。
- 构造函数负责初始化parties、count、barrierCommand的核心变量。
public class CyclicBarrier {
// 代的类定义
private static class Generation {
boolean broken = false;
}
// 内部通过ReentrantLock实现线程安全的等待
private final ReentrantLock lock = new ReentrantLock();
// 内部通过Lock的condition实现所有waiter的信号通知
private final Condition trip = lock.newCondition();
// 所有等待执行的个数
private final int parties;
// 所有等待线程都完成任务后由最后一个线程执行的命令
private final Runnable barrierCommand;
// 通过代的概念实现复用
private Generation generation = new Generation();
// 还在等待的个数
private int count;
// 核心构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier工作原理
- CyclicBarrier通过ReentrantLock来保证线程休眠和唤醒的通信。
- 在执行过程中会对等待计数进行减一操作,值不为0当前线程进入休眠等待其他线程唤醒
- 在执行过程中会对等待计数进行减一操作,值为0当前线程直接执行barrierCommand并且通过nextGeneration方法唤醒其他等待线程
- 线程的休眠和唤醒都是基于ReentrantLock来实现的
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 通过lock来保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 判断generation过期的情况
if (g.broken)
throw new BrokenBarrierException();
// 判断线程中断情况
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 递减待执行的个数计数
int index = --count;
// 所有待执行任务完成后执行barrierCommand命令
if (index == 0) { // tripped
boolean ranAction = false;
try {
// barrierCommand命令不为null的时候执行该命令
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 已经执行了barrierCommand
ranAction = true;
// 重置generation用以复用并且唤醒所有等待的线程
// private void nextGeneration() {
// trip.signalAll();
// count = parties;
// generation = new Generation();
// }
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 如果count的值不为0,那么当前线程就开始进入等待
// 外层通过lock占用锁,内层通过wait()进入休眠并释放锁
for (;;) {
try {
if (!timed)
// private final Condition trip = lock.newCondition();
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 各种后置处理逻辑
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
参考文章

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Node.js学习
《了不起的Node.js:将JavaScript进行到底》(电子工业出版社) 2009年ryan在JavaScript开发者大会宣布了一个名为node.js的新技术,运行在服务器端的JavaScript,“以后开发web应用就只需要一种语言了!!!” node.js快速高效的优点得益于事件轮询技术(event loop),以及google为chrome浏览器设计的V8(JavaScript解释器和虚拟机)。 node.js自带了很多有用的模块和一个名为NPM的简单包管理器(node package manager) Node.js的安装 官网地址:https://nodejs.org/en/ 找到对应的系统版本的安装包,下载,我的是win-64位系统 一路下一步就可 验证是否安装成功: 打开cmd,定位到nodejs的安装目录 查看node安装版本,输入:node -v 查看npm安装版本,输入:npm -v 显示版本号,就说明安装了。 运行node repl win+r打开运行框,输入node, 可以运行一些JavaScript的表达式:Object.keys(global) ...
-
下一篇
Python爬取猫眼「碟中谍」全部评论
实现目标 昨天晚上看完碟中谍后,有点小激动,然后就有了这片文章。 我们将猫眼上碟中谍的全部评论保存下来,用于后期分析~ 总共评论3W条左右。 逻辑梳理 猫眼PC网页只能查看热门评论,只有在手机端页面才能查看全部评论。我们用chrome手机模式打开碟中谍6的页面,然后找到了全部评论入口: 当我们将评论页面向上拖,后台请求中变看到了我们想要的接口地址:http://m.maoyan.com/mmdb/comments/movie/341737.json?_v_=yes&offset=15&startTime=2018-09-02%2013%3A33%3A14 请求地址中的参数: offset:偏移量 startTime:查询起始时间 341737:电影ID 还有一个V不知道啥意思,不过没啥影响 其实正常来说到这儿就差不多了,按照以往的套路循环传入offset参数就好了,不过当我爬到第67页的时候,就已经不返回值了,为啥是67,6715=1005,猫眼应该是控制了每个startTime只能往前取1000条评论,所以只能换个思路,将每页最早一条评论的时间作为startTime传...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS8编译安装MySQL8.0.19
- MySQL数据库在高并发下的优化方案
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- CentOS7,8上快速安装Gitea,搭建Git服务器