您现在的位置是:首页 > 文章详情

java并发编程 -CountDownLatch和CyclicBarrier在内部实现和场景上的区别

日期:2020-05-17点击:487

前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。


内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。

 public class  {      //Synchronization control For CountDownLatch.      //Uses AQS state to represent count.     private static final class Sync extends AbstractQueuedSynchronizer {         private static final long serialVersionUID = 4982264981922014374L;         Sync(int count) {             setState(count);         }         int getCount() {             return getState();         }         protected int tryAcquireShared(int acquires) {             return (getState() == 0) ? 1 : -1;         }         protected boolean tryReleaseShared(int releases) {             // Decrement count; signal when transition to zero             for (;;) {                 int c = getState();                 if (c == 0)                     return false;                 int nextc = c-1;                 if (compareAndSetState(c, nextc))                     return nextc == 0;             }         }     }     private final Sync sync;     ... ...//  }
 public class CyclicBarrier {     /**      * Each use of the barrier is represented as a generation instance.      * The generation changes whenever the barrier is tripped, or      * is reset. There can be many generations associated with threads      * using the barrier - due to the non-deterministic way the lock      * may be allocated to waiting threads - but only one of these      * can be active at a time (the one to which {@code count} applies)      * and all the rest are either broken or tripped.      * There need not be an active generation if there has been a break      * but no subsequent reset.      */     private static class Generation {         boolean broken = false;     }     /** The lock for guarding barrier entry */     private final ReentrantLock lock = new ReentrantLock();     /** Condition to wait on until tripped */     private final Condition trip = lock.newCondition();     /** The number of parties */     private final int parties;     /* The command to run when tripped */     private final Runnable barrierCommand;     /** The current generation */     private Generation generation = new Generation();     /**      * Number of parties still waiting. Counts down from parties to 0      * on each generation.  It is reset to parties on each new      * generation or when broken.      */     private int count;     /**      * Updates state on barrier trip and wakes up everyone.      * Called only while holding lock.      */     private void nextGeneration() {         // signal completion of last generation         trip.signalAll();         // set up next generation         count = parties;         generation = new Generation();     }     /**      * Sets current barrier generation as broken and wakes up everyone.      * Called only while holding lock.      */     private void breakBarrier() {         generation.broken = true;         count = parties;         trip.signalAll();     }     /**      * Main barrier code, covering the various policies.      */     private int dowait(boolean timed, long nanos)         throws InterruptedException, BrokenBarrierException,                TimeoutException {         final ReentrantLock lock = this.lock;         lock.lock();         try {             final Generation g = generation;             if (g.broken)                 throw new BrokenBarrierException();             if (Thread.interrupted()) {                 breakBarrier();                 throw new InterruptedException();             }             int index = --count;             if (index == 0) {  // tripped                 boolean ranAction = false;                 try {                     final Runnable command = barrierCommand;                     if (command != null)                         command.run();                     ranAction = true;                     nextGeneration();                     return 0;                 } finally {                     if (!ranAction)                         breakBarrier();                 }             }             // loop until tripped, broken, interrupted, or timed out             for (;;) {                 try {                     if (!timed)                         trip.await();                     else if (nanos > 0L)                         nanos = trip.awaitNanos(nanos);                 } catch (InterruptedException ie) {                     if (g == generation && ! g.broken) {                         breakBarrier();                         throw ie;                     } else {                         // We're about to finish waiting even if we had not                         // been interrupted, so this interrupt is deemed to                         // "belong" to subsequent execution.                         Thread.currentThread().interrupt();                     }                 }                 if (g.broken)                     throw new BrokenBarrierException();                 if (g != generation)                     return index;                 if (timed && nanos <= 0L) {                     breakBarrier();                     throw new TimeoutException();                 }             }         } finally {             lock.unlock();         }     }     ... ... //  }


实战 - 展示各自的使用场景


/**  *类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行  */ public class UseCountDownLatch {         static CountDownLatch latch = new CountDownLatch(6);     /*初始化线程*/     private static class InitThread implements Runnable{         public void run() {            System.out.println("Thread_"+Thread.currentThread().getId()                  +" ready init work......");             latch.countDown();             for(int i =0;i<2;i++) {                System.out.println("Thread_"+Thread.currentThread().getId()                      +" ........continue do its work");             }         }     }     /*业务线程等待latch的计数器为0完成*/     private static class BusiThread implements Runnable{         public void run() {             try {                 latch.await();             } catch (InterruptedException e) {                 e.printStackTrace();             }             for(int i =0;i<3;i++) {                System.out.println("BusiThread_"+Thread.currentThread().getId()                      +" do business-----");             }         }     }     public static void main(String[] args) throws InterruptedException {         new Thread(new Runnable() {             public void run() {                SleepTools.ms(1);                 System.out.println("Thread_"+Thread.currentThread().getId()                      +" ready init work step 1st......");                 latch.countDown();                 System.out.println("begin step 2nd.......");                 SleepTools.ms(1);                 System.out.println("Thread_"+Thread.currentThread().getId()                      +" ready init work step 2nd......");                 latch.countDown();             }         }).start();         new Thread(new BusiThread()).start();         for(int i=0;i<=3;i++){             Thread thread = new Thread(new InitThread());             thread.start();         }         latch.await();         System.out.println("Main do ites work........");     } }
/**  *类说明:共4个子线程,他们全部完成工作后,交出自己结果,  *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串  */ class UseCyclicBarrier {     private static CyclicBarrier barrier             = new CyclicBarrier(4,new CollectThread());     //存放子线程工作结果的容器     private static ConcurrentHashMap<String,Long> resultMap             = new ConcurrentHashMap<String,Long>();     public static void main(String[] args) {         for(int i=0;i<4;i++){             Thread thread = new Thread(new SubThread());             thread.start();         }     }     /*汇总的任务*/     private static class CollectThread implements Runnable{         @Override         public void run() {             StringBuilder result = new StringBuilder();             for(Map.Entry<String,Long> workResult:resultMap.entrySet()){                result.append("["+workResult.getValue()+"]");             }             System.out.println(" the result = "+ result);             System.out.println("do other business........");         }     }     /*相互等待的子线程*/     private static class SubThread implements Runnable{         @Override         public void run() {            long id = Thread.currentThread().getId();             resultMap.put(Thread.currentThread().getId()+"",id);             try {                    Thread.sleep(1000+id);                    System.out.println("Thread_"+id+" ....do something ");                 barrier.await();                Thread.sleep(1000+id);                 System.out.println("Thread_"+id+" ....do its business ");                 barrier.await();             } catch (Exception e) {                 e.printStackTrace();             }         }     } }


 两者总结

1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

6.     就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;




原文链接:https://blog.51cto.com/14815984/2496009
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章