java源码 - ReentrantReadWriteLock读锁介绍
开篇
这篇文章主要从源码角度讲解ReentrantReadWriteLock的ReadLock的加锁和减锁过程。
ReentrantReadWriteLock的ReadLock加锁解锁过程依赖于AQS类,所以有些相同的逻辑可以看看ReentrantLock的逻辑。
ReentrantReadWriteLock的ReadLock的唤醒过程具备传播性:
假设按照顺序A->B->C->D占用读锁,唤醒会依次进行
A线程占用读锁被唤醒后,A线程的锁释放会唤醒B线程。
B线程占用读锁被唤醒后,B线程的锁释放会唤醒C线程。
C线程占用读锁被唤醒后,C线程的锁释放会唤醒D线程。
- ReentrantReadWriteLock的数据结构介绍
- java源码 - ReentrantReadWriteLock读锁介绍
- java源码 - ReentrantReadWriteLock写锁介绍
加锁过程
- ReadLock的lock()内部通过sync. acquireShared(1)获取锁。
- acquireShared()方法内部先通过tryAcquireShared尝试获取锁。
- 如果获锁失败执行doAcquireShared()方法加入等待队列。
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared过程
- 如果当前锁写状态不为0且占锁线程非当前线程,那么返回占锁失败的值-1。
- 如果公平策略没有要求阻塞且重入数没有到达最大值,则直接尝试cas更新state。
- 如果cas操作成功,有以下操作逻辑:
- 首先,如果当前读锁计数为0那么就设置第一个读线程就是当前线程。
- 其次,当前线程和firstReader同一个线程,记录firstReaderHoldCount也就是第一个读线程读锁定次数。
- 最后,读锁数量不为0并且不为当前线程,获取当前线程ThreadLocal当中的读锁重入计数器。
- 结果返回占锁成功的值1
- 如果cas操作失败,有以下操作逻辑:
- 通过fullTryAcquireShared尝试获取读锁,内部处理和tryAcquireShared过程相同。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果当前锁写状态不为0且占锁线程非当前线程,那么返回占锁失败。 // 也就是当前线程先占写锁后可以再占读锁的,反之不行。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 判断高位的读状态标记 int r = sharedCount(c); //如果公平策略没有要求阻塞且重入数没有到达最大值,则直接尝试CAS更新state //如果读不应该阻塞并且读锁的个数小于最大值65535,并且可以成功更新状态值 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 如果当前线程满足对state的操作条件, // 就利用CAS设置state+SHARED_UNIT,实际上就是读状态+1。 // 但是需要注意,这个state是全局的,即所有线程获取读锁次数的总和, // 而为了方便计算本线程的读锁次数以及释放掉锁, // 需要在ThreadLocal中维护一个变量。这就是HoldCounter。 //如果当前读锁为0 if (r == 0) { // 第一个读线程就是当前线程 firstReader = current; firstReaderHoldCount = 1; } //如果当前线程重入了,记录firstReaderHoldCount else if (firstReader == current) { firstReaderHoldCount++; } //当前读线程和第一个读线程不同,记录每一个线程读的次数 else { // 每个线程自己维护cachedHoldCounter HoldCounter rh = cachedHoldCounter; // 计数器为空或者计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //用来处理CAS没成功的情况,逻辑和上面的逻辑是类似的,就是加了无限循环 return fullTryAcquireShared(current); }
fullTryAcquireShared过程
- fullTryAcquireShared内部通过for()循环进行逻辑操作。
- 内部处理和tryAcquireShared过程相同。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 无限循环 for (;;) { // 获取状态 int c = getState(); // 写线程数量不为0且当前线程不是写线程那么返回获锁失败 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } // 写线程数量为0并且读线程被阻塞 else if (readerShouldBlock()) { if (firstReader == current) { // 当前线程为第一个读线程 // assert firstReaderHoldCount > 0; } else { // 当前线程不为第一个读线程 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 读锁数量为最大值,抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 比较并且设置成功,后续的这部分逻辑跟之前讲的一模一样 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
doAcquireShared过程
- doAcquireShared主要实现获读锁失败后的等待操作。
- doAcquireShared通过addWaiter(Node.SHARED)将当前线程封装成SHARED类型Node并添加到CLH队列。
- 如果当前线程的Node节点是CLH队列的第一个节点则当前线程直接获取锁并开启读锁的扩散唤醒所有阻塞读锁的线程。
- 如果当前线程的Node节点不是CLH队列的第一个节点那么就通过parkAndCheckInterrupt进入休眠
- doAcquireShared的内部的自旋保证了线程被唤醒后再次判断是否是第一个节点并尝试获取锁,失败再次进入休眠。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 开始自旋重试 for (;;) { // 获取当前线程代表节点的前一个节点 final Node p = node.predecessor(); // 如果前一个节点是head节点(head节点不保存任何线程), // 表明当前节点是第一个等待唤醒节点 if (p == head) { // 尝试获取锁 int r = tryAcquireShared(arg); // 如果获锁成功 if (r >= 0) { // 说明当前线程获取读锁成功,那么设置当前线程Node为head // 同时扩散唤醒相关读线程,因为读线程之间相互不阻塞,可以一起唤醒继续工作 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 如果“当前线程”不是CLH队列的表头, // 则通过shouldParkAfterFailedAcquire()判断是否需要等待, // 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。 // 若阻塞等待过程中,线程被中断过,则设置interrupted为true。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate过程
- 设置当前线程的Node为CLH队列的head节点
- 判断当前节点的后置节点为空或者是SHARED状态那么就唤起后置的读锁阻塞线程。
- doReleaseShared在解锁过程也同样提及,放到后面解释。
private void setHeadAndPropagate(Node node, int propagate) { // 设置当前节点为新的head节点 Node h = head; setHead(node); // 如果还有剩余量,继续唤醒下一个邻居线程 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 当前节点的后置节点 Node s = node.next; if (s == null || s.isShared()) // 唤醒后起等待线程 doReleaseShared(); } }
解锁过程
- ReadLock的unlock()方法调用sync.releaseShared(1)方法进行释放。
- 调用tryReleaseShared()方法尝试释放锁,如果释放成功,调用doReleaseShared尝试唤醒下一个节点
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared过程
- 如果是第一个读线程则修改firstReaderHoldCount,如果不是则修改局部HoldCounter。
- for循环的死循环当中释放一把锁并设置全局锁状态state。
protected final boolean tryReleaseShared(int unused) { // 得到调用unlock的线程 Thread current = Thread.currentThread(); //如果是第一个获得读锁的线程,进行解锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } // 否则,线程ThreadLocal的HoldCounter中计数-1 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 死循环 for (;;) { int c = getState(); // 释放一把读锁 int nextc = c - SHARED_UNIT; // 如果CAS更新状态成功,返回读锁是否等于0;失败的话,则重试 if (compareAndSetState(c, nextc)) return nextc == 0; } }
doReleaseShared过程
- 如果当前节点状态为SIGNAL,那么就通过unparkSuccessor()方法唤醒后置等待线程
- 成功后则跳出for循环
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { // 当前节点为SIGNAL那么就需要通过unparkSuccessor(p)唤醒后置等待线程 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 跳出循环 if (h == head) // loop if head changed break; } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
参考文章

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java BufferedInputStream BufferedOutputStream类源码解析
BufferedInputStream BufferedInputStream是一个缓冲输入流,继承的是FilterInputStream。FilterInputStream包含了另一个InputStream作为它的基础数据源,并且FilterInputStream重写了InputStream的所有方法。作为FilterInputStream需要重写其中的部分方法,如果没有重写的话默认调用InputStream的同名方法。 对于BufferedInputStream来说,常规的用法是以FileInputStream作为它的下层输入流,原因是FileInputStream是从文件中读取字节,而文件处于硬盘之中,每次从硬盘读取数据的速度相对于内存很慢。如果多次通过FileInputStream读取或跳过一段字符,就需要多次访问硬盘,如果能够通过一次访问将整块内容读取到内存中,再从内存中多次读取,效率就可以大幅度提高,这就是内存缓冲区。举个极端例子来说,连续调用FileInputStream.read()一千次,得到文件中一千个连续字节,访问硬盘1000次。如果使用FileInput...
- 下一篇
日志实时分析:从入门到精通
日志实时分析:从入门到精通 为什么要实时分析日志 做业务的人,常常需要面临以下问题: 产品的受欢迎程度怎么样?该重点投入到哪个功能中? 各个营销渠道的流量怎么样? 该向哪个渠道投入更多资源? 各个阶段的转化率、留存率有多少?有哪些瓶颈点需要优化? 如果没有数据,就只能够靠瞎猜,靠经验来判断。一旦业务方向走偏,一方面自己的业务量受影响,另一方面,耽误时间,给竞争对手以可乘之机。 日志是程序记录事件和状态的数据。通过日志数据的采集、存储和分析,我们可以很好地掌控产品的质量,辅助我们对开发、运营、运维等活动作出更加及时、更加精确的决策。在业务上,时间就是金钱,能够快速的实时分析出结果,能够及时的帮我们发现业务上的问题,节省我们的时间,帮助我们把业务能力提升不止一个量级。 那么,我们如何选择日志分析的架构呢? 日志分析:架构选型 在日志分析领域,用户可以选择
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8安装Docker,最新的服务器搭配容器使用
- 设置Eclipse缩进为4个空格,增强代码规范
- Docker快速安装Oracle11G,搭建oracle11g学习环境