AQS源码分析之ConditionObject
创始人
2024-05-24 04:53:55
0

前面写了一篇 AQS 的源码分析,但是对于 ConditionObject 并没有分析,这篇文章对其做一个补充。

AQS原理分析_小鲁蛋儿的博客-CSDN博客

1、简介

ConditionObject是AQS中定义的内部类,实现了Condition接口,在其内部通过链表来维护等待队列(条件队列)。Contidion必须在lock的同步控制块中使用,调用Condition的signal方法并不代表线程可以马上执行,signal方法的作用是将线程所在的节点从等待队列中移除,然后加入到同步队列中,线程的执行始终都需要根据同步状态(即线程是否占有锁)。

每个条件变量都会有两个方法,唤醒和等待。当条件满足时,我们就会通过唤醒方法将条件容器内的线程放入同步队列中;如果不满足条件,我们就会通过等待方法将线程阻塞然后放入条件队列中。

2、Condition接口


public interface Condition {/** * 暂停此线程直至一下四种情况发生* 1.此Condition被signal()* 2.此Condition被signalAll()* 3.Thread.interrupt()* 4.虚假唤醒* 以上情况.在能恢复方法执行时,当前线程必须要能获得锁*/void await() throws InterruptedException;//跟上面类似,不过不响应中断void awaitUninterruptibly();//带超时时间的await(),并响应中断long awaitNanos(long nanosTimeout) throws InterruptedException;//带超时时间的await()boolean await(long time, TimeUnit unit) throws InterruptedException;//带deadline的await()boolean awaitUntil(Date deadline) throws InterruptedException;//唤醒某个等待在此condition的线程void signal();//唤醒有等待此condition的所有线程void signalAll();
}
  1. ConditionObject

我们先看下ConditionObject中的属性

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

每个条件变量都维护了一个容器,ConditionObject中的容器就是单向链表队列,上面的属性就是队列的头结点firstWaiter和尾结点lastWaiter,需要注意,条件队列中的头结点不是虚拟头结点,而是包装了等待线程的节点!其类型和同步队列一样,也是使用AQS的内部类Node来构成,但与同步队列不同的是,条件队列是一个单向链表,所以他并没有使用Node类中的next属性来关联后继Node,而使用的nextWaiter

volatile Node prev;
volatile Node next;
Node nextWaiter;

这里我们需要注意,nextWaiter是没用volatile修饰的,为什么呢?因为线程在调用await方法进入条件队列时,是已经拥有了锁的,此时是不存在竞争的情况,所以无需通过volatile和cas来保证线程安全。而进入同步队列的都是抢锁失败的,所以肯定是没有锁的,故要考虑线程安全

最后需要注意一点的是,条件队列里面的Node只会存在CANCELLED和CONDITION的状态

3.1 signalAll()

将条件队列中的所有Node移到同步队列中,然后根据条件再唤醒它们去尝试获得锁

public final void signalAll() {// 查看此时的线程是否已经获得了锁if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;// 如果条件队列没有阻塞的 Nodeif (first != null)// 将条件队列中的所有 Node 移动到同步队列中doSignalAll(first);
}

3.1.1 doSignalAll()

将条件队列中的所有 Node 移动到同步队列中

private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;// 通过循环将条件队列中每个node移动到同步队列do {Node next = first.nextWaiter;first.nextWaiter = null;// 将此node转移到同步队列中transferForSignal(first);first = next;} while (first != null);
}

3.1.2 transferForSignal()

将node转移到同步队列中

final boolean transferForSignal(Node node) {// 说明此节点状态为CANCELLED,所以跳过该节点(GC会回收)if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 将 node 加入队尾,enq 返回的是 node 的前驱节点Node p = enq(node);int ws = p.waitStatus;// 1. 前驱节点状态是CANCELLED// 2. 前驱节点不是CANCELLED状态,但CAS将状态变为SIGNAL失败// 此时会唤醒线程,去尝试获取锁,此时会来到await()方法中的acquireQueued()方法,// acquireQueued()方法中的shouldParkAfterFailedAcquire()方法会// 往前找,直到找到最近一个正常等待的状态,将该节点排在它的后边if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread); return true;
}

