# 深入理解 AQS * [深入理解 AQS](#深入理解-aqs) * [前言](#前言) * [AQS实现原理](#aqs实现原理) * [目录结构](#目录结构) * [场景分析](#场景分析) * [线程一加锁成功](#线程一加锁成功) * [线程二抢占锁失败](#线程二抢占锁失败) * [线程三抢占锁失败](#线程三抢占锁失败) * [线程一释放锁](#线程一释放锁) * [线程二唤醒继续加锁](#线程二唤醒继续加锁) * [线程二释放锁/线程三加锁](#线程二释放锁线程三加锁) * [公平锁实现原理](#公平锁实现原理) * [Condition实现原理](#condition实现原理) * [Condition简介](#condition简介) * [Condition Demo示例](#condition-demo示例) * [Condition实现原理图解](#condition实现原理图解) * [Condition总结](#condition总结) * [总结](#总结) ### 前言 谈到并发,我们不得不说`AQS(AbstractQueuedSynchronizer)`,所谓的`AQS`即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的`ReentrantLock`、`ReentrantReadWriteLock`、`CountDownLatch`、`Semaphore`等都是基于`AQS`来实现的。 我们先看下`AQS`相关的`UML`图: ![image.png](https://i.loli.net/2020/04/30/bUJ62AvdlxRskny.png) 思维导图: ![image.png](https://i.loli.net/2020/04/30/rIu85mRqgXVxiZz.png) ### AQS实现原理 `AQS`中 维护了一个`volatile int state`(代表共享资源)和一个`FIFO`线程等待队列(多线程争用资源被阻塞时会进入此队列)。 这里`volatile`能够保证多线程下的可见性,当`state=1`则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个`FIFO`的等待队列中,比列会被`UNSAFE.park()`操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。 另外`state`的操作都是通过`CAS`来保证其并发修改的安全性。 具体原理我们可以用一张图来简单概括: ![image.png](https://i.loli.net/2020/04/29/ObTBC238jiFfxmZ.png) `AQS` 中提供了很多关于锁的实现方法, - getState():获取锁的标志state值 - setState():设置锁的标志state值 - tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false。 - tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false。 这里还有一些方法并没有列出来,接下来我们以`ReentrantLock`作为突破点通过源码和画图的形式一步步了解`AQS`内部实现原理。 ### 目录结构 文章准备模拟多线程竞争锁、释放锁的场景来进行分析`AQS`源码: **三个线程(线程一、线程二、线程三)同时来加锁/释放锁** **目录如下:** - **线程一**加锁成功时`AQS`内部实现 - **线程二/三**加锁失败时`AQS`中等待队列的数据模型 - **线程一**释放锁及**线程二**获取锁实现原理 - 通过线程场景来讲解**公平锁**具体实现原理 - 通过线程场景来讲解Condition中a`wait()`和`signal()`实现原理 这里会通过画图来分析每个线程加锁、释放锁后`AQS`内部的数据结构和实现原理 ### 场景分析 #### 线程一加锁成功 如果同时有**三个线程**并发抢占锁,此时**线程一**抢占锁成功,**线程二**和**线程三**抢占锁失败,具体执行流程如下: ![image.png](https://i.loli.net/2020/05/01/YUncw5SyCZNVipf.png) 此时`AQS`内部数据为: ![image.png](https://i.loli.net/2020/04/29/WyqKIB2gFvHmMCs.png) **线程二**、**线程三**加锁失败: ![image.png](https://i.loli.net/2020/04/29/Z2BFymNtgbsV9YI.png) 有图可以看出,等待队列中的节点`Node`是一个双向链表,这里`SIGNAL`是`Node`中`waitStatus`属性,`Node`中还有一个`nextWaiter`属性,这个并未在图中画出来,这个到后面`Condition`会具体讲解的。 具体看下抢占锁代码实现: `java.util.concurrent.locks.ReentrantLock .NonfairSync:` ```java static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } ``` 这里使用的**ReentrantLock非公平锁**,线程进来直接利用`CAS`尝试抢占锁,如果抢占成功`state`值回被改为1,且设置对象独占锁线程为当前线程。如下所示: ```java protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } ``` #### 线程二抢占锁失败 我们按照真实场景来分析,**线程一**抢占锁成功后,`state`变为1,**线程二**通过`CAS`修改`state`变量必然会失败。此时`AQS`中`FIFO`(First In First Out 先进先出)队列中数据如图所示: ![image.png](https://i.loli.net/2020/04/29/x4Ksd5PRl3tLFqT.png) 我们将**线程二**执行的逻辑一步步拆解来看: `java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()`: ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` 先看看`tryAcquire()`的具体实现: `java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire()`: ```java final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` `nonfairTryAcquire()`方法中首先会获取`state`的值,如果不为0则说明当前对象的锁已经被其他线程所占有,接着判断占有锁的线程是否为当前线程,如果是则累加`state`值,这就是可重入锁的具体实现,累加`state`值,释放锁的时候也要依次递减`state`值。 如果`state`为0,则执行`CAS`操作,尝试更新`state`值为1,如果更新成功则代表当前线程加锁成功。 以**线程二**为例,因为**线程一**已经将`state`修改为1,所以**线程二**通过`CAS`修改`state`的值不会成功。加锁失败。 **线程二**执行`tryAcquire()`后会返回false,接着执行`addWaiter(Node.EXCLUSIVE)`逻辑,将自己加入到一个`FIFO`等待队列中,代码实现如下: `java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter()`: ```java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } ``` 这段代码首先会创建一个和当前线程绑定的`Node`节点,`Node`为双向链表。此时等待对内中的`tail`指针为空,直接调用`enq(node)`方法将当前线程加入等待队列尾部: ```java private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` 第一遍循环时`tail`指针为空,进入if逻辑,使用`CAS`操作设置`head`指针,将`head`指向一个新创建的`Node`节点。此时`AQS`中数据: ![image.png](https://i.loli.net/2020/04/29/GXlTREfvuPoa2zm.png) 执行完成之后,`head`、`tail`、`t`都指向第一个`Node`元素。 接着执行第二遍循环,进入`else`逻辑,此时已经有了`head`节点,这里要操作的就是将**线程二**对应的`Node`节点挂到`head`节点后面。此时队列中就有了两个`Node`节点: ![image.png](https://i.loli.net/2020/04/29/RbJzKSrOscuPpd5.png) `addWaiter()`方法执行完后,会返回当前线程创建的节点信息。继续往后执行`acquireQueued(addWaiter(Node.EXCLUSIVE), arg)` 逻辑,此时传入的参数为**线程二**对应的`Node`节点信息: `java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued()`: ```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndChecknIterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } ``` `acquireQueued()`这个方法会先判断当前传入的`Node`对应的前置节点是否为`head`,如果是则尝试加锁。加锁成功过则将当前节点设置为`head`节点,然后空置之前的`head`节点,方便后续被垃圾回收掉。 如果加锁失败或者`Node`的前置节点不是`head`节点,就会通过`shouldParkAfterFailedAcquire`方法 将`head`节点的`waitStatus`变为了`SIGNAL=-1`,最后执行`parkAndChecknIterrupt`方法,调用`LockSupport.park()`挂起当前线程。 此时`AQS`中的数据如下图: ![image.png](https://i.loli.net/2020/04/29/aQAGnsLc6VPB1yp.png) 此时**线程二**就静静的待在`AQS`的等待队列里面了,等着其他线程释放锁来唤醒它。 #### 线程三抢占锁失败 看完了**线程二**抢占锁失败的分析,那么再来分析**线程三**抢占锁失败就很简单了,先看看`addWaiter(Node mode)`方法: ```java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } ``` 此时等待队列的`tail`节点指向**线程二**,进入`if`逻辑后,通过`CAS`指令将`tail`节点重新指向**线程三**。接着**线程三**调用`enq()`方法执行入队操作,和上面**线程二**执行方式是一致的,入队后会修改**线程二**对应的`Node`中的`waitStatus=SIGNAL`。最后**线程三**也会被挂起。此时等待队列的数据如图: ![image.png](https://i.loli.net/2020/04/29/Z2BFymNtgbsV9YI.png) #### 线程一释放锁 现在来分析下释放锁的过程,首先是**线程一**释放锁,释放锁后会唤醒`head`节点的后置节点,也就是我们现在的**线程二**,具体操作流程如下: ![image.png](https://i.loli.net/2020/05/01/aY1c2ZevXD4sSiT.png) 执行完后等待队列数据如下: ![image.png](https://i.loli.net/2020/04/29/zCt5kZIR2suhfLr.png) 此时**线程二**已经被唤醒,继续尝试获取锁,如果获取锁失败,则会继续被挂起。如果获取锁成功,则`AQS`中数据如图: ![image.png](https://i.loli.net/2020/05/01/53J8HkmVf21OYsc.png) 接着还是一步步拆解来看,先看看**线程一**释放锁的代码: `java.util.concurrent.locks.AbstractQueuedSynchronizer.release()` ```java public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` 这里首先会执行`tryRelease()`方法,这个方法具体实现在`ReentrantLock`中,如果`tryRelease`执行成功,则继续判断`head`节点的`waitStatus`是否为0,前面我们已经看到过,`head`的`waitStatue`为`SIGNAL(-1)`,这里就会执行`unparkSuccessor()`方法来唤醒`head`的后置节点,也就是我们上面图中**线程二**对应的`Node`节点。 此时看`ReentrantLock.tryRelease()`中的具体实现: ```java protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ``` 执行完`ReentrantLock.tryRelease()`后,`state`被设置成0,Lock对象的独占锁被设置为null。此时看下`AQS`中的数据: ![image.png](https://i.loli.net/2020/04/29/3M7pzOmYfRgyJd4.png) 接着执行`java.util.concurrent.locks.AbstractQueuedSynchronizer.unparkSuccessor()`方法,唤醒`head`的后置节点: ```java private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } ``` 这里主要是将`head`节点的`waitStatus`设置为0,然后解除`head`节点`next`的指向,使`head`节点空置,等待着被垃圾回收。 此时重新将`head`指针指向**线程二**对应的`Node`节点,且使用`LockSupport.unpark`方法来唤醒**线程二**。 被唤醒的**线程二**会接着尝试获取锁,用`CAS`指令修改`state`数据。 执行完成后可以查看`AQS`中数据: ![image.png](https://i.loli.net/2020/04/29/zCt5kZIR2suhfLr.png) 此时**线程二**被唤醒,**线程二**接着之前被`park`的地方继续执行,继续执行`acquireQueued()`方法。 #### 线程二唤醒继续加锁 ```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ``` 此时**线程二**被唤醒,继续执行`for`循环,判断**线程二**的前置节点是否为`head`,如果是则继续使用`tryAcquire()`方法来尝试获取锁,其实就是使用`CAS`操作来修改`state`值,如果修改成功则代表获取锁成功。接着将**线程二**设置为`head`节点,然后空置之前的`head`节点数据,被空置的节点数据等着被**垃圾回收**。 此时线程三获取锁成功,`AQS`中队列数据如下: ![image.png](https://i.loli.net/2020/04/29/Kkr8yhRHEFOMagX.png) 等待队列中的数据都等待着被垃圾回收。 #### 线程二释放锁/线程三加锁 当**线程二**释放锁时,会唤醒被挂起的**线程三**,流程和上面大致相同,被唤醒的**线程三**会再次尝试加锁,具体代码可以参考上面内容。具体流程图如下: ![image.png](https://i.loli.net/2020/05/01/VEMB27futsnTCkw.png) 此时`AQS`中队列数据如图: ![image.png](https://i.loli.net/2020/05/01/axYI1evgDcsod6C.png) ### 公平锁实现原理 上面所有的加锁场景都是基于**非公平锁**来实现的,**非公平锁**是`ReentrantLock`的默认实现,那我们接着来看一下**公平锁**的实现原理,这里先用一张图来解释**公平锁**和**非公平锁**的区别: **非公平锁**执行流程: ![image.png](https://i.loli.net/2020/05/01/dI7gvPyRKoaOFjU.png) 这里我们还是用之前的线程模型来举例子,当**线程二**释放锁的时候,唤醒被挂起的**线程三**,**线程三**执行`tryAcquire()`方法使用`CAS`操作来尝试修改`state`值,如果此时又来了一个**线程四**也来执行加锁操作,同样会执行`tryAcquire()`方法。 这种情况就会出现竞争,**线程四**如果获取锁成功,**线程三**仍然需要待在等待队列中被挂起。这就是所谓的**非公平锁**,**线程三**辛辛苦苦排队等到自己获取锁,却眼巴巴的看到**线程四**插队获取到了锁。 **公平锁**执行流程: ![image.png](https://i.loli.net/2020/05/01/48fDGWJl3EdO6RB.png) 公平锁在加锁的时候,会先判断`AQS`等待队列中是存在节点,如果存在节点则会直接入队等待,具体代码如下. 公平锁在获取锁是也是首先会执行`acquire()`方法,只不过公平锁单独实现了`tryAcquire()`方法: `#java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()`: ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` 这里会执行`ReentrantLock`中公平锁的`tryAcquire()`方法 `#java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire()`: ```java static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } ``` 这里会先判断`state`值,如果不为0且获取锁的线程不是当前线程,直接返回false代表获取锁失败,被加入等待队列。如果是当前线程则可重入获取锁。 如果`state=0`则代表此时没有线程持有锁,执行`hasQueuedPredecessors()`判断`AQS`等待队列中是否有元素存在,如果存在其他等待线程,那么自己也会加入到等待队列尾部,做到真正的先来后到,有序加锁。具体代码如下: `#java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors()`: ```java public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ``` 这段代码很有意思,返回`false`代表队列中没有节点或者仅有一个节点是当前线程创建的节点。返回`true`则代表队列中存在等待节点,当前线程需要入队等待。 ![image.png](https://i.loli.net/2020/04/30/K1kJPLZj8Y2fwXt.png) 先判断`head`是否等于`tail`,如果队列中只有一个`Node`节点,那么`head`会等于`tail`,接着判断`head`的后置节点,这里肯定会是`null`,如果此`Node`节点对应的线程和当前的线程是同一个线程,那么则会返回`false`,代表没有等待节点或者等待节点就是当前线程创建的`Node`节点。此时当前线程会尝试获取锁。 如果`head`和`tail`不相等,说明队列中有等待线程创建的节点,此时直接返回`true`,如果只有一个节点,而此节点的线程和当前线程不一致,也会返回`true` **非公平锁**和**公平锁**的区别: **非公平锁**性能高于**公平锁**性能。**非公平锁**可以减少`CPU`唤醒线程的开销,整体的吞吐效率会高点,`CPU`也不必取唤醒所有线程,会减少唤起线程的数量 **非公平锁**性能虽然优于**公平锁**,但是会存在导致**线程饥饿**的情况。在最坏的情况下,可能存在某个线程**一直获取不到锁**。不过相比性能而言,饥饿问题可以暂时忽略,这可能就是`ReentrantLock`默认创建非公平锁的原因之一了。 ### Condition实现原理 #### Condition简介 上面已经介绍了`AQS`所提供的核心功能,当然它还有很多其他的特性,这里我们来继续说下`Condition`这个组件。 `Condition`是在`java 1.5`中才出现的,它用来替代传统的`Object`的`wait()`、`notify()`实现线程间的协作,相比使用`Object`的`wait()`、`notify()`,使用`Condition`中的`await()`、`signal()`这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用`Condition` 其中`AbstractQueueSynchronizer`中实现了`Condition`中的方法,主要对外提供`awaite(Object.wait())`和`signal(Object.notify())`调用。 #### Condition Demo示例 使用示例代码: ```java /** * ReentrantLock 实现源码学习 * @author 一枝花算不算浪漫 * @date 2020/4/28 7:20 */ public class ReentrantLockDemo { static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) { Condition condition = lock.newCondition(); new Thread(() -> { lock.lock(); try { System.out.println("线程一加锁成功"); System.out.println("线程一执行await被挂起"); condition.await(); System.out.println("线程一被唤醒成功"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println("线程一释放锁成功"); } }).start(); new Thread(() -> { lock.lock(); try { System.out.println("线程二加锁成功"); condition.signal(); System.out.println("线程二唤醒线程一"); } finally { lock.unlock(); System.out.println("线程二释放锁成功"); } }).start(); } } ``` 执行结果如下图: ![image.png](https://i.loli.net/2020/04/30/4dcnhCUOj6SxtkL.png) 这里**线程一**先获取锁,然后使用`await()`方法挂起当前线程并**释放锁**,**线程二**获取锁后使用`signal`唤醒**线程一**。 #### Condition实现原理图解 我们还是用上面的`demo`作为实例,执行的流程如下: ![image.png](https://i.loli.net/2020/04/30/rcShFVXZGsHDoE4.png) **线程一**执行`await()`方法: 先看下具体的代码实现,`#java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject.await()`: ```java public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } ``` `await()`方法中首先调用`addConditionWaiter()`将当前线程加入到`Condition`队列中。 执行完后我们可以看下`Condition`队列中的数据: ![image.png](https://i.loli.net/2020/04/30/BIT4DWsjdqcGrpg.png) 具体实现代码为: ```java private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } ``` 这里会用当前线程创建一个`Node`节点,`waitStatus`为`CONDITION`。接着会释放该节点的锁,调用之前解析过的`release()`方法,释放锁后此时会唤醒被挂起的**线程二**,**线程二**会继续尝试获取锁。 接着调用`isOnSyncQueue()`方法判断当前节点是否为`Condition`队列中的头部节点,如果是则调用`LockSupport.park(this)`挂起`Condition`中当前线程。此时**线程一**被挂起,**线程二**获取锁成功。 具体流程如下图: ![image.png](https://i.loli.net/2020/04/30/pxlzK9kZGMYtcUF.png) **线程二**执行`signal()`方法: 首先我们考虑下**线程二**已经获取到锁,此时`AQS`等待队列中已经没有了数据。 接着就来看看**线程二**唤醒**线程一**的具体执行流程: ```java public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } ``` 先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。 接着调用`doSignal()`方法来唤醒线程。 ```java private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` 这里先从`transferForSignal()`方法来看,通过上面的分析我们知道`Condition`队列中只有线程一创建的一个`Node`节点,且`waitStatue`为`CONDITION`,先通过`CAS`修改当前节点`waitStatus`为0,然后执行`enq()`方法将当前线程加入到等待队列中,并返回当前线程的前置节点。 加入等待队列的代码在上面也已经分析过,此时等待队列中数据如下图: ![image.png](https://i.loli.net/2020/05/01/x2Aes5gnPVSYt3q.png) 接着开始通过`CAS`修改当前节点的前置节点`waitStatus`为`SIGNAL`,并且唤醒当前线程。此时`AQS`中等待队列数据为: ![image.png](https://i.loli.net/2020/05/01/AU1oSKbh5sePCiH.png) **线程一**被唤醒后,继续执行`await()`方法中的while循环。 ```java public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } ``` 因为此时线程一的`waitStatus`已经被修改为0,所以执行`isOnSyncQueue()`方法会返回`false`。跳出`while`循环。 接着执行`acquireQueued()`方法,这里之前也有讲过,尝试重新获取锁,如果获取锁失败继续会被挂起。直到另外线程释放锁才被唤醒。 ```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ``` 此时**线程一**的流程都已经分析完了,等**线程二**释放锁后,**线程一**会继续重试获取锁,流程到此终结。 #### Condition总结 我们总结下Condition和wait/notify的比较: - Condition可以精准的对多个不同条件进行控制,wait/notify只能和synchronized关键字一起使用,并且只能唤醒一个或者全部的等待队列; - Condition需要使用Lock进行控制,使用的时候要注意lock()后及时的unlock(),Condition有类似于await的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是park/unpark的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是wait/notify会产生先唤醒再挂起的死锁。 ### 总结 这里用了一步一图的方式结合三个线程依次加锁/释放锁来展示了`ReentrantLock`的实现方式和实现原理,而`ReentrantLock`底层就是基于`AQS`实现的,所以我们也对`AQS`有了深刻的理解。 另外还介绍了**公平锁**与**非公平锁**的实现原理,`Condition`的实现原理,基本上都是使用**源码+绘图**的讲解方式,尽量让大家更容易去理解。 参考资料: 1. 打通 Java 任督二脉 —— 并发数据结构的基石 https://juejin.im/post/5c11d6376fb9a049e82b6253 2. Java并发之AQS详解https://www.cnblogs.com/waterystone/p/4920797.html ![image-20210716163352584](https://tva1.sinaimg.cn/large/008i3skNly1gsivkbczxoj31l20t8al5.jpg) ![image-20210716163433337](https://tva1.sinaimg.cn/large/008i3skNly1gsivl4khz9j31d60h8mze.jpg)