您现在的位置是:首页 > 文章详情

【并发编程】 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的

日期:2019-06-01点击:708

一、了解 AbstractQueuedSynchronizer(AQS)

1、AQS 简介

AbstractQueuedSynchronizer 是大师 Doug Lea 编写的一个并发编程类,位于 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的组成部分,他们中关于 “锁” 的部分与 AQS 息息相关。

借用一下源码中的说法,AbstractQueuedSynchronizer 基于一个 FIFO 队列 提供了一套阻塞锁和同步相关的实现。该类被设计成为很多同步容器 synchronizers 的底层实现,它使用了一个原子int private volatile int state; 来表示当前状态。当在 AQSacquired (获取资源) 或被 release (释放资源)时,需要依据这个 state 来进行判断。所以子类需要定义方法来修改这个状态,该状态的含义由我们自由定制。(翻译的不好...)

2、实现最简单的 AQS

我们来看一个最简单的例子,我们有一个类 Sync 继承了 AbstractQueuedSynchronizer,并重写了其 tryAcquiretryRelease 方法。实现非常简单,我们通过调用父类的 compareAndSetState() 以及 setState() 来完成,简单来说(不是特别准确),就是 tryAcquire 返回 true,代表获取锁成功,否则就会阻塞。而 tryRelease 则负责锁的释放。

在例子中:将 state 设置为 100 代表当前状态为无锁,1 则代表已经有某个线程获取了该锁。当然这个 state 表达的含义是怎么样的,完全是我们定义的,实际上锁定或者无锁是 100 还是 200 还是 -100,都没有什么关系。

/** * Created by Anur IjuoKaruKas on 2019/5/7 */ public class Mutex extends AbstractQueuedSynchronizer { public static class Sync extends AbstractQueuedSynchronizer { public Sync() { setState(100); // set the initial state, being unlocked. } @Override protected boolean tryAcquire(int ignore) { boolean result = compareAndSetState(100, 1); print("尝试获取锁" + (result ? "成功" : "失败")); return result; } @Override protected boolean tryRelease(int ignore) { setState(100); return true; } } private final Sync sync = new Sync(); public void lock() { sync.acquire(0); } public void unLock() { sync.release(0); } public static void main(String[] args) throws InterruptedException { Mutex mutex = new Mutex(); mutex.lock(); Thread thread = new Thread(() -> { print("调用 mutex.lock() 之前"); mutex.lock(); print("调用 mutex.lock() 之后"); }); thread.start(); print("main 线程 Sleep 之前"); Thread.sleep(5000); print("main 线程 Sleep 之后"); mutex.unLock(); } public static void print(String print) { System.out.println(String.format("时间 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print)); } } ========================================= 输出 时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] 尝试获取锁成功 时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] main 线程 Sleep 之前 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 15:44:24 CST 2019 Thread[main,5,main] main 线程 Sleep 之后 时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功 时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后 

我们可以看到,代码符合我们的预期:在 main 函数所在线程调用 mutex.unLock(); 释放锁之前,子线程是一直阻塞的,调用 mutex.lock() 之后 的日志输出发生在 main 线程 Sleep 之后 之后。

通过重写 tryAcquiretryRelease 方法,以及调用 acquirerelease 方法,我们很容易就实现了一个锁,当然这个锁有一堆问题... 我们只是通过这个小例子,来建立对 AQS 一个简单的了解。

看到这里,有些细心的小伙伴可能会想了,既然锁是由 tryAcquire 控制的,那和 state 又有什么关系呢? 我们完全可以定义一个自定义变量,比如 signfalse 代表无锁,true 代表锁定,好像也可以实现这段逻辑啊?这个时候就需要引出我们神奇的 compareAndSetCAS操作了。

3、AQS 绕不过的话题: CAS Compare And Swap

前面说到,我们暂时认为 :tryAcquire 返回 true,代表获取到锁,反之只要 tryAcquire 返回 flase,线程就会被阻塞(不准确,后面会细说)。实际上这里有一个 隐含条件,我们必须做到:


  • ※ 无论何时,都只能有一个线程 tryAcquire 成功,且在某个线程 tryAcquire 成功之后,并在其 release 释放锁之前,任何线程进行 tryAcquire 都将返回 false

是的,就是并发问题!

下面这个例子我们简单使用一个自定义变量 sign 来实现 tryAcquire,看看会发生什么:

 private boolean sign; @Override protected boolean tryAcquire(int ignore) { boolean result = false; if (!sign) { sign = true; result = true; } print("尝试获取锁" + (result ? "成功" : "失败")); return result; } @Override protected boolean tryRelease(int ignore) { sign = false; return true; } ========================================= 输出 时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] 尝试获取锁成功 时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] main 线程 Sleep 之前 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败 时间 - Fri May 24 18:03:17 CST 2019 Thread[main,5,main] main 线程 Sleep 之后 时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功 时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后 

看起来好像没问题,在这个 demo 中也得到了和第一个 DEMO 一样的预期的结果。然而事情并没有那么简单,新写的这个 tryAcquire 实现是一个 "CompareThenSet" 操作,在并发的情况下,会出现不可预期的情况

  • 线程A 进来,发现 signfalse
  • 线程B 同时进来,也发现 signfalse
  • 两者同时将 sign 修改为 true,问题就来了。

到底是 线程A 获取到了锁,还是 线程B 呢?(实际上都获取到了)

我们改一下 Main 方法,我们使用 100 个线程并发执行 mutex.lock(); 获取锁成功则会输出语句 print("获取锁成功");,执行,发现,竟然有两个线程同时获取到了锁。有两个线程同时将 sign 修改为了 true

 public static void main(String[] args) throws InterruptedException { Mutex mutex = new Mutex(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10000; i++) { threads.add(new Thread(() -> { mutex.lock(); print("获取锁成功"); })); } ExecutorService executorService = Executors.newFixedThreadPool(100); threads.forEach(executorService::submit); Thread.sleep(1000); } 

如果我们使用 AQS 帮我们写好的 compareAndSetState 则没有这个问题。

Java9 之前,底层实现是调用 unsafe 包的 compareAndSwapInt 来实现的:

 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } 

