0%

AQS-共享锁源码解析

本文主要内容为 AQS 共享锁源码的深入解析。其中有一些方法在 AQS 独占锁源码解析中提到过,所以不会再提,因此也节省了不少篇幅。

Introduction

顾名思义,共享锁和独占锁的区别是,独占锁同一时刻只能被一条线程持有,而共享锁同一时刻可以被多条线程共同持有。

共享锁和独占锁的实现方法可简单对应如下:

独占锁 共享锁
acquire(int arg) acquireShared(int arg)
tryAcquire(int arg) tryAcquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireInterruptibly(int arg) doAcquireSharedInterruptibly(int arg)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

Node

在共享模式中,nextWaiter 的值永远为一个特殊值 SHARED

另外在共享模式中,Node 的 waitStatus 会用到 0 / CANCELLED / SIGNAL / PROPAGATE 四种状态。

Source Code

接下来进入到本文的重点,共享锁的源码解析。

acquireShared

以共享模式获取,忽略中断。通过首先至少调用一次 tryAcquireShared 方法来实现,成功则返回,失败则线程排队,线程可能会反复阻塞和解除阻塞,直到调用 tryAcquireShared 方法成功。

acquireShared 方法执行逻辑:

  1. tryAcquireShared:(需要子类实现)尝试以共享模式获取。获取成功则返回,失败则执行步骤 2
  2. doAcquireShared:以共享不间断模式获取。仔细看该方法,几乎就是将独占锁 acquire 方法的后三步执行逻辑揉在了一起:
    1. 为获取失败的线程以共享模式(Node.SHARED)创建节点并入同步队列
    2. 入队后,队列中的线程以共享不间断模式获取。这期间可能需要将某些已排队的线程挂起,直到收到信号后再次执行
    3. 获取到锁的线程如果在等待时被中断,则获取到锁后中断它(线程在排队等待时忽略中断,但是会记录是否发生过中断)

acquireShared

tryAcquireShared

尝试以共享模式获取。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException

此方法由执行获取的线程调用,最终返回一个 int 值,分别代表 3 种情况:

  1. 负值:获取失败
  2. 0:获取成功但后续获取失败
  3. 正值:获取成功且后续获取也可能成功

因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getStatesetStatecompareAndSetState 等方法检查 和/或 修改同步状态 state

tryAcquireShared

doAcquireShared

前文提到,该方法几乎就是将独占锁 acquire 方法的后三步执行逻辑揉在了一起,唯二不一样的地方是:

  1. addWaiter 时以共享模式(Node.SHARED)创建节点并入同步队列
  2. 获取成功后,独占模式调用的是 setHead(Node node) 方法,而共享模式调用的是 setHeadAndPropagate(node, r) 方法。顾名思义,该方法不止会调用 setHead(Node node) 方法,还会在满足某些条件的情况下继续唤醒同步队列中排队的节点。原因正如前文所说,共享锁同一时刻可以被多条线程共同持有

doAcquireShared

setHeadAndPropagate

顾名思义,该方法做了两件事:

  1. 设置新的 head
  2. 在满足某些条件的情况下继续传播(唤醒同步队列中的后继节点)

setHeadAndPropagate 方法执行逻辑:

  1. 记录老的 head
  2. 将自己设置为新 head
  3. 判断是否应该继续传播,这个 if 条件不是很好理解,一点一点看,要满足条件,则:
    1. propagate > 0:共享锁还可以继续由其它线程获取
    2. propagate <= 0 && h == null:共享锁不能再被其它线程获取,且老的头节点为 null。这里其实就不是很好理解了,有个问题:1.老的头节点什么情况下会为 null?2.老的头节点为 null 为什么还要继续去传播?
    3. propagate <= 0 && h != null && h.waitStatus < 0:共享锁不能再被其它线程获取,且老的 head.waitStatus < 0,说明老的头节点表明需要继续传播
    4. propagate <= 0 && h != null && h.waitStatus >= 0 && (h = head) == null:共享锁不能再被其它线程获取,且老的头节点表明不需要继续传播,新的头节点为 null。这个条件应该是不会成立的,因为当 head 被初始化之后,肯定不可能为 null,或许可以理解这个判断是为防止下一个条件的 NPE 常规检查吧..(如果有不同见解,欢迎指出)
    5. propagate <= 0 && h != null && h.waitStatus >= 0 && (h = head) != null && h.waitStatus < 0:共享锁不能再被其它线程获取,且老的头节点表明不需要继续传播,但当前调用者线程表明需要继续传播
  4. 如果满足应该继续传播的条件,且下一个节点为 null 或在共享模式下等待,则调用 doReleaseShared 方法继续传播

这里我们分析下上面对于 propagate <= 0 && h == null 条件提到的两个问题:

