首页 文章 精选 留言 我的

精选列表

搜索[高并发],共10000篇文章
优秀的个人博客,低调大师

多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟

1、学习切入点 百度翻译大概意思就是: 一种同步辅助程序,允许一组线程相互等待到达一个公共的屏障点。CyclicBarrier在涉及固定大小的线程方的程序中非常有用,这些线程方有时必须相互等待。这个屏障被称为循环屏障,因为它可以在等待的线程被释放后重新使用。 CyclicBarrier支持可选的Runnable命令,该命令在参与方中的最后一个线程到达后,但在释放任何线程之前,每个屏障点运行一次。此屏障操作有助于在任何参与方继续之前更新共享状态。 动图演示: 在上文中我们分析完了CountDownLatch源码,可以理解为减法计数器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 来说,要简单很多,它类似于加法计数器,在源码中使用 ReentrantLock 和 Condition 的组合来使用。 2、案例演示CyclicBarrier //加法计数器 public class CyclicBarrierDemo { public static void main(String[] args) { /** * 集齐5名队员,开始游戏 */ // 开始战斗的线程 CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{ System.out.println("欢迎来到王者荣耀,敌军还有五秒到达战场!全军出击!"); }); for (int i = 1; i <=5 ; i++) { final int temp = i; // lambda能操作到 i 吗 new Thread(()->{ System.out.println(Thread.currentThread().getName()+"第"+temp+"个进入游戏!"); try { cyclicBarrier.await(); // 等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } } 3、入手构造器 //构造器1 /** 创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,并在触发屏障时执行给定的屏障操作,由最后一个进入屏障的线程执行 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //构造器2 /** 创建一个新的CyclicBarrier,当给定数量的参与方(线程)在等待它时,它将跳闸,并且在屏障跳闸时不执行预定义的操作 */ public CyclicBarrier(int parties) { this(parties, null); } 其中构造器1为核心构造器,在这里你可以指定 parties 本局游戏的参与者的数量(要拦截的线程数)以及barrierAction本局游戏结束时要执行的任务。 3.1、入手成员变量 /** 同步操作锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 线程拦截器 */ private final Condition trip = lock.newCondition(); /** 每次拦截的线程数 */ private final int parties; /* 换代前执行的任务 */ private final Runnable barrierCommand; /** 表示栅栏的当前代 类似代表本局游戏*/ private Generation generation = new Generation(); /** 计数器 */ private int count; /** 静态内部类Generation */ private static class Generation { boolean broken = false; } 3.2、入手核心方法 下面分析这两个方法,分别为【非定时等待】和【定时等待】! //非定时等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //定时等待 public int await(long timeout, TimeUnit unit)throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } 可以看到,最终两个方法都走【dowait】 方法,只不过参数不同。下面我们重点看看这个方法到底做了哪些事情。 //核心等待方法 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//加锁操作 try { final Generation g = generation; //检查当前栅栏是否被打翻 if (g.broken) throw new BrokenBarrierException(); //检查当前线程是否被中断 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //每次都将计数器的值-1 int index = --count; //计数器的值减为0,则需要唤醒所有线程并转换到下一代 if (index == 0) { // tripped boolean ranAction = false; try { //唤醒所有线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //唤醒所有线程并转换到下一代 nextGeneration(); return 0; } finally { //确保在任务未成功执行时能将所有线程唤醒 if (!ranAction) breakBarrier(); } } //如果计数器不为0 则执行此循环 // loop until tripped, broken, interrupted, or timed out for (;;) { try { //根据传入的参数来觉得是定时等待还是非定时等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //若当前线程在等待期间被中断则打翻栅栏唤醒其它线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 若在捕获中断异常前已经完成在栅栏上的等待,则直接调用中断操作 Thread.currentThread().interrupt(); } } //如果线程因为打翻栅栏操作而被唤醒则抛出异常 if (g.broken) throw new BrokenBarrierException(); //如果线程因为换代操作而被唤醒则返回计数器的值 if (g != generation) return index; //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock();//最终解锁 } } 分两步分析,首先计数器的值减为0的情况,和计数器不为0的情况,首先第一种情况下: 第二种情况,计数器不为0,则进入自旋for(;;): 最后,我们来看看怎么重置一个栅栏: 将屏障重置为初始状态。如果任何一方目前在隔离墙等候,他们将带着BrokenBarrierException返回。 请注意,由于其他原因发生中断后的重置可能很复杂;线程需要以其他方式重新同步,并选择一种方式执行重置。 最好是创建一个新的屏障供以后使用 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } 测试reset代码: 首先,打破栅栏,那意味着所有等待的线程(5个等待的线程)会唤醒,【await 】方法会通过抛出【BrokenBarrierException】异常返回。然后开启新一代,重置了 count 和 generation,相当于一切归0了。 4、CyclicBarrier 与 CountDownLatch 的区别 相同点: 1、都可以实现一组线程在到达某个条件之前进行等待 2、它们内部都有一个计数器,当计数器的值不断减为0的时候,所有阻塞的线程都会被唤醒! 不同点: 1、CyclicBarrier 的计数器是由它自己来控制,而CountDownLatch 的计数器则是由使用则来控制 2、在CyclicBarrier 中线程调用 await方法不仅会将自己阻塞,还会将计数器减1,而在CountDownLatch中线程调用 await方法只是将自己阻塞而不会减少计数器的值。 3、另外,CountDownLatch 只能拦截一轮,而CyclicBarrier 可以实现循环拦截。一般来说CyclicBarrier 可以实现 CountDownLatch的功能,而反之不能。

