此处给出AQS的一些先行知识点,为后续详细解析AQS做铺垫。在给出一些必要的基础之后,我们先从分析AQS一些具体典型锁实现,最后对AQS做归纳总结。
AQS(AbstractQueuedSynchronizer)是多线程同步器,它是JUC(java.util.concurrent)包中多个组件的底层实现,比如像Lock、CountDownLatch、Semaphore等都是用到了AQS。简单理解就是:AQS定义了模板,具体实现由各个子类完成。
AQS提供2种锁机制,独占锁和共享锁。独占锁,就是存在多个线程去竞争同一共享资源的时候,同一个时刻,只允许一个线程去访问该共享资源,也就是说只能有一个线程获取该共享资源的锁。比如Lock中的ReentrantLock重入锁,它的实现就是用到了AQS的一个排它锁的功能。
共享锁也称为读锁,也就是同一时刻,允许多个线程获取锁的资源,比如CountDownLatch、Semaphore,都用到了AQS中的共享锁的功能。
关于锁的的公平性和非公平行,AQS的处理方法是,在竞争锁资源的时候,公平锁要判断双向链表中是否有阻塞的线程,如果有则需要去排队等待。而非公平锁的处理方式是,不管双向链表中是否有阻塞的线程在排队等待,它都会去尝试去修改state变量去竞争锁,这个过程是非公平的
AQS对于锁竞争通过维护一个双向链表实现的队列来完成,其第一个节点为哨兵节点。AQS相关代码如下:
private transient volatile Node head;private transient volatile Node tail;private volatile int state;
内部链表节点类部分代码如下下:
static final class Node {/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;static final int PROPAGATE = -3;volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;// 省略其他相关代码
}
AQS支持多个条件变量,条件变量阻塞队列为单链表,内部条件变量类部分代码如下:
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;public final void signal() {//省略其他代码}public final void await() throws InterruptedException {//省略其他代码}// 省略其他代码
}
解析
说明:阻塞在条件变量上的线程被唤醒后,它还是要去竞争锁的,即会加入锁竞争的队列。
ReentrantLockUML如下图2.1所示:
ReentrantLock为一种可重入,可打断的独占式锁。默认实现为非公平锁,默认构造方法如下:
public ReentrantLock() {sync = new NonfairSync();
}
关于可重入,可打断和公平锁的实现原理我们会在后面分析,下面优先讲解锁的主要功能加锁和解锁。
当前线程获取锁有2种结果,成功或者失败。如下所示ReentrantLock的加锁源代码:
public void lock() {sync.lock();
}
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}
通过cas的方式把state由0设置为1,成功,获取锁成功,把exclusiveOwnerThread独占线程标识设置为当前线程,执行相关操作。
竞争锁失败执行acquire(1)方法,源代码如下:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
tryAcquire()默认执行非公平锁的tryAcquire()方法,源代码如下:
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
执行流程如下:
执行流程图如下图 2.2.1-1所示:
源代码如下:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
该方法目标是把当前线程节点插入队列尾部,先执行判断尾节点是否为空
加入竞争锁队列后会根据情况尝试获取锁或者阻塞,源代码如下:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
执行流程如下:
源代码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)return true;if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
该方法的目标就是设置目标节点node的前驱节点pred的waitStatus为Node.SIGNAL,该状态意味着它有责任唤醒它的后记节点。
执行流程如下:
源代码如下:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}
该方法就是阻塞,之所以说默认是不可打断模式,就是这里。被打断之后,只是返回true且清除了打断标记。返回上传调用方法acquireQueued继续执行for循环,直至获得锁,返回打断标记true。
源代码如下:
static void selfInterrupt() {Thread.currentThread().interrupt();
}
执行该方法的前提是线程在阻塞的时候被打断且获取到锁,然后执行自我打断。
释放锁源代码如下:
public void unlock() {sync.release(1);
}
源码如下:
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
既然执行释放锁操作,说明当前线程已经获取锁。
执行流程如下:
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
在tryRelease()方法返回true之后,判断如果竞争锁队列头结点不为空且状态!=0(-1,之前加锁流程把前驱节点状态设置),会唤醒后继节点,返回true;
源代码如下:
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);
}
执行流程如下:
解锁失败情况就是存在锁重入,未解锁到最后一层。
如有问题,欢迎交流讨论。
❓QQ:806797785
⭐️源代码仓库地址:https://gitee.com/gaogzhen/concurrent