3.2 signal()

signal则只转移条件队列中的第一个状态不为CANNCELLED的Node

public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}private void doSignal(Node first) {do {// 将firstWaiter指向传入的first的后继节点,为null说明条件队列中只有first这一个节点,将整个队列清空if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// transferForSignal如果返回为false,说明节点进入同步队列失败(已经被取消了),// 判断此节点的下一个节点是否为null,如果不为null,则会再次进入循环将这个节点进行入队} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}

3.3 await()阻塞前

其中LockSupport.park(this)进行阻塞当前线程,后续唤醒,也会在这个程序点恢复执行。

public final void await() throws InterruptedException {// 如果当前线程被中断,抛出异常if (Thread.interrupted())throw new InterruptedException();// 将调用 await 的线程包装成 Node,添加到条件队列并返回Node node = addConditionWaiter();// 完全释放节点持有的锁,因为其他线程唤醒当前线程的前提是【持有锁】int savedState = fullyRelease(node);// 设置打断模式为没有被打断int interruptMode = 0;// 如果不在同步队列中while (!isOnSyncQueue(node)) {// 此线程就被 park 方法阻塞了,只有当线程被唤醒才会在这里开始继续执行下面代码LockSupport.park(this);/*  await()方法阻塞前的代码到这里结束  */if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

3.3.1 addConditionWaiter()

添加一个等待Node到条件队列中

private Node addConditionWaiter() {Node t = lastWaiter;// 如果最后一个等待节点的状态不是 CONDITION,说明已经被取消了,进行清理if (t != null && t.waitStatus != Node.CONDITION) {// 清理条件队列中的不是CONDITION 类型的节点unlinkCancelledWaiters();t = lastWaiter;}//新建一个Condition状态的节点,并将其加在尾部Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}

3.3.2 unlinkCancelledWaiters()

清理条件队列中的不是Condition 类型的节点,比如中断、超时等会导致节点转换为Cancel。

// 链表删除的逻辑
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;// 判断 t 节点是否为 CONDITION 节点,不是 CONDITION 就不是正常的if (t.waitStatus != Node.CONDITION) { // 不是正常节点,需要 t 与下一个节点断开t.nextWaiter = null;// 说明遍历到的节点还未碰到过正常节点if (trail == null)// 更新 firstWaiter 指针为下个节点firstWaiter = next;else// 删除非正常的节点trail.nextWaiter = next;// 尾节点是异常节点,更新 lastWaiter 指向 trailif (next == null)lastWaiter = trail;} else {// trail 指向的是正常节点 trail = t;}t = next; }
}

3.3.3 fullyRelease()

将当前线程持有的锁释放掉

final int fullyRelease(Node node) {// 释放锁是否成功,false 代表成功boolean failed = true;try {int savedState = getState();// 通过 release 方法释放锁 if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {// 没有释放成功,将当前 node 设置为取消状态if (failed)node.waitStatus = Node.CANCELLED;}
}

3.3.4 isOnSyncQueue()

判断节点是否在AQS同步队列中

final boolean isOnSyncQueue(Node node) {// node 的状态是 CONDITION,或者前驱节点为空,说明此节点是在条件队列中if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 说明当前节点已经成功入队到同步队列,且当前节点后面已经有其它 nodeif (node.next != null)return true;// 说明【可能在同步队列,但是是尾节点】// 从同步队列的尾节点开始向前【遍历查找 node】,如果查找到返回 true,查找不到返回 falsereturn findNodeFromTail(node);
}

3.3.5 findNodeFromTail()

从同步队列的尾节点,向前遍历查找node

    private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}

3.4 await() 阻塞后

public final void await() throws InterruptedException {// 省略。。。// 如果不在同步队列中while (!isOnSyncQueue(node)) {// 此线程就被 park 方法阻塞了,只有当线程被唤醒才会在这里开始继续执行下面代码LockSupport.park(this); // <----- 被唤醒后从下面开始// 两种被唤醒的方式:// 1. 其他线程调用了doSignal或doSignalAll// 2. 线程被中断if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

3.4.1 checkInterruptWhileWaiting()

检查等待的时间内是否被中断过

  • 没有中断则返回0

  • 发生中断,我们需要通过transferAfterCancelledWait方法进一步检查其他线程是否执行了唤醒操作

  • 中断先于其他线程调用signal等方法唤醒的,返回THROW_IE,await方法退出时,会抛出InterruptedException异常

  • 中断是后于其他线程调用signal等方法唤醒,返回REINTERRUPT,await方法退出时,会重新再中断一次

/** await方法退出时,会重新再中断一次 */
private static final int REINTERRUPT =  1;
/** await方法退出时,会抛出InterruptedException异常 */
private static final int THROW_IE    = -1;
private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;
}

3.4.2 transferAfterCancelledWait()

final boolean transferAfterCancelledWait(Node node) {if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node);return true;}while (!isOnSyncQueue(node))Thread.yield();return false;
}