优秀的个人博客,低调大师

Java并发编程的艺术(九)——批量获取多条线程的执行结果

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_34173549/article/details/79612353 当向线程池提交callable任务后,我们可能需要一次性获取所有返回结果,有三种处理方法。 方法一:自己维护返回结果 // 创建一个线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); // 存储执行结果的List List<Future<String>> results = new ArrayList<Future<String>>(); // 提交10个任务 for ( int i=0; i<10; i++ ) { Future<String> result = executorService.submit( new Callable<String>(){ public String call(){ int sleepTime = new Random().nextInt(1000); Thread.sleep(sleepTime); return "线程"+i+"睡了"+sleepTime+"秒"; } } ); // 将执行结果存入results中 results.add( result ); } // 获取10个任务的返回结果 for ( int i=0; i<10; i++ ) { // 获取包含返回结果的future对象 Future<String> future = results.get(i); // 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止) String result = future.get(); System.out.println(result); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 此方法的弊端: 需要自己创建容器维护所有的返回结果,比较麻烦; 从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。 方法二:使用ExecutorService的invokeAll函数 本方法能解决第一个弊端,即并不需要自己去维护一个存储返回结果的容器。当我们需要获取线程池所有的返回结果时,只需调用invokeAll函数即可。但是,这种方式需要你自己去维护一个用于存储任务的容器。 // 创建一个线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); // 创建存储任务的容器 List<Callable<String>> tasks = new ArrayList<Callable<String>>(); // 提交10个任务 for ( int i=0; i<10; i++ ) { Callable<String> task = new Callable<String>(){ public String call(){ int sleepTime = new Random().nextInt(1000); Thread.sleep(sleepTime); return "线程"+i+"睡了"+sleepTime+"秒"; } }; executorService.submit( task ); // 将task添加进任务队列 tasks.add( task ); } // 获取10个任务的返回结果 List<Future<String>> results = executorService.invokeAll( tasks ); // 输出结果 for ( int i=0; i<10; i++ ) { // 获取包含返回结果的future对象 Future<String> future = results.get(i); // 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止) String result = future.get(); System.out.println(result); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 方法三:使用CompletionService CompletionService内部维护了一个阻塞队列,只有执行完成的任务结果才会被放入该队列,这样就确保执行时间较短的任务率先被存入阻塞队列中。 ExecutorService exec = Executors.newFixedThreadPool(10); final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>( 10); //实例化CompletionService final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( exec, queue); // 提交10个任务 for ( int i=0; i<10; i++ ) { executorService.submit( new Callable<String>(){ public String call(){ int sleepTime = new Random().nextInt(1000); Thread.sleep(sleepTime); return "线程"+i+"睡了"+sleepTime+"秒"; } } ); } // 输出结果 for ( int i=0; i<10; i++ ) { // 获取包含返回结果的future对象(若整个阻塞队列中还没有一条线程返回结果,那么调用take将会被阻塞,当然你可以调用poll,不会被阻塞,若没有结果会返回null,poll和take返回正确的结果后会将该结果从队列中删除) Future<String> future = completionService.take(); // 从future中取出执行结果,这里存储的future已经拥有执行结果,get不会被阻塞 String result = future.get(); System.out.println(result); }

