AQS 原理剖析
AQS
即AbstractQueuedSynchronizer
类称作队列同步器,是构建其他同步器的一个重要的基础框架,同步器自身是没有实现任何同步接口。它是通过控制一个int
类型的state
变量来表示同步状态,使用一个内置的FIFO
(先进先出)队列来构建工作队列操作。
同步器定义有两种资源共享方式:Exclusive
(独占式)和Share
(共享式)的获取同步状态。
独占式:一个时间点只能执行一个线程。 共享式:一个时间点可多个线程同时执行。
使用方式
同步器的设计采用模板模式,要实现一个同步组件得先继承AbstractQueuedSynchronizer
类,通过调用同步器提供的方法和重写同步器的方法来实现。
调用同步器中的方法就是调用前面提到的通过state
变量值的操作来表示同步操作,state
是被volatile
修饰来保证线程可见性。
方法名 | 描述 |
---|---|
getState() | 获取当前线程同步状态值。 |
setState(int newState) | 设置当前同步状态值。 |
compareAndSetState(int expect, int update) | 通过CAS 设置state 的值。 |
为了避免被重写,以上方法都被final
修饰了。
实现同步组件,需要自己根据自己定制化的需求进行处理,所以需要自己重写同步器提供的方法,要重写的方法主要是独占式获取与释放同步状态、共享式获取与释放同步状态。
tryAcquire(int arg) 独占式获取同步状态,返回值为boolean
类型,获取成返回true
,获取失败返回false
。
tryRelease(int arg) 独占式释放同步状态,返回值为boolean
类型,释放成返回true
,释放失败返回false
。
tryAcquireShared(int arg) 共享式获取同步状态,返回值为int
类型,获取成功返回大于 0 的值。
tryReleaseShared(int arg) 共享式释放同步状态,返回值为boolean
类型,释放成返回true
,释放失败返回false
。
isHeldExclusively() 独占模式下是否被当前前程独占,返回值为boolean
类型,已被当前线程所独占返回true
,反之为false
。
同步器队列
一个同步器里面拥有一个同步队列和多个等待队列。
同步队列
在AbstractQueuedSynchronizer
类中,有一个内部类Node
,通过该类构造一个内部的同步队列,这是一个FIFO 双向队列。 当前运行线程回去同步状态时,如果获取失败,则将当前线程信息创建一个Node
追加到同步队列尾部,然后阻塞当前线程,直到队列的上一个节点的同步状态释放,再唤醒当前线程尝试重新获取同步状态。这个重新获取同步状态操作的节点,一定要是同步队列中第一节点。
Node 源码如下:
static final class Node { // 共享模式下等待的标记 static final Node SHARED = new Node(); // 独占模式下等待的标记 static final Node EXCLUSIVE = null; /* * 等待状态常量值,以下四个常量都是 */ static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 等待状态 volatile int waitStatus; // 当前节点的前驱节点 volatile Node prev; // 当前节点的后继节点 volatile Node next; // 获取同步状态的线程(引用) volatile Thread thread; // 等待队列中的后继节点 Node nextWaiter; // 是否共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取前驱节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
通过以上代码,可以看到节点中保存了节点模式、等待状态、线程引用、前驱和后继节点,构造节点。
同步队列中被阻塞的线程的等待状态包含有四个常量值:CANCELLED、SIGNAL、CONDITION、PROPAGATE ,它们对应的被阻塞原因如下:
CANCELLED
同步队列中当前节点的线程等待超时或被中断,需要从同步队列中取消等待。SIGNAL
当前节点释放同步状态或被取消后,通知后继节点的线程运行。CONDITION
当前节点在 Condition 上等待,当其他线程对 Condition 调用了 signal() 方法后,该节点将添加到同步队列中。PROPAGATE
该状态存在共享模式的首节点中,当前节点唤醒后将传播唤醒其他节点。
同步器中持有同步队列的首节点和尾节点的引用,在AbstractQueuedSynchronizer
中分别对应head
和tail
字段。
所以同步队列的基本结构如图:
等待队列
AbstractQueuedSynchronizer
类中包含一个内部类ConditionObject
,该类实现了Condition
的接口。一个Condition
对象包含一个等待队列,同时Condition
对象可以实现等待/通知功能。
Condition
持有等待队列中的首节点(firstWaiter)和尾节点(lastWaiter),如下图代码所示:
如果当前线程调用Condition.await()
时,会将当前线程信息构建一个 Node 节点,因为Condition
持有等待队列中的首尾节点,所以将当前等待队列中的尾节点的nextWaiter
指向当前线程构建的节点,同时更新lastWaiter
的引用节点。
上述过程中的节点、队列的操作,是获取到锁的线程来调用Condition.await()
的,所以整个执行过程在没有基于 CAS 的情况下,也是线程安全的。
通过以上的描述,可以知道一个同步器中同步队列、等待队列构成的示意图:
当调用Condition.await()
时,同步队列中的首节点,也就是当前线程所创建的节点,会加入到等待队列中的尾部,释放当前线程的同步状态并且唤醒同步队列的后继节点,当前线程也就进入等待状态,这个先后顺序不能颠倒。这个过程相当于同步队列的首节点的线程构造新的节点加入到等待队列的尾部。
当调用Condition.signal()
方法时,会先将等待队列中首节点转移到同步队列尾部,然后唤醒该同步队列中的线程,该线程从Condition.await()
中自旋退出,接着在在同步器的acquireQueued()
中自旋获取同步状态。
当调用Condition.wait()
方法,同步队列首节点转移到等待队列方法:
public final void await() throws InterruptedException { // 如果线程已中断,则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 添加节点到等待队列中 Node node = addConditionWaiter(); // 修改 state 来达到释放同步状态,避免死锁 int savedState = fullyRelease(node); int interruptMode = 0; // 判断当前节点是否在同步队列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 继续获取同步状态竞争 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // 清除已取消的节点 unlinkCancelledWaiters(); if (interruptMode != 0) // 被中断时的处理 reportInterruptAfterWait(interruptMode); }
上面addc
方法是向等待队列中添加一个新的节点。
private Node addConditionWaiter() { // 获取等待队列中尾节点 Node t = lastWaiter; // 如果最后一个节点已取消,则清除取消节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 利用当前线程信息创建等待队列的节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) // 如果最后尾节点为空,当前节点则为等待队列的首节点 firstWaiter = node; else // 否则将当前尾节点的下一个节点指向当前线程信息所构造的节点 t.nextWaiter = node; lastWaiter = node; // 更新 Condition 的尾节点引用 return node; }
当调用Condition.signal()
方法,等待队列首节点转移到同步队列方法:
public final void signal() { // 是否被当前线程所独占 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取等待队列中首节点 Node first = firstWaiter; if (first != null) // 转移到同步队列,然后唤醒该节点 doSignal(first); }
转移同步队列首节点到同步队列,并唤醒该节点方法doSignal()
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 去除首节点 first.nextWaiter = null; } while (!transferForSignal(first) && // 从等待队列中转移到同步队列 (first = firstWaiter) != null); }
转移等待队列到同步队列方法transferForSignal(Node node)
final boolean transferForSignal(Node node) { // 验证节点是否被取消 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 转移节点至同步队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
等待队列中的头结点线程安全移动到同步队列方法enq(final Node node)
private Node enq(final Node node) { for (;;) { Node t = tail; // 同步队列中如果为空,则初始化同步器 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { // 否则新节点的前驱节点为当前同步队列的尾节点 node.prev = t; // 设置当前新节点为同步队列的尾节点,并更新先前同步队列的尾节点的后继节点指向当前新节点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
独占式同步状态
独占式同步状态获取和释放是线程安全的操作,一个时间点确保只有一个线程获取到同步状态。
独占式同步状态获取
acquire(int arg)
方法是获取独占式同步状态的方法,当线程获取同步失败时,会加入到同步队列中。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述代码中,当执行tryAcquire(int arg)
方法获取同步状态失败时,接着通过addWaiter(Node.EXCLUSIVE)
构造当前线程信息的节点,随后将新构造的节点通过acquireQueued(final Node node, int arg)
方法加入到同步队列中,节点在同步队列中自旋等待获取同步状态。
tryAcquire(int arg)
是自定义同步器实现的,实现该方法需要保证线程安全获取同步状态,前面讲到AQS
提供的compareAndSetState(int expect, int update)
方法通过CAS
设置state
值来保证线程安全。
上面获取独占式同步状态时,主要分析acquireQueued(final Node node, int arg)
方法,节点加入队列并自旋等待。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 是否中断标识 boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 如果前驱节点是首节点,并且当前节点获取到同步状态 if (p == head && tryAcquire(arg)) { // 将当前节点设置为首节点 setHead(node); // 将原首节点(即当前节点的前驱节点)引用清空,利于 GC 回收 p.next = null; // 成功获取到同步状态标志 failed = false; return interrupted; } // 判断前驱节点是否超时或取消,以及当前线程是否被中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 如果被中断,则节点出队 if (failed) cancelAcquire(node); } }
在首节点释放同步状态后,同时唤醒后继节点。后继节点通过自旋的方式(这里利用死循环方式)也会检查自己的前驱节点是否为首节点,如果是前驱节点则会尝试获取同步状态。获取成功则返回,否则判断是否被中断或者继续自旋上述获取同步状态操作。
独占式同步状态释放
release(int arg)
方法是释放同步状态,当释放同步状态后会唤醒后继节点。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease(int arg)
方法同样也是自定义同步器实现。当首节点不为空且处于等待状态时,那么调用unparkSuccessor(Node node)
方法唤醒后继节点。
private void unparkSuccessor(Node node) { // CAS 设置等待状态为初始状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 如果当前释放同步状态的节点不存在后继节点或后继节点超时/被中断 if (s == null || s.waitStatus > 0) { s = null; // 从尾节点中开始寻找等待状态的节点作为新首节点,这里已排除当前节点(t != node) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
释放同步状态的整个过程就是:释放同步状态,唤醒后继节点。这个后继节点必须满足,非空、非当前节点、等待状态小于或等于 0 ,即SIGNAL
、CONDITION
、PROPAGATE
和初始化状态。
独占式资源共享方式除了上面的同步状态获取,还有独占式超时获取使用的方法是doAcquireNanos(int arg, long nanosTimeout)
、独占式可中断获取使用的方法是acquireInterruptibly(int arg)
。
共享式同步状态
共享式同步状态同一时间点可以有多个线程获取到同步状态。
共享式同步状态获取
acquireShared(int arg)
方法是共享式同步状态获取的方法。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 获取同步状态失败后调用的方法 doAcquireShared(arg); }
tryAcquireShared(int arg)
方法是自定义同步器实现的,返回大于或等于 0 时,表示获取成功。如果小于 0 时,获取同步状态失败后会调用doAcquireShared(int arg)
方法进行再次尝试获取。
private void doAcquireShared(int arg) {、 // 构造一个当前线程信息的新节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 自旋式获取同步状态 for (;;) { final Node p = node.predecessor(); // 判断新节点的前驱节点是否为首节点 if (p == head) { // 再次尝试获取同步状态 int r = tryAcquireShared(arg); // 获取成功后退出自旋 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上面代码中,当获取同步状态失败后,则创建一个共享模式类型的节点,然后自旋式获取同步状态,如果前驱节点为首节点时则尝试再次获取同步状态,获取同步状态成功后退出当前自旋。
共享式释放同步状态
releaseShared(int arg)
方法来释放共享式同步状态。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 同步状态释放成功后,唤醒后面等待状态的节点 doReleaseShared(); return true; } return false; }
上面tryReleaseShared(int arg)
释放同步状态方法必须保证线程安全,因为它多个线程获取到同步状态时会引发并发操作,可以通过循环操作和 CAS 来确保安前行。
doReleaseShared()
方法唤醒后续等待状态的节点。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 验证后继节点的线程处于等待状态 if (ws == Node.SIGNAL) { // 再次检查后继节点的线程是否处于等待状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒后继节点,这时每唤醒一次就更新一次首节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
共享同步状态释放后,自旋式依次唤醒队列中节点。
总结
从 AQS 中可以借鉴它利用循环和 CAS 来确保并发的安全性的思路,同时它采用模板设计模式定义一个处理逻辑,将具体的特定处理逻辑交由子类自定义实现。在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 以及 Tomncat 的 LimitLatch 都有用其作为同步器。
推荐阅读
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
探索现代的移动网络
作者:盒子里的薛定谔(Darren Chen),iOS 工程师,目前就职于百度 App 移动技术平台组 Sessions: https://developer.apple.com/videos/play/wwdc2020/10111/ https://developer.apple.com/videos/play/wwdc2020/10047/ https://developer.apple.com/video你可以为系统级 DNS 设置根据不同的接入网设置不同的规则。s/play/wwdc2020/10110/ https://developer.apple.com/videos/play/wwdc2020/10113/ IPv6 IP 协议第 6 版(英语:Internet Protocol version 6,缩写:IPv6)是 IP 协议的最新版本,用作互联网的协议。用它来取代 IPv4 主要是为了解决 IPv4 地址枯竭问题,同时它也在其他方面对于 IPv4 有许多改进。 IPv6 的设计目的是取代 IPv4,然而长期以来 IPv4 在互联网流量中仍占据主要地位,IPv6 的...
- 下一篇
机器学习基础:可视化方式理解决策树剪枝
↑↑↑点击上方蓝字,回复资料,10个G的惊喜 看了一些市面上的经典教材,感觉决策树剪枝这一部分讲的都特别晦涩,很不好理解。本文以理论白话+具体案例的形式来讲清楚这个重要知识点,打好决策树这个基础,有助于理解之后我们要讲解的随机森林、gbdt、xgboost、lightgbm等模型。 阅读本文前,可以顺便回顾一下前文:机器学习基础:决策树的可视化 剪枝 如果不对决策树设置任何限制,它可以生成一颗非常庞大的树,决策树的树叶节点所覆盖的训练样本都是“纯”的。这样决策树在训练样本上非常精准,但是在测试集上就没那么好了。层数越多,叶结点越多,分的越细致,对训练数据分的也越深,越容易过拟合,导致对测试数据预测时反而效果差。要解决这个问题就需要对决策树进行「剪枝」。 剪枝的方案主流的有两种,一种是预剪枝,一种是后剪枝。 所谓的预剪枝,即是在生成树的时候就对树的生长进行限制,防止过度拟合。比如我们可以限制决策树在训练的时候每个节点的数据只有在达到一定数量的情况下才会进行分裂,否则就成为叶子节点保留。或者我们可以限制数据的比例,当节点中某个类别的占比超过阈值的时候,也可以停止生长。 下面我们重点讲后剪枝...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS关闭SELinux安全模块
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Hadoop3单机部署,实现最简伪集群
- Docker使用Oracle官方镜像安装(12C,18C,19C)