要深入理解 AQS 的源码确实不容易,因为其中有很多情况要考虑,甚至源码中的每一个 if 语句可能就包含着一种或几种情况,尤其是在共享锁部分;另外还有一些因素也要考虑,比如多线程调用、CPU 分配的时间片、虚假唤醒等。
本文主要内容为 AQS 独占锁源码的深入解析,所以在本文中暂不考虑共享锁 和 Condition
的部分。
Introduction AQS 全称为 AbstractQueuedSynchronizer
,位于 java.util.concurrent.locks
包下,继承 AbstractOwnableSynchronizer
,是一个抽象类。
AQS 提供了一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等),是大多数依赖单个原子 int 值来表示状态的同步器的基础。这个单个原子 int 值在源码中是一个名为 state
的字段,由 volatile 修饰,子类必须重写更改此状态值的 protected 方法,并定义该状态在获取或释放此对象的含义。也因此,AQS 中的其他方法会执行所有排队和阻塞机制。
另外因为要考虑到多线程的因素,所以只有使用 getState
、setState
和 compareAndSetState
方法操作的状态值才会在同步方面进行跟踪。换言之,子类重写的 protected 方法的具体实现中需要使用这些方法来检查 和/或 修改状态值。
在使用上,子类应将其定义为非 public 的内部帮助类,用于实现其封闭类的同步属性,然后具体的使用都委托给该帮助类来实现。
AQS 支持独占模式和共享模式,默认为独占模式,子类可以根据需求来选择实现两种模式之一或两者。具体来说,只需根据适用情况重写以下方法即可(默认实现都会抛出 UnsupportedOperationException
):
tryAcquire
(独占模式获取)
tryRelease
(独占模式释放)
tryAcquireShared
(共享模式获取)
tryReleaseShared
(共享模式释放)
isHeldExclusively
(供 ConditionObject
方法内部调用)
以上方法的实现必须是内部线程安全的,并且应该是简短不阻塞的。
在不同模式下等待的线程共享同一个 FIFO 队列,称为同步队列(sync queue)。
AQS 中还定义了一个 ConditionObject
类,它可以被支持独占模式的子类用作 Condition
实现。AQS 不提供创建 Condition
的方法,ConditionObject
的行为取决于其同步器实现的语义。
AQS 类的序列化仅存储底层原子整数维护的状态,因此反序列化的对象将具有空的线程队列。需要序列化的子类需要定义 readObject
方法,该方法在反序列化时会将其恢复到已知的初始状态。
也可以提供公平的 FIFO 获取顺序,实现为:当 hasQueuedPredecessors
方法返回 true 时,定义 tryAcquire
方法返回 false
。
Demo Exclusive Mode 独占模式需要实现 tryAcquire
、tryRelease
以及 isHeldExclusively
(根据需要)方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 static class Mutex implements Lock , java .io .Serializable { private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean isHeldExclusively () { return getState() == 1 ; } @Override public boolean tryAcquire (int acquires) { assert acquires == 1 ; if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int releases) { assert releases == 1 ; if (getState() == 0 ) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null ); setState(0 ); return true ; } Condition newCondition () { return new ConditionObject(); } private void readObject (ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0 ); } } private final Sync sync = new Sync(); @Override public void lock () { sync.acquire(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } public boolean isLocked () { return sync.isHeldExclusively(); } public boolean hasQueuedThreads () { return sync.hasQueuedThreads(); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(timeout)); } }
Shared Mode 共享模式需要实现 tryAcquireShared
和 tryReleaseShared
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class SharedModeDemo { static class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled () { return getState() != 0 ; } @Override protected int tryAcquireShared (int ignore) { return isSignalled() ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int ignore) { setState(1 ); return true ; } } private final Sync sync = new Sync(); public boolean isSignalled () { return sync.isSignalled(); } public void signal () { sync.releaseShared(1 ); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } } public static void main (String[] args) throws InterruptedException { BooleanLatch latch = new BooleanLatch(); new Thread(() -> { System.out.println("thread [" + Thread.currentThread().getName() + "] sleep 5s" ); try { Thread.sleep(5000L ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread [" + Thread.currentThread().getName() + "] sleep end, signal thread [main]" ); latch.signal(); }, "thread1" ).start(); System.out.println("thread [main] await" ); latch.await(); System.out.println("thread [main] end" ); } }
AbstractOwnableSynchronizer AbstractOwnableSynchronizer
是 AQS 继承的类,用来记录跟踪拥有独占同步器的线程,为创建需要所有权概念的锁和相关同步器提供基础。该类的子类或一些工具可以通过维护 exclusiveOwnerThread
字段值来帮助用户控制和监控访问并提供诊断。
该类中只有一个字段 exclusiveOwnerThread
,从字段名中可以看到,该字段可以记录持有独占锁的线程;此外,还提供了该字段的 set 和 get 方法。
Node Node 是等待队列(wait queue)的节点类,等待队列包括同步队列(sync queue)和条件队列(condition queue)。
等待队列是 “CLH” 锁队列的变体(之前详细介绍过 “CLH” 锁,see AQS 基础–多图详解 CLH 锁的原理与实现 ),“CLH” 锁通常用于自旋锁,在 AQS 中将它们用作阻塞同步器,但使用相同的基本策略:在其节点的前驱中保存有关线程的一些控制信息。
每个节点中的 waitStatus
字段可以表示线程是否应该阻塞。节点在其前驱 release 时会收到信号。队列中的每个节点都充当一个特定的通知监视器,持有一个等待线程。如果线程是队列中的第一个,则可以尝试去获取锁,但并不保证获取成功,这只是给予它竞争的权利而已,因此当前已释放的竞争者线程可能也需要继续重新等待。
在该类中,prev 属性在原始 “CLH” 锁中其实并没有用到,在这里主要是用于处理状态为 CANCELLED 的节点;而 next 属性则用来实现阻塞机制,换句话说,next 指针其实是一种优化,通常不需要向后扫描(具体表现在 unparkSuccessor
方法中,先看 next 是否符合条件,不符合再遍历,大多数情况下可能都会符合)。
“CLH” 队列需要一个虚拟头节点来开始。但是我们不在构建时就创建它,因为如果不存在竞争,这就是在做无用功。相反,在第一次竞争时构造节点并设置头尾指针(延迟初始化)。
在 Conditions
中等待的线程使用相同的节点,但使用额外的链接。Conditions
链接队列中的节点仅在独占持有时才会被访问,所以不需要考虑并发情况。在 await
时,一个节点被插入到 condition queue 中;signal
时,节点被转移到 sync queue。
Node 中的 nextWaiter
字段可以表明当前节点正在独占模式还是共享模式下等待。
Node 节点有以下等待状态(waitStatus):
等待状态
值
描述
具体描述
-
0
初始值
-
CANCELLED
1
表明线程已取消
由于超时或中断,该节点被取消,此后节点永远不会离开这个状态,而且取消节点的线程永远不会再次阻塞
SIGNAL
-1
表明后继线程需要唤醒
该节点的后继节点被(或即将被)阻塞(LockSupport.park
方法),因此当前节点在释放(release
方法)或取消(cancelAcquire
方法)时必须唤醒(unparkSuccessor
方法)其后继节点。为了避免竞争,acquire 方法中必须首先表明它们需要一个信号(shouldParkAfterFailedAcquire
方法),然后重试原子获取,然后在获取失败时阻塞
CONDITION
-2
表明线程在 condition queue 中等待
该节点当前在 condition queue 中,直到转移后,它才会用作 sync queue 的节点,此时状态将设置为 0(此处使用此值与该字段的其他用途无关,但可以简化机制)
PROPAGATE
-3
表明下一次 acquireShared
应无条件传播
releaseShared
方法应该传播到其他节点,该状态在 doReleaseShared
方法中设置(仅适用于头节点)以确保传播继续,即使其它操作已经介入
Node 类源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
在独占模式中,nextWaiter
的值为 null
或下一个在 condition queue 中等待的节点(如果有 condition queue 的话。因本文暂不分析其源码,所以在本文中可以认为该值就是 null
)。
另外在独占模式中,Node 的 waitStatus
只会用到 0 / CANCELLED / SIGNAL 这三种状态。
Sync Queue sync queue 是同步队列,代表所有等待获取锁的线程集合 ,是个有虚拟头节点(延迟初始化,下文中称其为 dummyHead)的双向链表。head
指向头节点(即 dummyHead),tail
指向尾节点,prev
指向上一个节点,next
指向下一个节点。
对于 head
中 thread 属性可以有另一种理解:sync queue 中的每个节点内部都会封装一个线程,唯独 head
节点中的为 null
,其实换个角度,也可以认为 head
节点中也封装了线程,只不过此时该线程获取到了锁,正在执行逻辑。
Fields 4 个重要的字段属性:head
/ tail
/ state
/ spinForTimeoutThreshold
5 个用于支持 CAS 算法的 Unsafe 类操作的字段:state
/ head
/ tail
/ waitStatus
/ next
(略)
spinForTimeoutThreshold
字段相当于一个阈值,在一些提供等待时间的方法中会使用该值来判断,当等待时间小于该值(即超时时间非常短)时直接自旋,这样可以提高程序的响应能力。
Source Code 了解了前文的前置知识,接下来终于要进入到本文的重点,独占锁的源码解析。
acquire 以独占模式获取,忽略中断。通过至少调用一次 tryAcquire
方法来实现,成功则返回,失败则线程排队。线程可能会反复阻塞和解除阻塞,直到调用 tryAcquire
方法获取成功。
acquire
方法执行逻辑:
tryAcquire
:(需要子类实现)尝试以独占模式获取,获取成功则返回,失败则执行步骤 2
addWaiter
:为获取失败的线程以独占模式(Node.EXCLUSIVE
)创建节点并入 sync queue,执行步骤 3
acquireQueued
:入队后,队列中的线程以独占不间断模式获取。这期间可能需要将某些已排队的线程挂起,直到收到信号后再次执行
selfInterrupt
:获取到锁的线程如果在等待时被中断,在这里中断它(线程在排队等待时忽略中断,但是会记录是否发生过中断)
Diagram of acquire 在详细解析 acquire
源码之前,先在上帝视角给大家提供一张该方法的执行流程图,方便对执行过程有个详细了解,也方便在下面的源码解析过程中,看到某个方法时,知道程序当前处于什么地方,不至于在复杂的源码中迷失。
tryAcquire 尝试以独占模式获取。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException
。此方法由执行 acquire
的线程调用,返回值代表获取成功(return true
)或失败(return false
)。
因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getState
、setState
、compareAndSetState
等方法检查 和/或 修改同步状态 state
。
addWaiter 为当前线程以给定模式(Node.EXCLUSIVE
或 Node.SHARED
)创建节点并入 sync queue,返回新入队的节点。
该方法的操作就是正常的节点入队操作,具体逻辑没什么好说的,有几个注意的点:
如果队列为空,会先创建一个虚拟头节点,再将当前线程节点插入队尾,即采用前文中提到的延迟初始化 head
新节点入 sync queue 操作:先设置 node.prev = tail
,再 CAS 设置 tail = node
,最后设置 tail.next = node
(注意这几步不是一个原子操作 )
此外,我们可以从该方法的实现中学到一个重要思想:fast path,即先简单尝试一下,成功就返回,失败再自旋循环。因为其实很多时候,直接操作就能解决大部分问题。
注意,在入队操作过程中,当某线程创建了新节点后,执行 compareAndSetTail(pred, node)
成功,但是 pred.next = node
还没有开始执行时(或刚好执行到此 CPU 给该线程分配的时间片耗尽),可能会出现下面这种现象:
在高并发的场景下,甚至可能会出现下面这种现象:
此时,如果从 head
往后遍历队列,会出现一个严重的问题:遍历不到新入队的节点!而从 tail
往前遍历是没有这个问题的。究其原因,就是因为新节点入队操作并不是一个原子操作。这也是 AQS 中遍历队列都采用从 tail
往前遍历的原因。
enq 普通的节点入队操作,主要用到 CAS + 自旋 ,如果队列为空,会先创建一个虚拟头节点。最终返回新入队节点的前驱节点。
acquireQueued 该方法情况比较复杂,需要详细分析(这里再贴一下前文的执行流程图)。
acquireQueued
方法执行逻辑:
如果当前线程是排队的第一个线程,执行 tryAcquire
方法尝试以独占模式获取
如果获取成功,重新设置队列头节点 (setHead
) 后退出
如果当前线程不是排队的第一个线程 或 当前线程是排队的第一个线程但获取失败,将前驱节点状态置为 SIGNAL (shouldParkAfterFailedAcquire
)
设置成功后,挂起线程,等待唤醒 (parkAndCheckInterrupt
)
线程被唤醒后回到步骤 1 重复上述步骤,直到获取成功
如果上述步骤执行过程中出现异常导致获取失败,取消正在进行的获取尝试(cancelAcquire
)
还有几点需要注意:
interrupted
字段用来记录线程在等待过程中是否发生过中断,该字段也是此方法的最终返回值
线程在排队等待时是忽略中断的,无论线程在等待过程中是否发生过中断,都需要获取到锁后才能返回。返回后再根据 interrupted
字段的值来决定是否中断
setHead 将 head
指向当前节点,内部属性 thread
和 prev
置为 null
。
其实该方法就是相当于,获取成功后,把该节点置为新的 dummyHead。但注意,此时旧的 dummyHead 还存在,且 oldDummyHead.next = newDummyHead,setHead
执行结束后才会清除旧的 dummyHead,将 oldDummyHead.next 指向 null
(这一步在 acquireQueued
中 p.next = null
)。
思考一下,这里设置头节点为什么不用 compareAndSetHead
,而是直接 head = node
?
因为这是在独占锁情况下,获取到锁的线程只会有一个,因此该方法不会存在并发调用的情况,可以放心大胆的使用 head = node
。
shouldParkAfterFailedAcquire 该方法逻辑不难理解,方法名也已说明了用意:should park after failed acquire(获取失败后应该挂起吗?)
而该方法唯一 return true
的条件就是 pred.waitStatus = Node.SIGNAL
,其它时候都会返回 false
。这也说明,获取失败后应该挂起线程的条件是:该节点的前置节点状态为 SIGNAL。否则,我们应该把前置节点的状态设置为 SIGNAL。(这里就同前文提到的 “CLH” 锁呼应上了,即在其节点的前驱中保存有关线程的一些控制信息)
特别注意: 当最终结果返回时,程序此时还处于 acquireQueued
方法的 for (;;)
中。因此,当最终结果返回 false
时,程序会再次回到上述 acquireQueued
方法执行逻辑的步骤 1 去执行。或者这里也可以理解为一个优化,相当于变相又给了线程一个去尝试获取的机会,也许这次就成功了。
parkAndCheckInterrupt node 的前置节点状态设置为 SIGNAL 了,挂起线程,直到收到释放信号或中断唤醒它。
特别注意: 假如此刻它被唤醒,它需要接着去执行。别忘了,程序此时还处于 acquireQueued
方法的 for (;;)
中,所以程序会再次回到上述 acquireQueued
方法执行逻辑的步骤 1 去执行,直到它获取成功。
思考这个问题:该方法最终返回值是线程是否中断,为什么不用 Thread.currentThread().isInterrupted()
方法?
在 Thread 源码解析 中说过,这两个方法的区别是:Thread.interrupted()
方法除了会返回线程是否中断外,还会重置线程的中断状态。那么就有个新问题:返回值是线程是否中断,我只需要知道是否中断就行了,为什么还要清除中断状态?
这是因为,中断后,如果不清除中断状态,下次 park 是不生效的。假如该线程下次仍然没有获取到锁,就 park 不住了(关于这个结论,做个实验就知道了。如果不想做实验,我也帮你做了,see The relationship between Interrupt and Park )。因此这里需要一个又能重置线程中断状态又能返回线程是否中断的方法,所以必须使用 Thread.interrupted()
方法。
cancelAcquire 如果在获取过程中出现异常导致获取失败,取消正在进行的获取尝试。
如何取消呢?是不是就是将此节点从队列中彻底删除?
其实不是的,该方法对应的情况比较多,基本上最终就是修改了节点的 waitStatus
以及 next
指针,将节点彻底从队列中删除是依赖后续的遍历 / setHead
/ p.next = null
操作。具体可见上篇文章 AQS-cancelAcquire 方法源码解析 。
unparkSuccessor 唤醒 node 的后继节点。在前文 cancelAcquire
方法中,如果 node 不是 tail && pred 是 head 则会调用此方法;另外在释放锁时也会调用此方法。
unparkSuccessor
方法执行逻辑:
如果 node.waitStatus < 0
,即 node 需要给它的后继节点发信号或者说 node 的后继节点需要被唤醒,将 node 的 waitStatus
置为 0(清除预期的信号)
寻找要 unpark 的后继节点,如果找到,则唤醒它。具体逻辑为:先看 node 的 next
是否符合条件,如果符合就唤醒;如果不符合就从 tail
往前遍历寻找,如果找到就唤醒(这里也同前文提到的 Node 类的 next 指针的作用呼应上了)
node.waitStatus < 0
的判断主要是为了清除 node 的 waitStatus
,也就是说,此时,无论 node 后的节点是什么状态,无论 node 的后继节点是否被唤醒,只要调用完该方法,node 的状态肯定为 0 或 CANCELLED(node.waitStatus >= 0,即清除了预期的信号)。
selfInterrupt 当前线程在等待过程中被中断,中断当前线程。
这里可能有人会有疑问,当前线程已经在等待过程中被中断,为什么又要中断一次?
因为 parkAndCheckInterrupt
方法里 check interrupt 时用的是 Thread.interrupted()
方法,前文提到,该方法除了会返回当前线程是否被中断外,还会重置中断状态。因此,必须在这里补上这次中断。
release 该方法用来在独占模式下释放锁。该方法返回值和 tryRelease
方法的返回值一致。
release
方法执行逻辑:
调用 tryRelease
方法尝试设置同步状态 state
,如果 tryRelease
返回 false
,则 release
返回 false
,否则执行步骤 2
这两种情况下不做操作,直接返回 true
:当 head
节点为空,说明队列为空;当 head
节点不为空且 waitStatus
为 0,说明队列中此时只有一个 head
节点或有其它线程正在执行 unparkSuccessor
方法。否则去调用 unparkSuccessor
方法去唤醒 head
的后继节点,然后返回 true
当 head
节点的 waitStatus
为 0,此时可能有其它线程正在执行 unparkSuccessor
方法,是不是觉得不可思议,独占锁模式下竟然还会有两个线程同时调用 unparkSuccessor
方法?
这是有可能的。前文中说过,独占锁模式下,unparkSuccessor
方法可能会有两处调用,一处是在 cancelAcquire
方法中,一处是在 release
方法中。假设这种情况:
A 线程获取到锁后执行逻辑(还未调用 release
方法)
B、C 线程获取不到锁,相继入队
B 线程执行 acquireQueued
方法结果抛异常走到了 cancelAcquire
方法中,执行 compareAndSetWaitStatus(node, ws, 0)
这句成功,然后在满足一系列条件后调用 unparkSuccessor
方法去唤醒 C 线程
A 线程执行完逻辑后调用 release
方法,此时发现 head
的 waitStatus
等于 0,其实意味着 A 要唤醒后继节点的工作被 B 给做了,那 A 什么都不用做直接 return true
即可
tryRelease tryRelease
方法主要是尝试设置同步状态 state
。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException
。此方法始终由执行 release
的线程调用。返回值 true
表示如果此对象现在处于完全 release 状态,任何等待线程都可以尝试获取;否则返回 false
。
因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getState
、setState
、compareAndSetState
等方法检查 和/或 修改同步状态 state
。
unparkSuccessor
的逻辑前文已介绍,此处不再赘述。另外,别忘了唤醒后还是会继续去获取锁的。
tryAcquireNanos 大体逻辑同 acquire
方法,如注释所说,和 acquire
方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquire
方法,在这其中做了超时和中断判断。成功则返回 true
,超时则返回 false
,中断则抛出 InterruptedException
。
doAcquireNanos 该方法相当于把前文中说的 addWaiter(Node.EXCLUSIVE)
方法揉进了 acquireQueued()
方法里,并做了超时和中断判断。注意:只有返回 true
时才不会调用 cancelAcquire
方法。
而且该方法在挂起线程之前做了个判断,当最长等待时间大于 spinForTimeoutThreshold
时,才会挂起。
spinForTimeoutThreshold
字段的作用前文说过,这里再来复述一遍:该值相当于一个阈值,在一些提供等待时间的操作中会使用该值来判断,当等待时间小于该值(即超时时间非常短)时直接自旋,这样可以提高程序的响应能力。
acquireInterruptibly 大体逻辑同 acquire
方法,如注释所说,和 acquire
方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquire
方法,在这其中做了中断判断。中断则抛出 InterruptedException
。
doAcquireInterruptibly 该方法相当于把前文中说的 addWaiter(Node.EXCLUSIVE)
方法揉进了 acquireQueued
方法里,并做了中断判断。
至此,AQS 独占锁 源码就解析完了,下篇文章会继续解析共享锁 源码。个人认为共享锁对应的情况更复杂一些,所以也更难理解一些。