本文主要内容为 AQS 共享锁源码的深入解析。其中有一些方法在 AQS 独占锁源码解析中提到过,所以不会再提,因此也节省了不少篇幅。
Introduction
顾名思义,共享锁和独占锁的区别是,独占锁同一时刻只能被一条线程持有,而共享锁同一时刻可以被多条线程共同持有。
共享锁和独占锁的实现方法可简单对应如下:
独占锁 | 共享锁 |
---|---|
acquire(int arg) | acquireShared(int arg) |
tryAcquire(int arg) | tryAcquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
Node
在共享模式中,nextWaiter
的值永远为一个特殊值 SHARED
。
另外在共享模式中,Node 的 waitStatus
会用到 0 / CANCELLED / SIGNAL / PROPAGATE 四种状态。
Source Code
接下来进入到本文的重点,共享锁的源码解析。
acquireShared
以共享模式获取,忽略中断。通过首先至少调用一次 tryAcquireShared
方法来实现,成功则返回,失败则线程排队,线程可能会反复阻塞和解除阻塞,直到调用 tryAcquireShared
方法成功。
acquireShared
方法执行逻辑:
tryAcquireShared
:(需要子类实现)尝试以共享模式获取。获取成功则返回,失败则执行步骤 2doAcquireShared
:以共享不间断模式获取。仔细看该方法,几乎就是将独占锁acquire
方法的后三步执行逻辑揉在了一起:- 为获取失败的线程以共享模式(
Node.SHARED
)创建节点并入同步队列 - 入队后,队列中的线程以共享不间断模式获取。这期间可能需要将某些已排队的线程挂起,直到收到信号后再次执行
- 获取到锁的线程如果在等待时被中断,则获取到锁后中断它(线程在排队等待时忽略中断,但是会记录是否发生过中断)
- 为获取失败的线程以共享模式(
tryAcquireShared
尝试以共享模式获取。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException
。
此方法由执行获取的线程调用,最终返回一个 int 值,分别代表 3 种情况:
- 负值:获取失败
- 0:获取成功但后续获取失败
- 正值:获取成功且后续获取也可能成功
因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getState
、setState
、compareAndSetState
等方法检查 和/或 修改同步状态 state
。
doAcquireShared
前文提到,该方法几乎就是将独占锁 acquire
方法的后三步执行逻辑揉在了一起,唯二不一样的地方是:
addWaiter
时以共享模式(Node.SHARED
)创建节点并入同步队列- 获取成功后,独占模式调用的是
setHead(Node node)
方法,而共享模式调用的是setHeadAndPropagate(node, r)
方法。顾名思义,该方法不止会调用setHead(Node node)
方法,还会在满足某些条件的情况下继续唤醒同步队列中排队的节点。原因正如前文所说,共享锁同一时刻可以被多条线程共同持有
setHeadAndPropagate
顾名思义,该方法做了两件事:
- 设置新的 head
- 在满足某些条件的情况下继续传播(唤醒同步队列中的后继节点)
setHeadAndPropagate
方法执行逻辑:
- 记录老的 head
- 将自己设置为新 head
- 判断是否应该继续传播,这个 if 条件不是很好理解,一点一点看,要满足条件,则:
propagate > 0
:共享锁还可以继续由其它线程获取propagate <= 0 && h == null
:共享锁不能再被其它线程获取,且老的头节点为 null。这里其实就不是很好理解了,有个问题:1.老的头节点什么情况下会为 null?2.老的头节点为 null 为什么还要继续去传播?propagate <= 0 && h != null && h.waitStatus < 0
:共享锁不能再被其它线程获取,且老的 head.waitStatus < 0,说明老的头节点表明需要继续传播propagate <= 0 && h != null && h.waitStatus >= 0 && (h = head) == null
:共享锁不能再被其它线程获取,且老的头节点表明不需要继续传播,新的头节点为 null。这个条件应该是不会成立的,因为当 head 被初始化之后,肯定不可能为 null,或许可以理解这个判断是为防止下一个条件的 NPE 常规检查吧..(如果有不同见解,欢迎指出)propagate <= 0 && h != null && h.waitStatus >= 0 && (h = head) != null && h.waitStatus < 0
:共享锁不能再被其它线程获取,且老的头节点表明不需要继续传播,但当前调用者线程表明需要继续传播
- 如果满足应该继续传播的条件,且下一个节点为 null 或在共享模式下等待,则调用
doReleaseShared
方法继续传播
这里我们分析下上面对于 propagate <= 0 && h == null
条件提到的两个问题:
一、老的 head 什么情况下会为 null?
- 假设当前同步队列为 head <=> node1(t1) <=> node2(t2)
- 此时有个线程调用
releaseShared
方法唤醒了 t1 - t1 在
doAcquireShared
方法中执行tryAcquireShared
方法获取成功,执行到setHeadAndPropagate
方法中记录老的 head 并调用setHead
方法将自己设置为新 head 成功后 CPU 时间片耗尽被挂起(此时同步队列为 head(node1) <=> node2(t2)) - 此时又有个线程调用
releaseShared
方法唤醒了 t2(此时头节点head(node1).waitStatus
为 0) - t2 在
doAcquireShared
方法中执行tryAcquireShared
方法获取成功,执行到setHeadAndPropagate
方法中记录老的 head 后,下面的 if 条件都不成立,返回后在doAcquireShared
方法中继续执行p.next = null
导致 head(node1) 完全从同步队列中断开,断开后,老的 head 和 node1 就有可能会被回收 - t1 继续执行,那么此时老的 head 有可能为 null
虽然这种情况很极限,但是确实有可能出现,那必然就有判空的必要,否则后边就 NPE 了。
二、老的头节点为 null 为什么还要继续去传播?
确实继续传播下去可能会导致不必要的唤醒,但这没关系,因为一般来说,出现这种情况,必定是多个线程在竞争调用 acquireShared
或 releaseShared
时才会如此,而在这种情况下,后续节点本身现在或很快就需要唤醒信号了。
反过来思考,万一在继续传播下去的过程中,在一个很极限的时间内满足了继续传播的条件,这样也可以加快传播,毕竟晚一点退出就多一点唤醒后面的机会。
doReleaseShared
共享模式下的释放操作,目的是给后继节点发信号并确保传播。该方法其实是将传播行为做了抽象。
(关于该方法的引入及对 PROPAGATE
状态引入的思考见 AQS-Node.PROPAGATE 状态引入的意义)
有两处会调用该方法:releaseShared
(释放共享锁)和 setHeadAndPropagate
。所以,该方法的每一行都有可能是多条线程并发执行的!
该方法是一个死循环,唯一可以退出循环的条件是:从进入循环那一刻到退出循环的整个期间,头节点都没有改变过。
doReleaseShared
方法执行逻辑就是两个 if 条件
第一个 if 条件执行逻辑:
h != null && h != tail
:如果此时同步队列中至少有两个节点则去执行步骤 2,否则去执行第二个 if 条件- 如果
h.waitStatus
是SIGNAL
,将h.waitStatus
由SIGNAL
改为 0,然后调用unparkSuccessor
方法唤醒后继节点。不要忘了该方法的每一行都有可能是多条线程并发执行的,这里用 CAS 保证了最终只会有一条线程去调用unparkSuccessor
方法,其它线程又去循环了 - 如果
h.waitStatus
是 0,将h.waitStatus
由 0 改为PROPAGATE
。这里 CAS 失败说明h.waitStatus
由 0 变为了SIGNAL
(新节点入队)或PROPAGATE
(多线程竞争 CAS 导致失败)。同样不要忘了该方法的每一行都有可能是多条线程并发执行的,这里 CAS 失败的线程又去循环了
第二个 if 条件执行逻辑用来判断循环是否应该退出。
releaseShared
该方法用来在共享模式下释放锁,其返回值和 tryReleaseShared
方法的返回值一致。
releaseShared
方法执行逻辑:
- 调用
tryReleaseShared
方法尝试设置同步状态state
,如果tryReleaseShared
返回false
,则releaseShared
返回false
,否则执行步骤 2 - 调用
doReleaseShared
方法给后继节点发信号并确保传播,然后返回true
(doReleaseShared
方法上文刚分析过)
tryReleaseShared
tryReleaseShared
方法主要是尝试设置同步状态 state
。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException
。
此方法始终由执行释放的线程调用。返回值 true
表示此共享模式的释放可能允许正在等待的线程获取成功(共享或独占);否则返回 false
。
因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getState
、setState
、compareAndSetState
等方法检查 和/或 修改同步状态 state
。
tryAcquireSharedNanos
大体逻辑同 acquireShared
方法,如注释所说,和 acquireShared
方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquireShared
方法,在这其中做了超时和中断判断。获取成功则返回 true
,超时则返回 false
,中断则抛出 InterruptedException
。
doAcquireSharedNanos
该方法相当于在 doAcquireShared
方法的基础上,做了超时和中断判断。注意:只有返回 true
时才不会调用 cancelAcquire
方法。
而且该方法在挂起线程之前做了个判断,当最长等待时间大于 spinForTimeoutThreshold
时,才会挂起。
spinForTimeoutThreshold
字段的作用前文说过,这里再来复述一遍:该值相当于一个阈值,在一些提供等待时间的操作中会使用该值来判断,当等待时间小于该值(即超时时间非常短)时直接自旋,这样可以提高程序的响应能力。
acquireSharedInterruptibly
大体逻辑同 acquireShared
方法,如注释所说,和 acquireShared
方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquireShared
方法,在这其中做了中断判断。中断则抛出 InterruptedException
。
doAcquireSharedInterruptibly
该方法相当于在 doAcquireShared
方法的基础上,做了中断判断。
至此,AQS 共享锁源码就解析完了,下篇文章会继续解析 Condition 部分的源码。