【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的
一、了解 AbstractQueuedSynchronizer(AQS)
1、AQS 简介
AbstractQueuedSynchronizer
是大师 Doug Lea 编写的一个并发编程类,位于 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的组成部分,他们中关于 “锁” 的部分与 AQS
息息相关。
借用一下源码中的说法,AbstractQueuedSynchronizer
基于一个 FIFO 队列
提供了一套阻塞锁和同步相关的实现。该类被设计成为很多同步容器 synchronizers
的底层实现,它使用了一个原子int private volatile int state;
来表示当前状态。当在 AQS
被 acquired
(获取资源) 或被 release
(释放资源)时,需要依据这个 state
来进行判断。所以子类需要定义方法来修改这个状态,该状态的含义由我们自由定制。(翻译的不好...)
2、实现最简单的 AQS
我们来看一个最简单的例子,我们有一个类 Sync
继承了 AbstractQueuedSynchronizer
,并重写了其 tryAcquire
和 tryRelease
方法。实现非常简单,我们通过调用父类的 compareAndSetState()
以及 setState()
来完成,简单来说(不是特别准确),就是 tryAcquire
返回 true
,代表获取锁成功,否则就会阻塞。而 tryRelease
则负责锁的释放。
在例子中:将 state
设置为 100
代表当前状态为无锁,1
则代表已经有某个线程获取了该锁。当然这个 state
表达的含义是怎么样的,完全是我们定义的,实际上锁定或者无锁是 100
还是 200
还是 -100
,都没有什么关系。
/** * Created by Anur IjuoKaruKas on 2019/5/7 */ public class Mutex extends AbstractQueuedSynchronizer { public static class Sync extends AbstractQueuedSynchronizer { public Sync() { setState(100); // set the initial state, being unlocked. } @Override protected boolean tryAcquire(int ignore) { boolean result = compareAndSetState(100, 1); print("尝试获取锁" + (result ? "成功" : "失败")); return result; } @Override protected boolean tryRelease(int ignore) { setState(100); return true; } } private final Sync sync = new Sync(); public void lock() { sync.acquire(0); } public void unLock() { sync.release(0); } public static void main(String[] args) throws InterruptedException { Mutex mutex = new Mutex(); mutex.lock(); Thread thread = new Thread(() -> { print("调用 mutex.lock() 之前"); mutex.lock(); print("调用 mutex.lock() 之后"); }); thread.start(); print("main 线程 Sleep 之前"); Thread.sleep(5000); print("main 线程 Sleep 之后"); mutex.unLock(); } public static void print(String print) { System.out.println(String.format("时间 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print)); } } ========================================= 输出 时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] 尝试获取锁成功 时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] main 线程 Sleep 之前 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:24 CST 2019 Thread[main,5,main] main 线程 Sleep 之后 时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功 时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
我们可以看到,代码符合我们的预期:在 main 函数所在线程调用 mutex.unLock();
释放锁之前,子线程是一直阻塞的,调用 mutex.lock() 之后
的日志输出发生在 main 线程 Sleep 之后
之后。
通过重写 tryAcquire
、tryRelease
方法,以及调用 acquire
和 release
方法,我们很容易就实现了一个锁,当然这个锁有一堆问题... 我们只是通过这个小例子,来建立对 AQS
一个简单的了解。
看到这里,有些细心的小伙伴可能会想了,既然锁是由 tryAcquire
控制的,那和 state
又有什么关系呢? 我们完全可以定义一个自定义变量,比如 sign
,false
代表无锁,true
代表锁定,好像也可以实现这段逻辑啊?这个时候就需要引出我们神奇的 compareAndSet
,CAS
操作了。
3、AQS 绕不过的话题: CAS Compare And Swap
前面说到,我们暂时认为 :tryAcquire
返回 true
,代表获取到锁,反之只要 tryAcquire
返回 flase
,线程就会被阻塞(不准确,后面会细说)。实际上这里有一个 隐含条件,我们必须做到:
- ※ 无论何时,都只能有一个线程
tryAcquire
成功,且在某个线程tryAcquire
成功之后,并在其release
释放锁之前,任何线程进行tryAcquire
都将返回false
。
是的,就是并发问题!
下面这个例子我们简单使用一个自定义变量 sign
来实现 tryAcquire
,看看会发生什么:
private boolean sign; @Override protected boolean tryAcquire(int ignore) { boolean result = false; if (!sign) { sign = true; result = true; } print("尝试获取锁" + (result ? "成功" : "失败")); return result; } @Override protected boolean tryRelease(int ignore) { sign = false; return true; } ========================================= 输出 时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] 尝试获取锁成功 时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] main 线程 Sleep 之前 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:17 CST 2019 Thread[main,5,main] main 线程 Sleep 之后 时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功 时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
看起来好像没问题,在这个 demo 中也得到了和第一个 DEMO 一样的预期的结果。然而事情并没有那么简单,新写的这个 tryAcquire
实现是一个 "CompareThenSet"
操作,在并发的情况下,会出现不可预期的情况
- 线程A 进来,发现
sign
为false
- 线程B 同时进来,也发现
sign
为false
- 两者同时将
sign
修改为true
,问题就来了。
到底是 线程A 获取到了锁,还是 线程B 呢?(实际上都获取到了)
我们改一下 Main 方法,我们使用 100 个线程并发执行 mutex.lock();
获取锁成功则会输出语句 print("获取锁成功");
,执行,发现,竟然有两个线程同时获取到了锁。有两个线程同时将 sign
修改为了 true
。
public static void main(String[] args) throws InterruptedException { Mutex mutex = new Mutex(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10000; i++) { threads.add(new Thread(() -> { mutex.lock(); print("获取锁成功"); })); } ExecutorService executorService = Executors.newFixedThreadPool(100); threads.forEach(executorService::submit); Thread.sleep(1000); }
如果我们使用 AQS
帮我们写好的 compareAndSetState
则没有这个问题。
在 Java9
之前,底层实现是调用 unsafe
包的 compareAndSwapInt
来实现的:
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
而在 Java9
之后,则是使用 VarHandle
来实现 VarHandle
是 unSafe
的一个替代方案,本文不多赘述,后面会有文章讲到这个 ~ 。
// VarHandle mechanics private static final VarHandle STATE; --------------------------------------------- protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); }
这里简单的说一下 CAS
,即 CompareAndSwap
。CAS
可原子性地比较并替换一个值,乐观锁中一个典型的实现便是使用 CAS
来完成的。对并发编程有所了解的小伙伴应该都知道 CAS
,一般情况下,Compare(比较)
和 Swap(交换)
至少是两个原子操作(实际上是更多个原子操作,主要看编译成多少条机器码)。而 CAS
则保证了 Compare
和 Swap
为一个原子操作。
二、深入理解 AbstractQueuedSynchronizer(AQS) 资源锁定与解锁正向流程
上文说到,我们暂时认为 :tryAcquire
返回 true
,代表获取到锁,反之只要 tryAcquire
返回 flase
,线程就会被阻塞。
AQS
当然没有这么简单,但我们可以先看看加锁时调用的 acquire
方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们发现,tryAcquire
只是第一重判断,如果 tryAcquire
失败,紧接着还有另一个核心逻辑 acquireQueued
。在简介里,我们说,AQS
除了使用一个 原子state
来作为状态判断以外,还有一个 FIFO 队列
,此队列就和 acquireQueued
方法息息相关。另外,AQS
所控制的资源访问,还可以是共享的,或者独占的(addWaiter
参数 Node.EXCLUSIVE
)。
以下的分析我们以一个简单的 独占式非公平 AQS
实现: java.util.concurrent.locks.ReentrantLock.NonfairSync
来深入解析。独占式很好理解,大部分的锁实现都只允许一个线程在同一时间获取到锁定的资源。
1、TryAcquire 与 TryRelease 的标准写法及其优化
先看看 NonfairSync
的 tryAcuire
是怎么实现及优化的。首先,NonfairSync
中将 state == 0
定义为无锁状态。
- 竞争优化: 如果当前无锁(
state == 0
),再调用CAS
。这实际上对性能是一个很好的优化,假设当前取state
不为0
,实际上CompareAndSetState
成功的概率也很小,这也可以避免同一时间内,过多的线程去并发修改state
这个状态。 - 重入设计: 试想如果我们不判断当前线程是否持有锁,就去进行
CAS
操作,会发生什么?毫无疑问是CAS
失败,这会间接导致死锁。这里我们可以看到,重入以后,有一个int nextc = c + acquires;
操作,这是方便我们记录到底套了几层锁用的,如果没有这个机制,我们将无法精确的控制加锁和解锁的层级,难免会出现一些意料之外的情况。简单来说:lock
几次,就要unLock
几次。当然我们也可以做到aquire
多次,一次性release
掉,或者反过来,取决于怎么我们实现tryAquire
和tryRlease
方法。 - 偏向优化: 这个优化实际上很简单,如果说要获取锁的线程就是锁的持有线程,我们无需去进行任何
CAS
操作,返回true
即可。
@ReservedStackAccess final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 避免过多的线程竞争 CAS 操作 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current);// 如果 CAS 操作成功,则将当前线程保存起来,重入和解锁时用于判断。 return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 重入优化,每次加锁相当于 `state++` if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; // 偏向优化 } return false; } @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 每次解锁相当于 `state--` 直到 state == 0 ,代表可释放锁了 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
2、AcquireQueued 解析
①、addWaiter阶段
如果 tryAquire
失败,就会进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
,addWaiter
方法创建了一个新的 Node
实例,Node
实例中主要保存了当前线程信息,并将 nextWaiter
赋值为 Node.EXCLUSIVE
, 这个 nextWaiter
后面再谈,它主要用于线程调度、以及独占模式、共享模式的区分,我们可以先不管它。
操作比较简单,原理是将 node
塞入双向链表尾端,也就是前面提到的 FIFO队列
。就是利用 CAS
操作将新创建的、带有本线程信息的 node
设置为双向链表新的 tail
,并且修改两者的 ‘指针’ prev
和 next
。
/** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); }
Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); // 初始化双向链表,就是创建一个新的空 node,并且头尾都是此 node。 // 这个 node 除了拿来标记链表从哪里开始,没有什么别的意义。 } }
②、acquireQueued阶段(自旋)
入队成功后,进入 acquireQueued
方法,抛开线程被 interrupt
的情况,acquireQueued
的代码其实也很简单,我们不看 interrupt
相关逻辑,其实逻辑还是很简单的。这是一个无限循环(或者说自旋),只要没有 tryAcquire
成功,就会一直循环下去,逻辑如下:
- 如果上一个节点是
FIFO
队列头,则进行一次tryAquire
,如果成功,则跳出循环。 - 检测是否需要阻塞,如果需要阻塞,则阻塞等待唤醒,
parkAndCheckInterrupt
便是阻塞直到被唤醒(或者被interrupt
,暂时先不考虑这个情况)。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
tryAcquire()
和 parkAndCheckInterrupt()
都很好理解,前者就是去尝试一下获取锁定资源,看能否成功。后者则是阻塞直到被唤醒。
③、阻塞阶段
我们先说说 shouldParkAfterFailedAcquire
,这个判断是一个挺有意思的设计,后续文章会细说,它和线程调度、取消获取锁等相关。因为在获取锁定资源和释放锁定资源的过程中,实际上我们只需要用到两个状态,一个是初始状态 pred.waitStatus == 0
,另一个是 pred.waitStatus == SIGNAL == -1
。
代码中我们可以很容易看出,在 CAS
将 prev
节点的 waitStatus
设置为 SIGNAL : -1
之前,都将返回 false
,如果设置成功,下一次自旋进入该方法就是 true
了,也就是说,会进入 parkAndCheckInterrupt()
方法,阻塞直到被唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
自旋阶段图解:
阻塞阶段图解:
3、release 解析
①、唤醒 FIFO 的下一个节点
阻塞直到唤醒这个逻辑在锁定资源、释放资源 这两个阶段来看十分简单,最后我们来看看 release
做了什么,release
除了调用了我们自己实现的 tryRelease
之外,其实关键的就是这个 unparkSuccessor
。
tryRelease
上面也说过了,就是改改原子 state
,这里不多赘述。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
代码中可以看出,当 FIFO
队列不为空且头结点的 waitStatu
被修改过,就会进入 unparkSuccessor
,unparkSuccessor
传入了当前 FIFO
的队列头,逻辑如下:
- 如果当前节点
waitStatus
为负(可能为SIGNAL
、CONDITION
或者PROPAGATE
),我们这里简单先看成只有SIGNAL
状态,则CAS
将其设置为0
。其他几个状态我们后面会说到。 - 如果
!(s == null || s.waitStatus > 0)
,也就是说node.next
的waitStatus <= 0
,则简单的直接将其唤醒:LockSupport.unpark(s.thread);
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
②、被唤醒后
被唤醒的线程当然不是直接获得了锁,它还是会继续 acquireQueue
进行自旋,逻辑还是和之前一样,避免小伙伴往上翻代码,这里贴了一份如果 prev
是头结点,如果 tryAcquire
成功,我们看到其实很简单,只是将自己设为头部即可。
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
这篇文章只是简单的说说 AQS
的正向获取资源,释放资源流程,后续会继续解析 condition
、cancel
等基于 AQS
的线程调度解析 ~~ 以及各个锁是如何实现 AQS
的 ~~
文章皆是基于源码一步步分析,没有参考过多资料,如有错误,请指出!!
另外欢迎来 Q 群讨论技术相关(目前基本没人)[左二维码]~
如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码]~
参考资料:
JDK12 源码
Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example
另外小伙伴可以思考一下:
- 如果说阶段7:
ThreadB
被唤醒后,继续自旋时,另一个线程ThreadC
tryAcquire
成功了会发生什么。 - 如果说第一个问题了解了,那应该就很清楚为什么说本文解析的这个锁叫做:非公平锁了
- 众所周知,只要是
CAS
操作,都有ABA
问题,如果说修改waitStatus
发生了ABA
问题,会发生什么?

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
spring boot集成kafka之spring-kafka深入探秘
前言 kafka是一个消息队列产品,基于Topicpartitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。 项目地址:https://github.com/spring-projects/spring-kafka 简单集成 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.6.RELEASE</version> </dependency> 添加配置 spring.kafka.producer.bootstrap-servers=127.0.0.1:9092 测试发送和...
- 下一篇
Android APP native 崩溃分析之令人困惑的 backtrace
原文地址:https://caikelun.io/post/2019-06-01-android-app-native-crash-confusing-backtrace/ 完美无缺的代码逻辑,一定能产生完美无缺的程序吗?答案是否定的。从软件的层面来看,也许只有二进制才永远不会欺骗你。 现象 近期,业务方反馈了一个奇怪的崩溃问题,认为信息不足,无法解决。 Signal: 11 (SIGSEGV), Code: 1 (SEGV_MAPERR), fault addr 0x1 r0 993ff520 r1 dc3170c4 r2 00000000 r3 dabe3e08 r4 993ff520 r5 00000005 r6 00000290 r7 000007ac r8 e83253a0 r9 00006aba r10 bf921e39 r11 e83253a0 ip bfa3a9e0 sp 993ff494 lr bf88a71d pc bf96c31c #00 pc 001a731c /data/data/com.package.name/files/download/libmcto_...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题