优秀的个人博客,低调大师

java面试-Java并发编程(九)——批量获取多条线程的执行结果

当向线程池提交callable任务后,我们可能需要一次性获取所有返回结果,有三种处理方法。 方法一:自己维护返回结果 // 创建一个线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); // 存储执行结果的List List<Future<String>> results = new ArrayList<Future<String>>(); // 提交10个任务 for ( int i=0; i<10; i++ ) { Future<String> result = executorService.submit( new Callable<String>(){ public String call(){ int sleepTime = new Random().nextInt(1000); Thread.sleep(sleepTime); return "线程"+i+"睡了"+sleepTime+"秒"; } } ); // 将执行结果存入results中 results.add( result ); } // 获取10个任务的返回结果 for ( int i=0; i<10; i++ ) { // 获取包含返回结果的future对象 Future<String> future = results.get(i); // 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止) String result = future.get(); System.out.println(result); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 此方法的弊端: 需要自己创建容器维护所有的返回结果,比较麻烦; 从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。 方法二:使用ExecutorService的invokeAll函数 本方法能解决第一个弊端,即并不需要自己去维护一个存储返回结果的容器。当我们需要获取线程池所有的返回结果时,只需调用invokeAll函数即可。但是,这种方式需要你自己去维护一个用于存储任务的容器。 // 创建一个线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); // 创建存储任务的容器 List<Callable<String>> tasks = new ArrayList<Callable<String>>(); // 提交10个任务 for ( int i=0; i<10; i++ ) { Callable<String> task = new Callable<String>(){ public String call(){ int sleepTime = new Random().nextInt(1000); Thread.sleep(sleepTime); return "线程"+i+"睡了"+sleepTime+"秒"; } }; executorService.submit( task ); // 将task添加进任务队列 tasks.add( task ); } // 获取10个任务的返回结果 List<Future<String>> results = executorService.invokeAll( tasks ); // 输出结果 for ( int i=0; i<10; i++ ) { // 获取包含返回结果的future对象 Future<String> future = results.get(i); // 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止) String result = future.get(); System.out.println(result); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 方法三:使用CompletionService CompletionService内部维护了一个阻塞队列,只有执行完成的任务结果才会被放入该队列,这样就确保执行时间较短的任务率先被存入阻塞队列中。 ExecutorService exec = Executors.newFixedThreadPool(10); final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>( 10); //实例化CompletionService final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( exec, queue); // 提交10个任务 for ( int i=0; i<10; i++ ) { executorService.submit( new Callable<String>(){ public String call(){ int sleepTime = new Random().nextInt(1000); Thread.sleep(sleepTime); return "线程"+i+"睡了"+sleepTime+"秒"; } } ); } // 输出结果 for ( int i=0; i<10; i++ ) { // 获取包含返回结果的future对象(若整个阻塞队列中还没有一条线程返回结果,那么调用take将会被阻塞,当然你可以调用poll,不会被阻塞,若没有结果会返回null,poll和take返回正确的结果后会将该结果从队列中删除) Future<String> future = completionService.take(); // 从future中取出执行结果,这里存储的future已经拥有执行结果,get不会被阻塞 String result = future.get(); System.out.println(result); }