而在 Java9 之后,则是使用 VarHandle 来实现 VarHandleunSafe 的一个替代方案,本文不多赘述,后面会有文章讲到这个 ~ 。

 // VarHandle mechanics private static final VarHandle STATE; --------------------------------------------- protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); } 

这里简单的说一下 CAS,即 CompareAndSwapCAS 可原子性地比较并替换一个值,乐观锁中一个典型的实现便是使用 CAS 来完成的。对并发编程有所了解的小伙伴应该都知道 CAS,一般情况下,Compare(比较)Swap(交换) 至少是两个原子操作(实际上是更多个原子操作,主要看编译成多少条机器码)。CAS 则保证了 CompareSwap 为一个原子操作。


二、深入理解 AbstractQueuedSynchronizer(AQS) 资源锁定与解锁正向流程

上文说到,我们暂时认为 :tryAcquire 返回 true,代表获取到锁,反之只要 tryAcquire 返回 flase,线程就会被阻塞。

AQS 当然没有这么简单,但我们可以先看看加锁时调用的 acquire 方法:

 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 

我们发现,tryAcquire 只是第一重判断,如果 tryAcquire 失败,紧接着还有另一个核心逻辑 acquireQueued。在简介里,我们说,AQS 除了使用一个 原子state 来作为状态判断以外,还有一个 FIFO 队列,此队列就和 acquireQueued 方法息息相关。另外,AQS 所控制的资源访问,还可以是共享的,或者独占的(addWaiter 参数 Node.EXCLUSIVE)。

以下的分析我们以一个简单的 独占式非公平 AQS 实现: java.util.concurrent.locks.ReentrantLock.NonfairSync 来深入解析。独占式很好理解,大部分的锁实现都只允许一个线程在同一时间获取到锁定的资源。

1、TryAcquire 与 TryRelease 的标准写法及其优化

先看看 NonfairSynctryAcuire 是怎么实现及优化的。首先,NonfairSync 中将 state == 0 定义为无锁状态。

  1. 竞争优化: 如果当前无锁(state == 0),再调用 CAS。这实际上对性能是一个很好的优化,假设当前取 state 不为 0,实际上 CompareAndSetState 成功的概率也很小,这也可以避免同一时间内,过多的线程去并发修改 state 这个状态。
  2. 重入设计: 试想如果我们不判断当前线程是否持有锁,就去进行 CAS 操作,会发生什么?毫无疑问是 CAS 失败,这会间接导致死锁。这里我们可以看到,重入以后,有一个 int nextc = c + acquires; 操作,这是方便我们记录到底套了几层锁用的,如果没有这个机制,我们将无法精确的控制加锁和解锁的层级,难免会出现一些意料之外的情况。简单来说:lock 几次,就要 unLock 几次。当然我们也可以做到 aquire 多次,一次性 release 掉,或者反过来,取决于怎么我们实现 tryAquiretryRlease 方法。
  3. 偏向优化: 这个优化实际上很简单,如果说要获取锁的线程就是锁的持有线程,我们无需去进行任何 CAS 操作,返回 true 即可。
 @ReservedStackAccess final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 避免过多的线程竞争 CAS 操作 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current);// 如果 CAS 操作成功,则将当前线程保存起来,重入和解锁时用于判断。 return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 重入优化,每次加锁相当于 `state++` if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; // 偏向优化 } return false; } @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 每次解锁相当于 `state--` 直到 state == 0 ,代表可释放锁了 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } 

2、AcquireQueued 解析