一、老的 head 什么情况下会为 null?

  1. 假设当前同步队列为 head <=> node1(t1) <=> node2(t2)
  2. 此时有个线程调用 releaseShared 方法唤醒了 t1
  3. t1 在 doAcquireShared 方法中执行 tryAcquireShared 方法获取成功,执行到 setHeadAndPropagate 方法中记录老的 head 并调用 setHead 方法将自己设置为新 head 成功后 CPU 时间片耗尽被挂起(此时同步队列为 head(node1) <=> node2(t2))
  4. 此时又有个线程调用 releaseShared 方法唤醒了 t2(此时头节点 head(node1).waitStatus 为 0)
  5. t2 在 doAcquireShared 方法中执行 tryAcquireShared 方法获取成功,执行到 setHeadAndPropagate 方法中记录老的 head 后,下面的 if 条件都不成立,返回后在 doAcquireShared 方法中继续执行 p.next = null 导致 head(node1) 完全从同步队列中断开,断开后,老的 head 和 node1 就有可能会被回收
  6. t1 继续执行,那么此时老的 head 有可能为 null

虽然这种情况很极限,但是确实有可能出现,那必然就有判空的必要,否则后边就 NPE 了。

二、老的头节点为 null 为什么还要继续去传播?

确实继续传播下去可能会导致不必要的唤醒,但这没关系,因为一般来说,出现这种情况,必定是多个线程在竞争调用 acquireSharedreleaseShared 时才会如此,而在这种情况下,后续节点本身现在或很快就需要唤醒信号了。

反过来思考,万一在继续传播下去的过程中,在一个很极限的时间内满足了继续传播的条件,这样也可以加快传播,毕竟晚一点退出就多一点唤醒后面的机会。

setHeadAndPropagate

doReleaseShared

共享模式下的释放操作,目的是给后继节点发信号并确保传播。该方法其实是将传播行为做了抽象。

(关于该方法的引入及对 PROPAGATE 状态引入的思考见 AQS-Node.PROPAGATE 状态引入的意义

有两处会调用该方法:releaseShared(释放共享锁)和 setHeadAndPropagate。所以,该方法的每一行都有可能是多条线程并发执行的!

该方法是一个死循环,唯一可以退出循环的条件是:从进入循环那一刻到退出循环的整个期间,头节点都没有改变过。

doReleaseShared 方法执行逻辑就是两个 if 条件

第一个 if 条件执行逻辑:

  1. h != null && h != tail:如果此时同步队列中至少有两个节点则去执行步骤 2,否则去执行第二个 if 条件
  2. 如果 h.waitStatusSIGNAL,将 h.waitStatusSIGNAL 改为 0,然后调用 unparkSuccessor 方法唤醒后继节点。不要忘了该方法的每一行都有可能是多条线程并发执行的,这里用 CAS 保证了最终只会有一条线程去调用 unparkSuccessor 方法,其它线程又去循环了
  3. 如果 h.waitStatus 是 0,将 h.waitStatus 由 0 改为 PROPAGATE。这里 CAS 失败说明 h.waitStatus 由 0 变为了 SIGNAL(新节点入队)或 PROPAGATE(多线程竞争 CAS 导致失败)。同样不要忘了该方法的每一行都有可能是多条线程并发执行的,这里 CAS 失败的线程又去循环了

第二个 if 条件执行逻辑用来判断循环是否应该退出。

doReleaseShared

releaseShared

该方法用来在共享模式下释放锁,其返回值和 tryReleaseShared 方法的返回值一致。

releaseShared 方法执行逻辑:

  1. 调用 tryReleaseShared 方法尝试设置同步状态 state,如果 tryReleaseShared 返回 false,则 releaseShared 返回 false,否则执行步骤 2
  2. 调用 doReleaseShared 方法给后继节点发信号并确保传播,然后返回 truedoReleaseShared 方法上文刚分析过)

releaseShared

tryReleaseShared

tryReleaseShared 方法主要是尝试设置同步状态 state。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException

此方法始终由执行释放的线程调用。返回值 true 表示此共享模式的释放可能允许正在等待的线程获取成功(共享或独占);否则返回 false

因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getStatesetStatecompareAndSetState 等方法检查 和/或 修改同步状态 state

tryReleaseShared

tryAcquireSharedNanos

大体逻辑同 acquireShared 方法,如注释所说,和 acquireShared 方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquireShared 方法,在这其中做了超时和中断判断。获取成功则返回 true,超时则返回 false,中断则抛出 InterruptedException

tryAcquireSharedNanos

doAcquireSharedNanos

该方法相当于在 doAcquireShared 方法的基础上,做了超时和中断判断。注意:只有返回 true 时才不会调用 cancelAcquire 方法。

而且该方法在挂起线程之前做了个判断,当最长等待时间大于 spinForTimeoutThreshold 时,才会挂起。

spinForTimeoutThreshold 字段的作用前文说过,这里再来复述一遍:该值相当于一个阈值,在一些提供等待时间的操作中会使用该值来判断,当等待时间小于该值(即超时时间非常短)时直接自旋,这样可以提高程序的响应能力。

doAcquireSharedNanos

acquireSharedInterruptibly

大体逻辑同 acquireShared 方法,如注释所说,和 acquireShared 方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquireShared 方法,在这其中做了中断判断。中断则抛出 InterruptedException

acquireSharedInterruptibly

doAcquireSharedInterruptibly

该方法相当于在 doAcquireShared 方法的基础上,做了中断判断。

doAcquireSharedInterruptibly

至此,AQS 共享锁源码就解析完了,下篇文章会继续解析 Condition 部分的源码。


欢迎关注我的其它发布渠道