Java并发之CountDownLatch、Semaphore和CyclicBarrier
JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch、Semaphore和CyclicBarrier。
CountDownLatch
CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。调用该类await方法的线程会一直处于阻塞状态,直到其他线程调用countDown方法使当前计数器的值变为零,每次调用countDown计数器的值减1。当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier。
在某些业务场景中,程序执行需要等待某个条件完成后才能继续执行后续的操作;典型的应用如并行计算,当某个处理的运算量很大时,可以将该运算任务拆分成多个子任务,等待所有的子任务都完成之后,父任务再拿到所有子任务的运算结果进行汇总。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
结果:
20:18:32.917 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 6
20:18:32.917 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 5
20:18:32.919 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 4
20:18:32.918 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 0
20:18:32.918 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 2
20:18:32.916 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 8
20:18:32.918 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 3
20:18:32.916 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 9
20:18:32.916 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 7
20:18:32.917 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 1
20:18:33.032 [main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - finish
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
结果: 超过指定时间跳过等待
20:19:34.878 [main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - finish
20:19:34.964 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 2
20:19:34.965 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 9
20:19:34.964 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 0
20:19:34.965 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 7
20:19:34.964 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 1
20:19:34.965 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 4
20:19:34.965 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 6
20:19:34.964 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 3
20:19:34.965 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 8
20:19:34.965 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 5
Semaphore
Semaphore与CountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量,类似阀门的 功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。有点类似于锁的lock与 unlock过程。相对来说他也有两个主要的方法:
用于获取权限的acquire(),其底层实现与CountDownLatch.countdown()类似;
用于释放权限的release(),其底层实现与acquire()是一个互逆的过程。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// 每次最多三个线程获取许可
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample2 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Slf4j
public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Slf4j
public class SemaphoreExample4 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
CyclicBarrier
CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行后面的操作。类似于CountDownLatch,它也是通过计数器来实现的。当某个线程调用await方法时,该线程进入等待状态,且计数器加1,当计数器的值达到设置的初始值时,所有因调用await进入等待状态的线程被唤醒,继续执行后续操作。因为CycliBarrier在释放等待线程后可以重用,所以称为循环barrier。CycliBarrier支持一个可选的Runnable,在计数器的值到达设定值后(但在释放所有线程之前),该Runnable运行一次,注,Runnable在每个屏障点只运行一个。
使用场景类似于CountDownLatch与CountDownLatch的区别
- CountDownLatch主要是实现了1个或N个线程需要等待其他线程完成某项操作之后才能继续往下执行操作,描述的是1个线程或N个线程等待其他线程的关系。CyclicBarrier主要是实现了多个线程之间相互等待,直到所有的线程都满足了条件之后各自才能继续执行后续的操作,描述的多个线程内部相互等待的关系。
- CountDownLatch是一次性的,而CyclicBarrier则可以被重置而重复使用。
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
结果: ready ready .. go
20:24:34.616 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 0 is ready
20:24:35.610 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 1 is ready
20:24:36.610 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 2 is ready
20:24:37.611 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 3 is ready
20:24:38.612 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 4 is ready
20:24:38.612 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 0 continue
20:24:38.612 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 1 continue
20:24:38.612 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 4 continue
20:24:38.612 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 3 continue
20:24:38.612 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 2 continue
20:24:39.614 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 5 is ready
20:24:40.613 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 6 is ready
20:24:41.614 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 7 is ready
20:24:42.615 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 8 is ready
20:24:43.615 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 9 is ready
20:24:43.615 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 9 continue
20:24:43.615 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 5 continue
20:24:43.615 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 6 continue
20:24:43.615 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 7 continue
20:24:43.615 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 8 continue
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
结果:
20:28:32.790 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 0 is ready
20:28:33.785 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 1 is ready
20:28:34.786 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 2 is ready
20:28:35.787 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 3 is ready
20:28:36.787 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 4 is ready
20:28:36.787 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - callback is running
20:28:36.787 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 4 continue
20:28:36.788 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 0 continue
20:28:36.788 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 1 continue
20:28:36.788 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 2 continue
20:28:36.788 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 3 continue
20:28:37.788 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 5 is ready
20:28:38.789 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 6 is ready
20:28:39.789 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 7 is ready
20:28:40.790 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 8 is ready
20:28:41.791 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 9 is ready
20:28:41.791 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - callback is running
20:28:41.791 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 9 continue
20:28:41.791 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 5 continue
20:28:41.791 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 6 continue
20:28:41.818 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 8 continue
20:28:41.818 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 7 continue

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
手写SpringMVC实战,一切从Spring底层源码分析开始
Spring框架对于Java后端程序员来说再熟悉不过了,以前只知道它用的反射实现的,但了解之后才知道有很多巧妙的设计在里面。如果不看Spring的源码,你将会失去一次和大师学习的机会:它的代码规范,设计思想很值得学习。 我们程序员大部分人都是野路子,不懂什么叫代码规范。写了一个月的代码,最后还得其他老司机花3天时间重构,相信大部分老司机都很头疼看新手的代码。 废话不多说,我们进入今天的正题,在Web应用程序设计中,MVC模式已经被广泛使用。SpringMVC以DispatcherServlet为核心,负责协调和组织不同组件以完成请求处理并返回响应的工作,实现了MVC模式。点击这里学习 Spring MVC 常用注解。 想要实现自己的SpringMVC框架,需要从以下几点入手: 了解SpringMVC运行流程及九大组件 梳理自己的SpringMVC的设计思路 实现自己的SpringMVC框架 一、了解SpringMVC运行流程及九大组件 1、SpringMVC的运行流程 ⑴ 用户发送请求至前端控制器DispatcherServlet ⑵ DispatcherServlet收到请求调用H...
-
下一篇
(十)J.U.C之AQS--ReentrantLock与锁
首先要知道 Java 中的锁主要分两类锁 , 一种是 synchronize锁 , 另外一种就是 J.U.C中 提供的锁 , J.U.C里核心的锁是 ReentrantLock ReentrantLock (可重入锁)与 synchronize 的区别 可重入性 ReentrantLock 字面意思就是 再进入 锁 , 所以称之为可重入锁 , synchronize 使用的锁也是可重入的. 它俩都是同一个线程进入一次锁的计数器就自增 1,所以要等到锁的计数器下降为 0 时才释放锁 . 锁的实现 synchronize 的锁是基于 JVM 来实现的 , ReentrantLock 是jdk 实现的. 通俗的来讲就是 操作系统来控制实现和用户编码实现的区别 . 性能区别 在 synchronize 关键字优化之前, 其性能比 ReentrantLock 差 , 但是优化过后 , 在两者都可以使用的情况下, 建议使用 synchronize, 主要是其写法比较容易 功能 synchronize 写起来更简洁 , 它是由编译器来实现锁的加锁和释放 , 而ReentrantLock 需要我们手工...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- MySQL8.0.19开启GTID主从同步CentOS8
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G