从源码看AQS

AQS实际上是操作以Node为元素的队列,Node包含了所属线程,先以不公平锁分析:lock时先尝试获取锁,获取失败则进入队列且被阻塞(期间可以被打断)等待,当锁被释放的时候,如果队列不为空,则唤醒头节点的下一个节点NEXT,如果此时被新线程NT拿到锁,则NEXT继续进入阻塞等待,当NT释放锁时,头节点的下一个节点还是NEXT再次被唤醒,如果此时NEXT获得锁,则将NEXT设置为头节点,这就是一个大概的流程,顺着这个思路,再来一步步看实现代码。

AQS->AbstractQueuedSynchronizer,看名字 抽象队列同步器,首先得有队列。

先来看队列节点类,Node:

static final class Node {
        
        static final Node SHARED = new Node(); //共享模式标记
        static final Node EXCLUSIVE = null;//独占模式标记
        
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;//取值为 上面的4个int,初始化为0

        volatile Node prev;//前一个节点

        volatile Node next;//下一个节点

        volatile Thread thread;

        Node nextWaiter;//下一个等待节点  SHARED,EXCLUSIVE 
        
        //如果节点在共享模式下等待,则返回true
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        //返回前一个节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {
        }
        //以下是Node的两种不同的使用方式
        Node(Thread thread, Node mode) {  
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

注意上面两种不同的节点使用方式,要构建队列那么还得有 队列的首尾表示节点,

    //队列头节点,除开初始化,只能通过setHead方法设置    
    private transient volatile Node head;
    //队列尾节点,只能通过enq方法设置
    private transient volatile Node tail;
   //同步状态
    private volatile int state; //初始状态为0

那么具体的实现或者用法,我们还是通过举例来说明:

ReentrantLock-可重入锁,下面以RL代表 ,以AQS代表AbstractQueuedSynchronizer。

在RL中有个静态内部类,Sync  是AbstractQueuedSynchronizer的抽象子类。在Sync的基础上继续扩展出两个实现子类 NonfairSync(非公平)和FairSync(公平)。而RL实现锁机制就是建立在 Sync的基础之上的。

先说说NonfairSync(以nfs简称),非公平锁

我们使用RL.lock()获取锁,实际调用是nfs的lock方法

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

compareAndSetState就是使用CAS乐观锁来修改AQS的state值,由0 改为1,有疑问可以参考UnSafe类实现CAS乐观锁,然后将当前线程设置为锁的占有线程。acquire方法是由AQS实现的,

//尝试获取锁失败,则进入队列
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果在acquireQueued期间线程标记中断,那么将进入线程中断selfInterrupt。
            selfInterrupt();
    }

tryAcquire是由子类来实现的,这里也就是由nfs实现,主要是再次尝试获取锁(可重入锁),来看具体实现:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { //如果state为0 , 则尝试获取锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果锁已经被占用,但是占有者是当前线程,那么将state + 1,即重入锁,最大值为Integer.MAX_VALUE
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) 
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;//否则返回false
        }

第二个条件 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),EXCLUSIVE独占锁标志,两个方法都是AQS实现的:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        //先判断队列的尾节点是否为空,此处是一个快速尝试,失败了就继续走enq方法
        Node pred = tail;
        if (pred != null) {//尝试将尾节点设置为node,设置成功则返回
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//将node插入队列尾部,然后返回,也是由CAS实现,注意 enq会初始化一个空的Node(无参构造函数)作为head节点
        return node;
    }
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //进入循环
            for (;;) {
                final Node p = node.predecessor();//获取node的上一个节点p
                if (p == head && tryAcquire(arg)) {//如果p是头节点,而且当前线程拿到了锁,那么就将node设为头节点,注意这里可能会有最新来的线程和p节点所属的线程进行竞争,竞争失败则继续进入阻塞等待,当锁被释放时,又会唤醒头节点的下一个节点
                    setHead(node);
                    p.next = null; // 方便回收
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  //线程会阻塞,等待释放,直到prev节点完全释放锁的时候会被唤醒,然后开始再次循环。shouldParkAfterFailedAcquire 主要是修改node的waitStatus。parkAndCheckInterrupt 主要阻塞当前线程,期间线程可能会被中断,当线程被唤醒之后,再判断是否被中断。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node); //取消获得的锁
        }
    }
//因为是普通节点,所以waitStatus的初始值为0,忘了的话可以回到上面看下Node的两种构造函数
//所以这里主要由两步,第一步尝试将pred的 waitStatus 改为SIGNAL,第二次再进来就直接返回true了
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

成功获取锁之后才能释放锁:

//AQS实现
public final boolean release(int arg) {
        if (tryRelease(arg)) { //由子类实现,包括重入锁持有,直到所有锁全部释放
            Node h = head;
            if (h != null && h.waitStatus != 0) //如果head节点的等待状态不为0,在获取锁失败的时候,node节点会将prev的waitstatus改为SIGNAL,也就是p后面还有节点, 具体是在 shouldParkAfterFailedAcquire方法
                unparkSuccessor(h);//唤醒h的下一个节点
            return true;//完全释放锁
        }
        return false;
    }
//nfs实现:
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断当前线程是否是锁的持有线程
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { //如果c == 0表示当前线程获得的重入锁已经全部释放,则修改锁的持有线程为null
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);//否则只是将state - 1 
            return free;
        }

从上面加锁、释放锁的步骤可以看出,不公平锁不公平的地方在于,当head节点完全释放锁的时候,这个时候最新来的线程和head节点唤醒的下一个节点会同时竞争锁。

通过对ReentrantLock 的使用分析,可能大家对AQS已经由了一个初步的概念,那么可重入锁的公平锁版本的具体实现大家有兴趣可以自行对照上面的流程跟着源码走一遍,下一篇我们再来看看ReentrantReadWriteLock读写锁的源码实现,来进一步的了解AQS。

优秀的个人博客,低调大师

微信关注我们

原文链接:https://my.oschina.net/qiantu/blog/4773481

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Apache Tomcat7、8、9(Java Web服务器)

Apache Tomcat7、8、9(Java Web服务器)

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。