优秀的个人博客,低调大师

Java并发编程的艺术(八)——闭锁、同步屏障、信号量详解

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_34173549/article/details/79612374 1. 闭锁:CountDownLatch 1.1 使用场景 若有多条线程,其中一条线程需要等到其他所有线程准备完所需的资源后才能运行,这样的情况可以使用闭锁。 1.2 代码实现 // 初始化闭锁,并设置资源个数 CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread( new Runnable(){ public void run(){ // 加载资源1 加载资源的代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t2 = new Thread( new Runnable(){ public void run(){ // 加载资源2 资源加载代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t3 = new Thread( new Runnable(){ public void run(){ // 本线程必须等待所有资源加载完后才能执行 latch.await(); // 当闭锁数量为0时,await返回,执行接下来的任务 任务代码…… } } ).start(); 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 2. 同步屏障:CyclicBarrier 2.1 使用场景 若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。 2.2 闭锁 与 同步屏障 的区别 闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行(实际上并不会同时执行,而是尽量把线程启动的时间间隔降为最少)。 2.3 代码实现 // 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务 CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){ public void run(){ //当所有线程准备完毕后触发此任务 } }); // 启动三条线程 for( int i=0; i<3; i++ ){ new Thread( new Runnable(){ public void run(){ // 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障) barrier.await(); // 任务 任务代码…… } } ).start(); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 3. 信号量:Semaphore 3.1 使用场景 若有m个资源,但有n条线程(n>m),因此同一时刻只能允许m条线程访问资源,此时可以使用Semaphore控制访问该资源的线程数量。 3.2 代码实现 // 创建信号量对象,并给予3个资源 Semaphore semaphore = new Semaphore(3); // 开启10条线程 for ( int i=0; i<10; i++ ) { new Thread( new Runnbale(){ public void run(){ // 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源 semaphore.acquire(); // 任务代码 …… // 释放资源 semaphore.release(); } } ).start(); }

优秀的个人博客,低调大师

java面试-Java并发编程(八)——闭锁、同步屏障、信号量详解

1. 闭锁:CountDownLatch 1.1 使用场景 若有多条线程,其中一条线程需要等到其他所有线程准备完所需的资源后才能运行,这样的情况可以使用闭锁。 1.2 代码实现 // 初始化闭锁,并设置资源个数 CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread( new Runnable(){ public void run(){ // 加载资源1 加载资源的代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t2 = new Thread( new Runnable(){ public void run(){ // 加载资源2 资源加载代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t3 = new Thread( new Runnable(){ public void run(){ // 本线程必须等待所有资源加载完后才能执行 latch.await(); // 当闭锁数量为0时,await返回,执行接下来的任务 任务代码…… } } ).start(); 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 2. 同步屏障:CyclicBarrier 2.1 使用场景 若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。 2.2 闭锁 与 同步屏障 的区别 闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行(实际上并不会同时执行,而是尽量把线程启动的时间间隔降为最少)。 2.3 代码实现 // 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务 CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){ public void run(){ //当所有线程准备完毕后触发此任务 } }); // 启动三条线程 for( int i=0; i<3; i++ ){ new Thread( new Runnable(){ public void run(){ // 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障) barrier.await(); // 任务 任务代码…… } } ).start(); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 3. 信号量:Semaphore 3.1 使用场景 若有m个资源,但有n条线程(n>m),因此同一时刻只能允许m条线程访问资源,此时可以使用Semaphore控制访问该资源的线程数量。 3.2 代码实现 // 创建信号量对象,并给予3个资源 Semaphore semaphore = new Semaphore(3); // 开启10条线程 for ( int i=0; i<10; i++ ) { new Thread( new Runnbale(){ public void run(){ // 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源 semaphore.acquire(); // 任务代码 …… // 释放资源 semaphore.release(); } } ).start(); }

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

用户登录
用户注册