如果条件中的CAS操作成功,说明此时的Node肯定是在条件队列中,则我们调动 enq 方法将此节点放入到同步队列中,然后返回true,但是这里需要特别注意,此时说明中断先于其他线程调用signal等方法唤醒的,这个节点的nextWaiter还没置为null

如果CAS失败了,说明这个节点可能已经在同步队列中或者在入队的过程中,说明中断是后于其他线程调用signal等方法唤醒,所以我们通过while循环等待此节点入队后返回false

然后我们返回到调用checkInterruptWhileWaiting方法的await方法中

public final void await() throws InterruptedException {// 省略。。。// 如果不在同步队列中while (!isOnSyncQueue(node)) {// 此线程就被 park 方法阻塞了,只有当线程被唤醒才会在这里开始继续执行下面代码LockSupport.park(this); // <----- 被唤醒后从下面开始// 两种被唤醒的方式:// 1. 其他线程调用了doSignal或doSignalAll// 2. 线程被中断if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // <-- 此时在这里break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

我们可以看到,如果返回值不为0,则直接break跳出循环,如果为0,则再次回到while条件是否检查是否在同步队列中。

3.4.3 acquireQueued()

这个方法首先会检查下节点是否在同步队列第一个,如果在,则会再次尝试获取锁,成功后则会返回true,如果不在同步队列第一个或者获取锁失败了,则会去挂起,然后等待前驱结点释放锁后再被唤醒。如果在刚刚这个过程中,线程又被中断了,则interrupted则会置为true,然后最终方法返回为true

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

回到await方法处看if条件

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;

如果在获取锁的的过程中被中断了,即acquireQueued返回true,我们再将interruptMode为0置为REINTERRUPT。其实简单来说,第一个if语句就是让节点去获取锁,并且如果在获取锁的过程中被中断了,且此线程之前没被中断过,则将interruptMode置为REINTERRUPT。

我们再来看第二个if语句

if (node.nextWaiter != null) // cancelAcquire 中将中断节点设置为 CANCELLEDunlinkCancelledWaiters();

当线程是被中断唤醒时,node和后继节点是没有断开的,这一步我们的节点中的线程已经获取锁了且从同步队列中移除了,所以我们在这里将此节点也移除条件队列,unlinkCancelledWaiters方法前面说过,它会将条件队列中所有不为CONDITION的的节点移除。

最后一个if语句了,到这里,线程也拿到锁了,包装线程的节点也没在同步队列和条件队列中了,所以wait方法其实已经完成了,现在需要对中断进行善后处理了。

if (interruptMode != 0)reportInterruptAfterWait(interruptMode);

3.4.4 reportInterruptAfterWait()

private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();
}

如果是THROW_IE,就是抛异常,如果是REINTERRUPT,就再自我中断一次

参考文章:【Java并发编程】AQS(5)——ConditionObject_24只羊羊羊的博客-CSDN博客

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...