您现在的位置是:首页 > 文章详情

java源码 - CyclicBarrier

日期:2018-09-01点击:494

开篇

  • CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。

  • CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

  • CyclicBarrier的内部实现逻辑基于ReentrantLock实现,可以理解为ReentrantLock的上层应用者,通过ReentrantLock的Condtion实现线程的休眠和唤醒。


CyclicBarrier用法demo

public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } } } 
线程Thread-0正在写入数据... 线程Thread-3正在写入数据... 线程Thread-2正在写入数据... 线程Thread-1正在写入数据... 线程Thread-2写入数据完毕,等待其他线程写入完毕 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 


CyclicBarrier类定义

  • parties记录一共等待执行个数,count记录依然等待执行的个数。
  • barrierCommand记录所有待执行的完成后由最后一个线程执行的完成的命令。
  • Generation的代的概念来实现CyclicBarrier的复用。
  • 构造函数负责初始化parties、count、barrierCommand的核心变量。
public class CyclicBarrier { // 代的类定义 private static class Generation { boolean broken = false; } // 内部通过ReentrantLock实现线程安全的等待 private final ReentrantLock lock = new ReentrantLock(); // 内部通过Lock的condition实现所有waiter的信号通知 private final Condition trip = lock.newCondition(); // 所有等待执行的个数 private final int parties; // 所有等待线程都完成任务后由最后一个线程执行的命令 private final Runnable barrierCommand; // 通过代的概念实现复用 private Generation generation = new Generation(); // 还在等待的个数 private int count; // 核心构造函数 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } 


CyclicBarrier工作原理

  • CyclicBarrier通过ReentrantLock来保证线程休眠和唤醒的通信。
  • 在执行过程中会对等待计数进行减一操作,值不为0当前线程进入休眠等待其他线程唤醒
  • 在执行过程中会对等待计数进行减一操作,值为0当前线程直接执行barrierCommand并且通过nextGeneration方法唤醒其他等待线程
  • 线程的休眠和唤醒都是基于ReentrantLock来实现的
 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 通过lock来保证线程安全 final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; // 判断generation过期的情况 if (g.broken) throw new BrokenBarrierException(); // 判断线程中断情况 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 递减待执行的个数计数 int index = --count; // 所有待执行任务完成后执行barrierCommand命令 if (index == 0) { // tripped boolean ranAction = false; try { // barrierCommand命令不为null的时候执行该命令 final Runnable command = barrierCommand; if (command != null) command.run(); // 已经执行了barrierCommand ranAction = true; // 重置generation用以复用并且唤醒所有等待的线程 // private void nextGeneration() { // trip.signalAll(); // count = parties; // generation = new Generation(); // } nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 如果count的值不为0,那么当前线程就开始进入等待 // 外层通过lock占用锁,内层通过wait()进入休眠并释放锁 for (;;) { try { if (!timed) // private final Condition trip = lock.newCondition(); trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 各种后置处理逻辑 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } 


参考文章

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

原文链接:https://yq.aliyun.com/articles/666306
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章