①、addWaiter阶段

如果 tryAquire 失败,就会进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))addWaiter 方法创建了一个新的 Node 实例,Node 实例中主要保存了当前线程信息,并将 nextWaiter 赋值为 Node.EXCLUSIVE, 这个 nextWaiter 后面再谈,它主要用于线程调度、以及独占模式、共享模式的区分,我们可以先不管它。

操作比较简单,原理是将 node 塞入双向链表尾端,也就是前面提到的 FIFO队列。就是利用 CAS 操作将新创建的、带有本线程信息的 node 设置为双向链表新的 tail,并且修改两者的 ‘指针’ prevnext

 /** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); } 
 Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); // 初始化双向链表,就是创建一个新的空 node,并且头尾都是此 node。 // 这个 node 除了拿来标记链表从哪里开始,没有什么别的意义。 } } 
②、acquireQueued阶段(自旋)

入队成功后,进入 acquireQueued 方法,抛开线程被 interrupt 的情况acquireQueued 的代码其实也很简单,我们不看 interrupt 相关逻辑,其实逻辑还是很简单的。这是一个无限循环(或者说自旋),只要没有 tryAcquire 成功,就会一直循环下去,逻辑如下:

  1. 如果上一个节点是 FIFO 队列头,则进行一次 tryAquire,如果成功,则跳出循环。
  2. 检测是否需要阻塞,如果需要阻塞,则阻塞等待唤醒,parkAndCheckInterrupt 便是阻塞直到被唤醒(或者被 interrupt ,暂时先不考虑这个情况)。
 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 

tryAcquire()parkAndCheckInterrupt() 都很好理解,前者就是去尝试一下获取锁定资源,看能否成功。后者则是阻塞直到被唤醒。

③、阻塞阶段

我们先说说 shouldParkAfterFailedAcquire,这个判断是一个挺有意思的设计,后续文章会细说,它和线程调度、取消获取锁等相关。因为在获取锁定资源和释放锁定资源的过程中,实际上我们只需要用到两个状态,一个是初始状态 pred.waitStatus == 0,另一个是 pred.waitStatus == SIGNAL == -1

代码中我们可以很容易看出,在 CASprev 节点的 waitStatus 设置为 SIGNAL : -1 之前,都将返回 false,如果设置成功,下一次自旋进入该方法就是 true 了,也就是说,会进入 parkAndCheckInterrupt() 方法,阻塞直到被唤醒。

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 
自旋阶段图解:

阻塞阶段图解:

3、release 解析

①、唤醒 FIFO 的下一个节点

阻塞直到唤醒这个逻辑在锁定资源、释放资源 这两个阶段来看十分简单,最后我们来看看 release 做了什么,release 除了调用了我们自己实现的 tryRelease 之外,其实关键的就是这个 unparkSuccessor

tryRelease 上面也说过了,就是改改原子 state,这里不多赘述。

 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 

代码中可以看出,当 FIFO 队列不为空且头结点的 waitStatu 被修改过,就会进入 unparkSuccessorunparkSuccessor 传入了当前 FIFO 的队列头,逻辑如下:

  1. 如果当前节点 waitStatus 为负(可能为 SIGNALCONDITION 或者 PROPAGATE),我们这里简单先看成只有 SIGNAL 状态,则 CAS 将其设置为 0。其他几个状态我们后面会说到。
  2. 如果 !(s == null || s.waitStatus > 0),也就是说 node.nextwaitStatus <= 0 ,则简单的直接将其唤醒:LockSupport.unpark(s.thread);
 private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } 

②、被唤醒后

被唤醒的线程当然不是直接获得了锁,它还是会继续 acquireQueue 进行自旋,逻辑还是和之前一样,避免小伙伴往上翻代码,这里贴了一份如果 prev 是头结点,如果 tryAcquire 成功,我们看到其实很简单,只是将自己设为头部即可。

 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } 


这篇文章只是简单的说说 AQS 的正向获取资源,释放资源流程,后续会继续解析 conditioncancel 等基于 AQS 的线程调度解析 ~~ 以及各个锁是如何实现 AQS 的 ~~

文章皆是基于源码一步步分析,没有参考过多资料,如有错误,请指出!!


另外欢迎来 Q 群讨论技术相关(目前基本没人)[左二维码]~

如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码]~


参考资料:

JDK12 源码
Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example

另外小伙伴可以思考一下:

  1. 如果说阶段7:ThreadB 被唤醒后,继续自旋时,另一个线程ThreadC tryAcquire成功了会发生什么。
  2. 如果说第一个问题了解了,那应该就很清楚为什么说本文解析的这个锁叫做:非公平锁了
  3. 众所周知,只要是 CAS 操作,都有 ABA 问题,如果说修改 waitStatus 发生了 ABA 问题,会发生什么?
原文链接:https://my.oschina.net/anur/blog/3057120
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章