上一篇:
1.示例代码
演示一个生产者消费者的场景,await阻塞,signal唤醒
细节点:
aqs里面锁是什么,简单来说就是state的值,state>0线程持有了锁,state=0线程释放了锁。
aqs里面线程是什么,简单来说就是Node节点,Node节点通过构成链表结构来代表线程的执行的先后顺序,这个链表有双向的,有单向的,其属性prev和next以及全局head,tail属性用于构建双向链表,其nextWaiter以及firstWaiter,lastWaiter构成单向链表,waitStatus的属性用于表示当前线程的等待状态;
调用await的方法的线程,必然持有锁;
生产者put和消费者take的方法使用同一个lock,存在资源竞争;
await方法会阻塞当前线程,signal会唤醒对应的处于条件队列中等待的线程,并将处于条件队列中的的节点移动到双向同步队列里面;
有了这几个概念,有助于理解下面所说的。
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putIndex, takeIndex, count;
public void put(Object item) throws InterruptedException {
//这里会将state值修改,state值在aqs中是全局的,不管同步队列还是条件队列都共享一个state
lock.lock();
try {
//items满了,阻塞
while (count == items.length) {
//调用await方法的线程必须持有锁
//思考:这里调用了await方法是否应该释放当前线程的锁呢?因为全局共用一个state值。
notFull.await();
}
items[putIndex] = item;
if (++putIndex == items.length) {
putIndex = 0;
}
++count;
notEmpty.signal();//items中已经有值,通知notEmpty队列消费
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
//items为空
while (count == 0) {
//调用await方法的线程必须持有锁
//思考:这里调用了await方法是否应该释放当前线程的锁呢?
notEmpty.await();
}
Object item = items[takeIndex];
if (++takeIndex == items.length) {
takeIndex = 0;
}
--count;
notFull.signal();//items中已经被消费掉,通知notFull生产
return item;
} finally {
lock.unlock();
}
}
2.双向同步队列和Condition条件队列
2.1.双向同步队列结构
这是lock加锁和unLock释放锁过程中形成的队列,有一个虚的头节点,虚节点不存储线程等相关信息,nextWaiter属性在这种队列中无实际作用。通过prev和next属性串联节点,thread保存当前线程,waitStatus表示当前线程的等待状态。
![]()
2.2.Condition条件队列结构
由示例代码可以看出,每个Condition对象都对应了一个Condition条件队列,而且每个条件队列是相互独立的。
Condition条件队列是一个单向的队列,而且firstWaiter也有实际作用,彼此之前是通过nextWaiter关联的,在Condition条件队列中,prev和next属性也并不会用到。
当当前线程调用了notFull.await()方法的时候,当前线程就会被包装成Node被加入到notFull队列的末尾。
在条件队列中,我们需要关注的是waitStatus的属性为CONDITION,当waitStatus=CONDITION的时候,我们就认为当前线程不需要等待,可以出队了。
![]()
2.3.双向同步队列和条件队列的联系
一般情况下,双向同步队列和条件队列是相互独立的。但是,当我们调用条件队列的signal方法的时候,会将条件队列中的处于等待的线程唤醒,被唤醒的线程同样和普通线程一样要去争抢锁资源。如果争抢失败,同样要被加到双向同步队列中去。这个时候,就需要将条件队列中的节点一个个转移到双向同步队列中去了。注意,这个节点从条件队列迁移到双向同步队列是一个一个迁移的。
![]()
3.分析ConditionObject
final Condition notFull = lock.newCondition();
该类位于AbstractQueuedSynchronized里面
![]()
![]()
核心属性:
//条件队列首节点
private transient Node firstWaiter;
//条件队列尾节点
private transient Node lastWaiter;
构造方法:
public ConditionObject() { }
3.1.分析ConditionObject.await()方法
await方法执行过程:
(1)lock加锁;
(2)await方法检测中断状态;
(3)fullyRelease方法释放锁;
(4)节点如果不位于同步队列,挂起线程,等待被唤醒;
(5)节点被signal唤醒或者中断唤醒;
(6)checkInterruptWhileWaiting检查中断模式,根据节点位于同步队列还是条件队列设置中断模式,并将节点加入到同步队列中;
(7)acquireQueued尝试加锁,加锁失败在同步队列中等待被唤醒,挂起线程;
(8)加锁成功持有锁,reportInterruptAfterWait处理中断;
![]()
能够调用到await方法的线程都已经获取到锁。
调用await方法这里有两种情况:
1.中断发生时,线程还没有被signal过,则线程恢复后,抛出异常;
2.中断发生时,线程已经被signal过,那么线程已经被从condition队列中移动到同步队列中了,那么await方法调用之后,
只是再补一下中断,也就仅仅改变了线程的中断状态。
从await方法我们可以看出来,这个中断在这里仅仅只判断中断状态。我们直到thread.interrupt()方法只改变线程的中断状态为true而并不能真正意义上的停止线程,再配上LockSupport.unpark唤醒线程,就可以使得await方法抛出异常。
await方法退出的时候根据interruptMode来处理这两种情况:
先看interruptMode属性值:
//如果是默认值0,代表线程整个过程中没有发生中断
//=1表示退出await方法时再自我中断,这种模式发生在signal方法调用之后,因为当前节点被加入到了同步队列
//所以还需要中断
private static final int REINTERRUPT = 1;
//=-1说明退出await方法的时候抛出InterruptedException
private static final int THROW_IE = -1;
public final void await() throws InterruptedException {
//判断当前线程是否是中断状态,如果是直接抛出异常
//await方法中发现了线程中断,则抛出异常,后续reportInterruptAfterWait方法也会处理中断模式
if (Thread.interrupted())
throw new InterruptedException();
//新建一个节点并加入到条件队列中
Node node = addConditionWaiter();
//完全释放当前线程的锁,等待其他线程争抢锁
long savedState = fullyRelease(node);
int interruptMode = 0;//中断属性默认值,代表不需要中断
//判断节点是否处于同步队列中,返回false则表示在条件队列中
//在调用signal方法之前,新加的节点都是位于条件队列中
//调用signal方法会将处于条件队列中的节点迁移到同步队列中
while (!isOnSyncQueue(node)) {
//节点位于条件队列则挂起当前线程,等待signal方法唤醒,线程当前被挂起在这个位置
LockSupport.park(this);
//线程如何被唤醒:
//1.线程被signal唤醒后,立刻走到这里
//3.被其他线程中断+唤醒,thread.interrupt() + LockSupport.unpark()方法;
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//能执行到这里,说明node已经被迁移到同步队列中了,而且当前线程也被唤醒了
//条件1:尝试加锁
//条件2:中断属性不为-1
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//尝试加锁成功或中断属性不为-1,则设置属性为1,代表需要自我中断
interruptMode = REINTERRUPT;
//node被加入到同步队列中的时候,并没有设置nextWaiter=null
//如果有条件队列中还有其他节点,清除条件队列中取消状态的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//根据中断模式处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
3.1.1.分析ConditionObject.addConditionWaiter方法
//创建一个新节点并将它加入到条件队列中,并返回该节点
//执行过程中剔除了已取消的节点
private Node addConditionWaiter() {
//获取条件队列最后一个节点
Node t = lastWaiter;
//条件1:如果t!=null说明条件队列中已经有Node节点了
//条件2:t.waitStatus != Node.CONDITION成立,说明当前尾节点取消了等待
//那么我们新加的节点就不应该在当前尾节点的后面了
if (t != null && t.waitStatus != Node.CONDITION) {
//剔除所有取消状态的节点
unlinkCancelledWaiters();
//更新临时变量的值,可能因为上一个方法改变尾节点
t = lastWaiter;
}
//创建一个新节点并设置waitStatus=CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果没有最终节点
if (t == null)
//设置第一个节点为当前节点
firstWaiter = node;
else
//如果有最终节点,设置当前节点的后续节点为新创建的节点
t.nextWaiter = node;
//更新最终节点为当前节点
lastWaiter = node;
return node;
}
3.1.2.分析unlinkCancelledWaiters方法
//剔除取消等待的节点
private void unlinkCancelledWaiters() {
//获取首节点
Node t = firstWaiter;
Node trail = null;
//循环遍历链表
while (t != null) {
//获取下一个节点
Node next = t.nextWaiter;
//条件成立,说明当前节点取消了等待
if (t.waitStatus != Node.CONDITION) {
//将首节点的的nextWaiter设置为空,断链操作,帮助gc
t.nextWaiter = null;
//如果链表还没有找到正常状态的节点
if (trail == null)
//将next节点设置为首节点
firstWaiter = next;
else
//让上一个正常节点指向取消等待节点的下一个节点
trail.nextWaiter = next;
//如果不存在下一个节点,当前尾节点设置为正常节点
if (next == null)
lastWaiter = trail;
}
//条件不成立说明当前t节点是正常节点
else
//给正常节点赋值,这里就找出了所有的正常节点
trail = t;
//循环继续向下遍历
t = next;
}
}
3.1.3.分析ConditionObject.fullRelease方法
//完全释放锁,当failed=true的时候,说明当前线程未持有锁调用了await方法
//调用await方法必须持有锁
//假如failed=true,finally中会将加入到队列中的节点状态设置为取消状态
//后续节点入队的过程会调用unlinkCancelledWaiters方法清除取消状态的节点
//释放锁成功后返回当前锁的state的值
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取当前线程state值的总值
int savedState = getState();
//直接调用release方法释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
3.1.4.分析ConditionObject.isOnSyncQueue方法
判断节点是否处于同步队列中,前面已经说过,当调用signal方法的时候,可能会将条件队列中的节点迁移到同步队列中。
final boolean isOnSyncQueue(Node node) {
//当前节点的状态为CONDITION,说明处于条件队列中
//当前节点的前驱节点为空,说明处于条件队列中,因为双向同步队列存在虚的首节点
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//当前节点的后续节点不为空,说明处于同步队列中,因为条件队列的prev和next属性为空
if (node.next != null)
return true;
//如果上面条件都不成立,说明当前节点满足成为同步队列节点的属性,即prev和next属性不为空
//循环链表从后向前遍历,看是否能找到当前节点,找到则返回true
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
3.1.5.分析checkInterruptWhileWaiting方法
该方法返回线程的中断的模式
private int checkInterruptWhileWaiting(Node node) {
//Thread.interrupted返回当前线程的中断状态并将中断状态清除
//被唤醒的线程当前中断状态还是中断,需要清除
//之后调用transferAfterCancelledWait方法判断节点是由signal唤醒还是中断唤醒
//位于condition队列说明是中断唤醒,然后中断模式返回THROW_IE
//位于同步队列则说明是signal唤醒,返回中断模式REINTERRUPT
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
3.1.6.分析transferAfterCancelledWait方法
该方法可以判断当前节点是否位于条件队列,位于条件队列,则需要将节点加入到同步队列,并返回true;
如果位于同步队列则返回false;
final boolean transferAfterCancelledWait(Node node) {
//条件成立,说明当前node的waitStatus值为CONDITION,当前node位于条件队列中
//为什么会出现这种情况,被signal方法唤醒后的线程不是在同步队列中了吗?
//如果我们是通过中断唤醒,那么这里还是需要将
//节点加入到同步队列中的
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//中断唤醒的node也会被加到阻塞队列中
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
3.1.7.分析reportInterruptAfterWait方法
根据中断模式处理中断
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();//线程自我中断,仅仅改变了中断状态
}
3.2.分析ConditionObject.signal方法
public final void signal() {
//是否独占锁线程,不是则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//条件队列有node节点,则进行条件队列迁移到同步队列的操作。
if (first != null)
doSignal(first);
}
//将条件队列中所有节点迁移到同步队列中
private void doSignal(Node first) {
do {
//firstWaiter = first.nextWaiter,处理完当前节点,则让firstWaiter指向下一个节点
if ( (firstWaiter = first.nextWaiter) == null)
//条件队列为空,则更新lastWaiter为空
lastWaiter = null;
//如果下一个节点为空,则出while循环
first.nextWaiter = null;
//while循环迁移某个节点成功和条件队列为空才会退出循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//通过CAS操作修改当前条件队列中节点状态为0,因为节点要被迁移到同步队列了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将节点加入到同步队列尾节点并返回其前一个节点
Node p = enq(node);
int ws = p.waitStatus;
//在同步队列中,waitStatus只有1,0,-1三种状态,大于0说明前驱节点是取消状态
//如果前驱节点是取消状态,则唤醒当前线程
//如果前驱节点不是取消状态,则设置前驱节点状态为-1,代表需要唤醒后续节点
//如果compareAndSetWaitStatus(p, ws, Node.SIGNAL)返回false,说明当前node是
//lockInterrupt入队的node,是会响应中断的,如果外部线程中断该node,前驱node会
//将节点状态修改为取消状态,并执行出队逻辑
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}