0%

本文主要内容为 AQS 共享锁源码的深入解析。其中有一些方法在 AQS 独占锁源码解析中提到过,所以不会再提,因此也节省了不少篇幅。

Introduction

顾名思义,共享锁和独占锁的区别是,独占锁同一时刻只能被一条线程持有,而共享锁同一时刻可以被多条线程共同持有。

共享锁和独占锁的实现方法可简单对应如下:

独占锁 共享锁
acquire(int arg) acquireShared(int arg)
tryAcquire(int arg) tryAcquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireInterruptibly(int arg) doAcquireSharedInterruptibly(int arg)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

Node

在共享模式中,nextWaiter 的值永远为一个特殊值 SHARED

另外在共享模式中,Node 的 waitStatus 会用到 0 / CANCELLED / SIGNAL / PROPAGATE 四种状态。

Source Code

接下来进入到本文的重点,共享锁的源码解析。

acquireShared

以共享模式获取,忽略中断。通过首先至少调用一次 tryAcquireShared 方法来实现,成功则返回,失败则线程排队,线程可能会反复阻塞和解除阻塞,直到调用 tryAcquireShared 方法成功。

acquireShared 方法执行逻辑:

  1. tryAcquireShared:(需要子类实现)尝试以共享模式获取。获取成功则返回,失败则执行步骤 2
  2. doAcquireShared:以共享不间断模式获取。仔细看该方法,几乎就是将独占锁 acquire 方法的后三步执行逻辑揉在了一起:
    1. 为获取失败的线程以共享模式(Node.SHARED)创建节点并入同步队列
    2. 入队后,队列中的线程以共享不间断模式获取。这期间可能需要将某些已排队的线程挂起,直到收到信号后再次执行
    3. 获取到锁的线程如果在等待时被中断,则获取到锁后中断它(线程在排队等待时忽略中断,但是会记录是否发生过中断)

acquireShared

tryAcquireShared

尝试以共享模式获取。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException

此方法由执行获取的线程调用,最终返回一个 int 值,分别代表 3 种情况:

  1. 负值:获取失败
  2. 0:获取成功但后续获取失败
  3. 正值:获取成功且后续获取也可能成功

因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getStatesetStatecompareAndSetState 等方法检查 和/或 修改同步状态 state

tryAcquireShared

doAcquireShared

前文提到,该方法几乎就是将独占锁 acquire 方法的后三步执行逻辑揉在了一起,唯二不一样的地方是:

  1. addWaiter 时以共享模式(Node.SHARED)创建节点并入同步队列
  2. 获取成功后,独占模式调用的是 setHead(Node node) 方法,而共享模式调用的是 setHeadAndPropagate(node, r) 方法。顾名思义,该方法不止会调用 setHead(Node node) 方法,还会在满足某些条件的情况下继续唤醒同步队列中排队的节点。原因正如前文所说,共享锁同一时刻可以被多条线程共同持有

doAcquireShared

setHeadAndPropagate

顾名思义,该方法做了两件事:

  1. 设置新的 head
  2. 在满足某些条件的情况下继续传播(唤醒同步队列中的后继节点)

setHeadAndPropagate 方法执行逻辑:

  1. 记录老的 head
  2. 将自己设置为新 head
  3. 判断是否应该继续传播,这个 if 条件不是很好理解,一点一点看,要满足条件,则:
    1. propagate > 0:共享锁还可以继续由其它线程获取
    2. propagate <= 0 && h == null:共享锁不能再被其它线程获取,且老的头节点为 null。这里其实就不是很好理解了,有个问题:1.老的头节点什么情况下会为 null?2.老的头节点为 null 为什么还要继续去传播?
    3. propagate <= 0 && h != null && h.waitStatus < 0:共享锁不能再被其它线程获取,且老的 head.waitStatus < 0,说明老的头节点表明需要继续传播
    4. propagate <= 0 && h != null && h.waitStatus >= 0 && (h = head) == null:共享锁不能再被其它线程获取,且老的头节点表明不需要继续传播,新的头节点为 null。这个条件应该是不会成立的,因为当 head 被初始化之后,肯定不可能为 null,或许可以理解这个判断是为防止下一个条件的 NPE 常规检查吧..(如果有不同见解,欢迎指出)
    5. propagate <= 0 && h != null && h.waitStatus >= 0 && (h = head) != null && h.waitStatus < 0:共享锁不能再被其它线程获取,且老的头节点表明不需要继续传播,但当前调用者线程表明需要继续传播
  4. 如果满足应该继续传播的条件,且下一个节点为 null 或在共享模式下等待,则调用 doReleaseShared 方法继续传播

这里我们分析下上面对于 propagate <= 0 && h == null 条件提到的两个问题:

一、老的 head 什么情况下会为 null?

  1. 假设当前同步队列为 head <=> node1(t1) <=> node2(t2)
  2. 此时有个线程调用 releaseShared 方法唤醒了 t1
  3. t1 在 doAcquireShared 方法中执行 tryAcquireShared 方法获取成功,执行到 setHeadAndPropagate 方法中记录老的 head 并调用 setHead 方法将自己设置为新 head 成功后 CPU 时间片耗尽被挂起(此时同步队列为 head(node1) <=> node2(t2))
  4. 此时又有个线程调用 releaseShared 方法唤醒了 t2(此时头节点 head(node1).waitStatus 为 0)
  5. t2 在 doAcquireShared 方法中执行 tryAcquireShared 方法获取成功,执行到 setHeadAndPropagate 方法中记录老的 head 后,下面的 if 条件都不成立,返回后在 doAcquireShared 方法中继续执行 p.next = null 导致 head(node1) 完全从同步队列中断开,断开后,老的 head 和 node1 就有可能会被回收
  6. t1 继续执行,那么此时老的 head 有可能为 null

虽然这种情况很极限,但是确实有可能出现,那必然就有判空的必要,否则后边就 NPE 了。

二、老的头节点为 null 为什么还要继续去传播?

确实继续传播下去可能会导致不必要的唤醒,但这没关系,因为一般来说,出现这种情况,必定是多个线程在竞争调用 acquireSharedreleaseShared 时才会如此,而在这种情况下,后续节点本身现在或很快就需要唤醒信号了。

反过来思考,万一在继续传播下去的过程中,在一个很极限的时间内满足了继续传播的条件,这样也可以加快传播,毕竟晚一点退出就多一点唤醒后面的机会。

setHeadAndPropagate

doReleaseShared

共享模式下的释放操作,目的是给后继节点发信号并确保传播。该方法其实是将传播行为做了抽象。

(关于该方法的引入及对 PROPAGATE 状态引入的思考见 AQS-Node.PROPAGATE 状态引入的意义

有两处会调用该方法:releaseShared(释放共享锁)和 setHeadAndPropagate。所以,该方法的每一行都有可能是多条线程并发执行的!

该方法是一个死循环,唯一可以退出循环的条件是:从进入循环那一刻到退出循环的整个期间,头节点都没有改变过。

doReleaseShared 方法执行逻辑就是两个 if 条件

第一个 if 条件执行逻辑:

  1. h != null && h != tail:如果此时同步队列中至少有两个节点则去执行步骤 2,否则去执行第二个 if 条件
  2. 如果 h.waitStatusSIGNAL,将 h.waitStatusSIGNAL 改为 0,然后调用 unparkSuccessor 方法唤醒后继节点。不要忘了该方法的每一行都有可能是多条线程并发执行的,这里用 CAS 保证了最终只会有一条线程去调用 unparkSuccessor 方法,其它线程又去循环了
  3. 如果 h.waitStatus 是 0,将 h.waitStatus 由 0 改为 PROPAGATE。这里 CAS 失败说明 h.waitStatus 由 0 变为了 SIGNAL(新节点入队)或 PROPAGATE(多线程竞争 CAS 导致失败)。同样不要忘了该方法的每一行都有可能是多条线程并发执行的,这里 CAS 失败的线程又去循环了

第二个 if 条件执行逻辑用来判断循环是否应该退出。

doReleaseShared

releaseShared

该方法用来在共享模式下释放锁,其返回值和 tryReleaseShared 方法的返回值一致。

releaseShared 方法执行逻辑:

  1. 调用 tryReleaseShared 方法尝试设置同步状态 state,如果 tryReleaseShared 返回 false,则 releaseShared 返回 false,否则执行步骤 2
  2. 调用 doReleaseShared 方法给后继节点发信号并确保传播,然后返回 truedoReleaseShared 方法上文刚分析过)

releaseShared

tryReleaseShared

tryReleaseShared 方法主要是尝试设置同步状态 state。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException

此方法始终由执行释放的线程调用。返回值 true 表示此共享模式的释放可能允许正在等待的线程获取成功(共享或独占);否则返回 false

因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getStatesetStatecompareAndSetState 等方法检查 和/或 修改同步状态 state

tryReleaseShared

tryAcquireSharedNanos

大体逻辑同 acquireShared 方法,如注释所说,和 acquireShared 方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquireShared 方法,在这其中做了超时和中断判断。获取成功则返回 true,超时则返回 false,中断则抛出 InterruptedException

tryAcquireSharedNanos

doAcquireSharedNanos

该方法相当于在 doAcquireShared 方法的基础上,做了超时和中断判断。注意:只有返回 true 时才不会调用 cancelAcquire 方法。

而且该方法在挂起线程之前做了个判断,当最长等待时间大于 spinForTimeoutThreshold 时,才会挂起。

spinForTimeoutThreshold 字段的作用前文说过,这里再来复述一遍:该值相当于一个阈值,在一些提供等待时间的操作中会使用该值来判断,当等待时间小于该值(即超时时间非常短)时直接自旋,这样可以提高程序的响应能力。

doAcquireSharedNanos

acquireSharedInterruptibly

大体逻辑同 acquireShared 方法,如注释所说,和 acquireShared 方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquireShared 方法,在这其中做了中断判断。中断则抛出 InterruptedException

acquireSharedInterruptibly

doAcquireSharedInterruptibly

该方法相当于在 doAcquireShared 方法的基础上,做了中断判断。

doAcquireSharedInterruptibly

至此,AQS 共享锁源码就解析完了,下篇文章会继续解析 Condition 部分的源码。


前言

关于 AQS 中的 Node.PROPAGATE 状态,源码中是这么说的:

PROPAGATE 状态表明下一次 acquireShared 应无条件传播。
releaseShared 方法应该传播到其他节点,该状态在 doReleaseShared 方法中设置(仅适用于头节点)以确保传播继续,即使其它操作已经介入。

但光看这些晦涩的文字,还是很难很好的理解它存在的意义,为什么要引入它呢?

通过查找资料发现,其实,PROPAGATE 状态的引入是为了解决 AQS 的一个 bug。

bug: https://bugs.openjdk.java.net/browse/JDK-6801020
fix: https://github.com/openjdk/jdk8u/commit/b63d6d68d93ebc34f8b4091a752eba86ff575fc2

这个 bug 是一个关于 Semaphore 的 case。

在 AQS 引入 PROPAGATE 状态前,并发调用 Semaphorerelease 方法,某些情况下同步队列中排队的线程仍不会被唤醒。

这个 case 的完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.Semaphore;

public class TestSemaphore {

/**
* Semaphore 初始状态为 0
*/
private static final Semaphore SEM = new Semaphore(0);

private static class Thread1 extends Thread {
@Override
public void run() {
// 获取 1 个许可,会阻塞等待其他线程释放许可,可被中断
SEM.acquireUninterruptibly();
}
}

private static class Thread2 extends Thread {
@Override
public void run() {
// 释放 1 个许可
SEM.release();
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}

引入 PROPAGATE 状态前

我们来分析一下引入 PROPAGATE 状态前这段代码会有什么问题。

首先,我们看看当时版本的 AQS 源码是怎样的,这里我们只看和 PROPAGATE 状态有关的 setHeadAndPropagatereleaseShared 方法即可。当时还没有引入 doReleaseShared 方法,该方法是后来解决这个 bug 时引入的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}

public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

接下来,我们分析下这个 bug 是如何产生的。

TestSemaphore 中,Semaphore 初始许可为 0,同时运行 4 个子线程,2 个子线程(t1,t2)同时获取 1 个许可,另外 2 个子线程(t3,t4)同时释放 1 个许可,每次循环主线程都会等待所有子线程运行完毕。

我们假设 t1 和 t2 先获取许可,因为初始许可为 0,所以 t1 和 t2 入同步队列,假设此刻的同步队列是这样的:

head <=> node1(t1) <=> node2(t2 tail)

此时 head.waitStatusSIGNAL。接下来,t3 先释放,t4 后释放:

  1. t3 调用 tryReleaseShared 方法释放 1 个许可,然后调用 unparkSuccessor 方法将 head.waitStatusSIGNAL 改为 0,并唤醒后继节点 t1 后退出
  2. t1 被 t3 唤醒,调用 tryAcquireShared 方法获取到许可并返回 0(此时还未调用 setHeadAndPropagate 方法中的 setHead 方法将自己设置为新 head)
  3. t4 调用 tryReleaseShared 方法释放 1 个许可,因为 head 未改变,因此 head.waitStatus 仍为 0,这导致 t4 退出,不会继续调用 unparkSuccessor 方法唤醒后继节点 t2
  4. t1 继续调用 setHeadAndPropagate 方法,首先将自己设置为新 head,然后因为 tryAcquireShared 方法返回 0 导致 t1 退出,不会继续调用 unparkSuccessor 方法唤醒后继节点 t2

至此,t2 永远不会被唤醒,问题产生。

引入 PROPAGATE 状态后

接下来我们再来看看引入 PROPAGATE 状态后这个问题如何解决。

同样先看下引入 PROPAGATE 状态后的 AQS 源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}

同样的例子:

  1. t3 调用 tryReleaseShared 方法释放 1 个许可,然后调用 doReleaseShared 方法将 head.waitStatusSIGNAL 改为 0,并唤醒后继节点 t1 后退出
  2. t1 被 t3 唤醒,调用 tryAcquireShared 方法获取到许可并返回 0(此时还未调用 setHeadAndPropagate 方法中的 setHead 方法将自己设置为新 head)
  3. t4 调用 tryReleaseShared 方法释放 1 个许可,因为 head 未改变,因此 head.waitStatus 仍为 0,然后调用 doReleaseShared 方法将 head.waitStatus 由 0 改为 PROPAGATE 后 t4 退出
  4. t1 继续调用 setHeadAndPropagate 方法,首先将自己设置为新 head,因为此时旧 head.waitStatusPROPAGATE 且同步队列中 t1 还有后继节点 t2,所以继续调用 doReleaseShared 方法,将 head.waitStatusSIGNAL 改为 0,并唤醒后继节点 t2 后退出

后继节点 t2 被唤醒,问题解决。

个人见解与思考

其实,setHeadAndPropagate 方法逻辑改成如下也可以解决这个 bug,甚至都不需要引入 PROPAGATE 状态。

(基本思路是:head.waitStatus 为 0 是多线程下可能出现的中间状态,既然 head.waitStatus 在多线程下遇 0 要变 PROPAGATE,那在 setHeadAndPropagate 方法中判断头节点时加上 0 就行了)

1
2
3
4
5
6
7
8
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 || node.waitStatus <= 0) { // 修改这里
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}

所以,这就产生了一个新问题:引入 PROPAGATE 状态可以解决这个 bug,但是解决这个 bug 并不一定非要引入 PROPAGATE 状态,那为什么最终还是引入了呢?

查了一些资料后也无果,下面有一些自己的思考,欢迎交流指正。

解决 bug

引入 PROPAGATE 状态的第一个好处:解决这个 bug。

这个 bug 产生的原因,就是因为共享锁的获取和释放在同一时刻很可能会有多条线程并发执行,这就导致在这个过程中可能会产生这种 waitStatus 为 0 的中间状态,可以通过引入 PROPAGATE 状态来解决这个问题。

语意更清晰

引入 PROPAGATE 状态的第二个好处:语意更清晰。

我们可以再深入思考下,既然 head.waitStatus 由 0 变 PROPAGATE,那 head.waitStatus 什么时候是 0?

因为 doReleaseShared 方法只有 releaseSharedsetHeadAndPropagate 方法调用,所以从排列组合来说,无非是以下四种情况:
假设有两个线程(或多个线程)

  1. 两个同时调用 releaseShared,一个先将 head.waitStatusSIGNAL 改为 0
  2. 两个同时调用 setHeadAndPropagate,一个先将 head.waitStatusSIGNAL 改为 0
  3. 一个先调 releaseSharedhead.waitStatusSIGNAL 改为 0,另一个再调 setHeadAndPropagate
  4. 一个先调 setHeadAndPropagatehead.waitStatusSIGNAL 改为 0,另一个再调 releaseShared

下面具体看下每种情况:(node1(t1 0) 表示 node1.thread 为 t1,node1.waitStatus 为 0)

情况 1:

head(-1) <=> node1(t1 0)

  1. 线程 A 调用 releaseShared 方法将 head.waitStatusSIGNAL 改为 0 并唤醒 t1 后退出。此时 head(0) <=> node1(t1 0)
  2. 线程 B 调用 releaseShared 方法时发现 head.waitStatus 为 0

情况 2:

head(-1) <=> node1(t1 -1) <=> node2(t2 -1) <=> node3(t3 0)

  1. 线程 A 调用 releaseShared 方法将 head.waitStatusSIGNAL 改为 0 并唤醒 t1 后退出。此时 head(0) <=> node1(t1 -1) <=> node2(t2 -1) <=> node3(t3 0)
  2. t1 获取到锁成为头节点,此时 head.waitStatusSIGNAL,调用 doReleaseShared 方法。此时 head(node1 -1) <=> node2(t2 -1) <=> node3(t3 0)
  3. 线程 B 调用 releaseShared 方法将 head.waitStatusSIGNAL 改为 0 并唤醒 t2 后退出。此时 head(node1 0) <=> node2(t2 -1) <=> node3(t3 0)
  4. t2 获取到锁成为头节点,此时 head.waitStatusSIGNAL,调用 doReleaseShared 方法。此时 head(node2 -1) <=> node3(t3 0)
  5. t1 将 head.waitStatusSIGNAL 改为 0 并去唤醒 t3。此时 head(node2 0) -> node3(t3 0)
  6. t2 发现 head.waitStatus 为 0

情况 3:

head(-1) <=> node1(t1 -1) <=> node2(t2 0)

  1. 线程 A 调用 releaseShared 方法将 head.waitStatusSIGNAL 改为 0 并唤醒 t1 后退出。此时 head(0) <=> node1(t1 -1) <=> node2(t2 0)
  2. t1 获取到锁成为头节点,此时 head.waitStatusSIGNAL,调用 doReleaseShared 方法。此时 head(node1 -1) <=> node2(t2 0)
  3. 线程 B 调用 releaseShared 方法将 head.waitStatusSIGNAL 改为 0 并去唤醒 t2 后退出。此时 head(node1 0) <=> node2(t2 0)
  4. t1 发现 head.waitStatus 为 0

情况 4:

head(-1) <=> node1(t1 -1) <=> node2(t2 0)

  1. 线程 A 调用 releaseShared 方法将 head.waitStatusSIGNAL 改为 0 并唤醒 t1 后退出。此时 head(0) <=> node1(t1 -1) <=> node2(t2 0)
  2. t1 获取到锁成为头节点,此时 head.waitStatusSIGNAL,调用 doReleaseShared 方法。此时 head(node1 -1) <=> node2(t2 0)
  3. t1 将 head.waitStatusSIGNAL 改为 0 并唤醒 t2 后退出。此时 head(node1 0) <=> node2(t2 0)
  4. 线程 B 调用 releaseShared 方法,发现 head.waitStatus 为 0

我们知道,head.waitStatus 为 0 代表 head 是刚成为头节点的,即 head 刚初始化,或 tail 获取到锁后成为新 head,导致队列中只剩下 head(在这个前提下,后续节点可能正在加入,也可能刚加入还没来得及将 head.waitStatus 改为 SIGNAL,但这不重要)。

从上述情况中可以发现:head.waitStatus 为 0 还可以短暂代表共享模式下有线程正在调用 unparkSuccessor 方法去唤醒后继节点(其实就是这种情况被标识为了 PROPAGATE)。

所以,引入 PROPAGATE 状态后,head.waitStatus 为 0 和 PROPAGATE 就分别代表不同的情况,否则就要揉在一起,不好理解。

加速传播

引入 PROPAGATE 状态的第三个好处:加速唤醒后继节点

doReleaseShared 方法中有这个条件判断:

1
2
if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;

如果没有 PROPAGATE 状态,当多条线程同时运行到这里后,可能就直接退出了,虽然这时有个线程正在调用 unparkSuccessor 方法去唤醒后继节点,但唤醒后的线程也需要等到获取到锁且成为头节点后才能调用 doReleaseShared 方法再去唤醒后继节点。

当并发大时,在这个过程中很有可能会有新节点入队并满足唤醒条件,所以有了 PROPAGATE 状态,当多条线程同时运行到这里后,CAS 失败后的线程可以再次去循环判断能否唤醒后继节点,如果满足唤醒条件就去唤醒。

毕竟,调用 doReleaseShared 方法越多、越早就越有可能更快的唤醒后继节点。

总结

因此,bug 解决的更优雅且可以带来不错的收益也许才是最终引入 PROPAGATE 状态的原因吧,欢迎交流指正。


要深入理解 AQS 的源码确实不容易,因为其中有很多情况要考虑,甚至源码中的每一个 if 语句可能就包含着一种或几种情况,尤其是在共享锁部分;另外还有一些因素也要考虑,比如多线程调用、CPU 分配的时间片、虚假唤醒等。

本文主要内容为 AQS 独占锁源码的深入解析,所以在本文中暂不考虑共享锁Condition 的部分。

Introduction

AQS 全称为 AbstractQueuedSynchronizer,位于 java.util.concurrent.locks 包下,继承 AbstractOwnableSynchronizer,是一个抽象类。

AQS 提供了一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等),是大多数依赖单个原子 int 值来表示状态的同步器的基础。这个单个原子 int 值在源码中是一个名为 state 的字段,由 volatile 修饰,子类必须重写更改此状态值的 protected 方法,并定义该状态在获取或释放此对象的含义。也因此,AQS 中的其他方法会执行所有排队和阻塞机制。

另外因为要考虑到多线程的因素,所以只有使用 getStatesetStatecompareAndSetState 方法操作的状态值才会在同步方面进行跟踪。换言之,子类重写的 protected 方法的具体实现中需要使用这些方法来检查 和/或 修改状态值。

在使用上,子类应将其定义为非 public 的内部帮助类,用于实现其封闭类的同步属性,然后具体的使用都委托给该帮助类来实现。

AQS 支持独占模式和共享模式,默认为独占模式,子类可以根据需求来选择实现两种模式之一或两者。具体来说,只需根据适用情况重写以下方法即可(默认实现都会抛出 UnsupportedOperationException):

  • tryAcquire(独占模式获取)
  • tryRelease(独占模式释放)
  • tryAcquireShared(共享模式获取)
  • tryReleaseShared(共享模式释放)
  • isHeldExclusively(供 ConditionObject 方法内部调用)

以上方法的实现必须是内部线程安全的,并且应该是简短不阻塞的。

在不同模式下等待的线程共享同一个 FIFO 队列,称为同步队列(sync queue)。

AQS 中还定义了一个 ConditionObject 类,它可以被支持独占模式的子类用作 Condition 实现。AQS 不提供创建 Condition 的方法,ConditionObject 的行为取决于其同步器实现的语义。

AQS 类的序列化仅存储底层原子整数维护的状态,因此反序列化的对象将具有空的线程队列。需要序列化的子类需要定义 readObject 方法,该方法在反序列化时会将其恢复到已知的初始状态。

也可以提供公平的 FIFO 获取顺序,实现为:当 hasQueuedPredecessors 方法返回 true 时,定义 tryAcquire 方法返回 false

Demo

Exclusive Mode

独占模式需要实现 tryAcquiretryRelease 以及 isHeldExclusively(根据需要)方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* 不可重入互斥锁,0 表示 unlocked 状态,使用 1 表示 locked 状态。
*/
static class Mutex implements Lock, java.io.Serializable {
// 内部帮助类
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于 locked 状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

// 如果 state 为 0,则获取锁
@Override
public boolean tryAcquire(int acquires) {
// 不可重入互斥锁 acquires 始终为 1
assert acquires == 1;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 通过将 state 设置为 0 来释放锁
@Override
protected boolean tryRelease(int releases) {
// 不可重入互斥锁 releases 始终为 1
assert releases == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 提供一个 Condition
Condition newCondition() {
return new ConditionObject();
}

// 正确反序列化
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
// 重置为 unlocked 状态
setState(0);
}
}

// sync 对象完成了所有的困难工作,我们直接使用它即可
private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

Shared Mode

共享模式需要实现 tryAcquireSharedtryReleaseShared 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class SharedModeDemo {
/**
* 类似于 CountDownLatch 的 latch 类,但是它只需要一个 signal 即可触发
*/
static class BooleanLatch {
// 内部帮助类
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() {
return getState() != 0;
}

@Override
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}

// sync 对象完成了所有的困难工作,我们直接使用它即可
private final Sync sync = new Sync();

public boolean isSignalled() {
return sync.isSignalled();
}

public void signal() {
sync.releaseShared(1);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}

/**
* test BooleanLatch
*/
public static void main(String[] args) throws InterruptedException {
BooleanLatch latch = new BooleanLatch();
new Thread(() -> {
System.out.println("thread [" + Thread.currentThread().getName() + "] sleep 5s");
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread [" + Thread.currentThread().getName() + "] sleep end, signal thread [main]");
latch.signal();
}, "thread1").start();
System.out.println("thread [main] await");
latch.await();
System.out.println("thread [main] end");
}
}

AbstractOwnableSynchronizer

AbstractOwnableSynchronizer 是 AQS 继承的类,用来记录跟踪拥有独占同步器的线程,为创建需要所有权概念的锁和相关同步器提供基础。该类的子类或一些工具可以通过维护 exclusiveOwnerThread 字段值来帮助用户控制和监控访问并提供诊断。

该类中只有一个字段 exclusiveOwnerThread,从字段名中可以看到,该字段可以记录持有独占锁的线程;此外,还提供了该字段的 set 和 get 方法。

AbstractOwnableSynchronizer

Node

Node 是等待队列(wait queue)的节点类,等待队列包括同步队列(sync queue)和条件队列(condition queue)。

等待队列是 “CLH” 锁队列的变体(之前详细介绍过 “CLH” 锁,see AQS 基础–多图详解 CLH 锁的原理与实现),“CLH” 锁通常用于自旋锁,在 AQS 中将它们用作阻塞同步器,但使用相同的基本策略:在其节点的前驱中保存有关线程的一些控制信息。

每个节点中的 waitStatus 字段可以表示线程是否应该阻塞。节点在其前驱 release 时会收到信号。队列中的每个节点都充当一个特定的通知监视器,持有一个等待线程。如果线程是队列中的第一个,则可以尝试去获取锁,但并不保证获取成功,这只是给予它竞争的权利而已,因此当前已释放的竞争者线程可能也需要继续重新等待。

在该类中,prev 属性在原始 “CLH” 锁中其实并没有用到,在这里主要是用于处理状态为 CANCELLED 的节点;而 next 属性则用来实现阻塞机制,换句话说,next 指针其实是一种优化,通常不需要向后扫描(具体表现在 unparkSuccessor 方法中,先看 next 是否符合条件,不符合再遍历,大多数情况下可能都会符合)。

“CLH” 队列需要一个虚拟头节点来开始。但是我们不在构建时就创建它,因为如果不存在竞争,这就是在做无用功。相反,在第一次竞争时构造节点并设置头尾指针(延迟初始化)。

Conditions 中等待的线程使用相同的节点,但使用额外的链接。Conditions 链接队列中的节点仅在独占持有时才会被访问,所以不需要考虑并发情况。在 await 时,一个节点被插入到 condition queue 中;signal 时,节点被转移到 sync queue。

Node 中的 nextWaiter 字段可以表明当前节点正在独占模式还是共享模式下等待。

Node 节点有以下等待状态(waitStatus):

等待状态 描述 具体描述
- 0 初始值 -
CANCELLED 1 表明线程已取消 由于超时或中断,该节点被取消,此后节点永远不会离开这个状态,而且取消节点的线程永远不会再次阻塞
SIGNAL -1 表明后继线程需要唤醒 该节点的后继节点被(或即将被)阻塞(LockSupport.park 方法),因此当前节点在释放(release 方法)或取消(cancelAcquire 方法)时必须唤醒(unparkSuccessor 方法)其后继节点。为了避免竞争,acquire 方法中必须首先表明它们需要一个信号(shouldParkAfterFailedAcquire 方法),然后重试原子获取,然后在获取失败时阻塞
CONDITION -2 表明线程在 condition queue 中等待 该节点当前在 condition queue 中,直到转移后,它才会用作 sync queue 的节点,此时状态将设置为 0(此处使用此值与该字段的其他用途无关,但可以简化机制)
PROPAGATE -3 表明下一次 acquireShared 应无条件传播 releaseShared 方法应该传播到其他节点,该状态在 doReleaseShared 方法中设置(仅适用于头节点)以确保传播继续,即使其它操作已经介入

Node 类源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
static final class Node {
// 表明节点在共享模式下等待的标记
static final Node SHARED = new Node();
// 表明节点在独占模式下等待的标记
static final Node EXCLUSIVE = null;

// 表明线程已取消的 waitStatus 值
static final int CANCELLED = 1;
// 表明后继线程需要唤醒的 waitStatus 值
static final int SIGNAL = -1;
// 表明线程在 condition 中等待的 waitStatus 值
static final int CONDITION = -2;
// 表明下一次 acquireShared 应无条件传播的 waitStatus 值
static final int PROPAGATE = -3;

// 状态字段,仅采用以下值:SIGNAL / CANCELLED / CONDITION / PROPAGATE / 0
volatile int waitStatus;

// 当前节点的前驱节点,入队时分配,仅在出队时为空(for GC)
volatile Node prev;

// 当前节点的后继节点,入队时分配,绕过取消的前驱节点时进行调整,并在出队时为空(for GC)
volatile Node next;

// 该节点封装的线程。在构造时初始化并在使用后置为 null。
volatile Thread thread;

// 共享模式:特殊值 SHARED
// 独占模式:null 或 下一个在 condition queue 中等待的节点(如果用到 Condition)
Node nextWaiter;

/**
* 如果节点在共享模式下等待,则返回 true
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 返回当前节点的前驱节点,如果为 null 则抛出 NullPointerException,当前驱节点不为空时使用
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

// 用于建立 dummyHead 或 SHARED 标记
Node() {
}

// addWaiter 方法中使用
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

// Condition 中使用(准确的说是在 ConditionObject#addConditionWaiter 方法中使用)
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

在独占模式中,nextWaiter 的值为 null 或下一个在 condition queue 中等待的节点(如果有 condition queue 的话。因本文暂不分析其源码,所以在本文中可以认为该值就是 null)。

另外在独占模式中,Node 的 waitStatus 只会用到 0 / CANCELLED / SIGNAL 这三种状态。

Sync Queue

sync queue 是同步队列,代表所有等待获取锁的线程集合,是个有虚拟头节点(延迟初始化,下文中称其为 dummyHead)的双向链表。head 指向头节点(即 dummyHead),tail 指向尾节点,prev 指向上一个节点,next 指向下一个节点。

sync-queue

对于 head 中 thread 属性可以有另一种理解:sync queue 中的每个节点内部都会封装一个线程,唯独 head 节点中的为 null,其实换个角度,也可以认为 head 节点中也封装了线程,只不过此时该线程获取到了锁,正在执行逻辑。

Fields

4 个重要的字段属性:head / tail / state / spinForTimeoutThreshold
5 个用于支持 CAS 算法的 Unsafe 类操作的字段:state / head / tail / waitStatus / next(略)

fields-1

spinForTimeoutThreshold 字段相当于一个阈值,在一些提供等待时间的方法中会使用该值来判断,当等待时间小于该值(即超时时间非常短)时直接自旋,这样可以提高程序的响应能力。

spinForTimeoutThreshold

Source Code

了解了前文的前置知识,接下来终于要进入到本文的重点,独占锁的源码解析。

acquire

以独占模式获取,忽略中断。通过至少调用一次 tryAcquire 方法来实现,成功则返回,失败则线程排队。线程可能会反复阻塞和解除阻塞,直到调用 tryAcquire 方法获取成功。

acquire 方法执行逻辑:

  1. tryAcquire:(需要子类实现)尝试以独占模式获取,获取成功则返回,失败则执行步骤 2
  2. addWaiter:为获取失败的线程以独占模式(Node.EXCLUSIVE)创建节点并入 sync queue,执行步骤 3
  3. acquireQueued:入队后,队列中的线程以独占不间断模式获取。这期间可能需要将某些已排队的线程挂起,直到收到信号后再次执行
  4. selfInterrupt:获取到锁的线程如果在等待时被中断,在这里中断它(线程在排队等待时忽略中断,但是会记录是否发生过中断)

acquire

Diagram of acquire

在详细解析 acquire 源码之前,先在上帝视角给大家提供一张该方法的执行流程图,方便对执行过程有个详细了解,也方便在下面的源码解析过程中,看到某个方法时,知道程序当前处于什么地方,不至于在复杂的源码中迷失。

diagram-of-acquire

tryAcquire

尝试以独占模式获取。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException。此方法由执行 acquire 的线程调用,返回值代表获取成功(return true)或失败(return false)。

因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getStatesetStatecompareAndSetState 等方法检查 和/或 修改同步状态 state

tryAcquire

addWaiter

为当前线程以给定模式(Node.EXCLUSIVENode.SHARED)创建节点并入 sync queue,返回新入队的节点。

该方法的操作就是正常的节点入队操作,具体逻辑没什么好说的,有几个注意的点:

  1. 如果队列为空,会先创建一个虚拟头节点,再将当前线程节点插入队尾,即采用前文中提到的延迟初始化 head
  2. 新节点入 sync queue 操作:先设置 node.prev = tail,再 CAS 设置 tail = node,最后设置 tail.next = node(注意这几步不是一个原子操作

此外,我们可以从该方法的实现中学到一个重要思想:fast path,即先简单尝试一下,成功就返回,失败再自旋循环。因为其实很多时候,直接操作就能解决大部分问题。

addWaiter

注意,在入队操作过程中,当某线程创建了新节点后,执行 compareAndSetTail(pred, node) 成功,但是 pred.next = node 还没有开始执行时(或刚好执行到此 CPU 给该线程分配的时间片耗尽),可能会出现下面这种现象:

traverse-from-tail

在高并发的场景下,甚至可能会出现下面这种现象:

traverse-from-tail-2

此时,如果从 head 往后遍历队列,会出现一个严重的问题:遍历不到新入队的节点!而从 tail 往前遍历是没有这个问题的。究其原因,就是因为新节点入队操作并不是一个原子操作。这也是 AQS 中遍历队列都采用从 tail 往前遍历的原因。

enq

普通的节点入队操作,主要用到 CAS + 自旋,如果队列为空,会先创建一个虚拟头节点。最终返回新入队节点的前驱节点。

enq

acquireQueued

该方法情况比较复杂,需要详细分析(这里再贴一下前文的执行流程图)。

diagram-of-acquire

acquireQueued 方法执行逻辑:

  1. 如果当前线程是排队的第一个线程,执行 tryAcquire 方法尝试以独占模式获取
  2. 如果获取成功,重新设置队列头节点 (setHead) 后退出
  3. 如果当前线程不是排队的第一个线程 或 当前线程是排队的第一个线程但获取失败,将前驱节点状态置为 SIGNAL (shouldParkAfterFailedAcquire
  4. 设置成功后,挂起线程,等待唤醒 (parkAndCheckInterrupt
  5. 线程被唤醒后回到步骤 1 重复上述步骤,直到获取成功
  6. 如果上述步骤执行过程中出现异常导致获取失败,取消正在进行的获取尝试(cancelAcquire

还有几点需要注意:

  1. interrupted 字段用来记录线程在等待过程中是否发生过中断,该字段也是此方法的最终返回值
  2. 线程在排队等待时是忽略中断的,无论线程在等待过程中是否发生过中断,都需要获取到锁后才能返回。返回后再根据 interrupted 字段的值来决定是否中断

acquireQueued

setHead

head 指向当前节点,内部属性 threadprev 置为 null

其实该方法就是相当于,获取成功后,把该节点置为新的 dummyHead。但注意,此时旧的 dummyHead 还存在,且 oldDummyHead.next = newDummyHead,setHead 执行结束后才会清除旧的 dummyHead,将 oldDummyHead.next 指向 null(这一步在 acquireQueuedp.next = null)。

setHead

思考一下,这里设置头节点为什么不用 compareAndSetHead,而是直接 head = node

因为这是在独占锁情况下,获取到锁的线程只会有一个,因此该方法不会存在并发调用的情况,可以放心大胆的使用 head = node

shouldParkAfterFailedAcquire

该方法逻辑不难理解,方法名也已说明了用意:should park after failed acquire(获取失败后应该挂起吗?)

而该方法唯一 return true 的条件就是 pred.waitStatus = Node.SIGNAL,其它时候都会返回 false。这也说明,获取失败后应该挂起线程的条件是:该节点的前置节点状态为 SIGNAL。否则,我们应该把前置节点的状态设置为 SIGNAL。(这里就同前文提到的 “CLH” 锁呼应上了,即在其节点的前驱中保存有关线程的一些控制信息)

shouldParkAfterFailedAcquire

特别注意:当最终结果返回时,程序此时还处于 acquireQueued 方法的 for (;;) 中。因此,当最终结果返回 false 时,程序会再次回到上述 acquireQueued 方法执行逻辑的步骤 1 去执行。或者这里也可以理解为一个优化,相当于变相又给了线程一个去尝试获取的机会,也许这次就成功了。

parkAndCheckInterrupt

node 的前置节点状态设置为 SIGNAL 了,挂起线程,直到收到释放信号或中断唤醒它。

特别注意:假如此刻它被唤醒,它需要接着去执行。别忘了,程序此时还处于 acquireQueued 方法的 for (;;) 中,所以程序会再次回到上述 acquireQueued 方法执行逻辑的步骤 1 去执行,直到它获取成功。

parkAndCheckInterrupt

思考这个问题:该方法最终返回值是线程是否中断,为什么不用 Thread.currentThread().isInterrupted() 方法?

Thread 源码解析 中说过,这两个方法的区别是:Thread.interrupted() 方法除了会返回线程是否中断外,还会重置线程的中断状态。那么就有个新问题:返回值是线程是否中断,我只需要知道是否中断就行了,为什么还要清除中断状态?

这是因为,中断后,如果不清除中断状态,下次 park 是不生效的。假如该线程下次仍然没有获取到锁,就 park 不住了(关于这个结论,做个实验就知道了。如果不想做实验,我也帮你做了,see The relationship between Interrupt and Park)。因此这里需要一个又能重置线程中断状态又能返回线程是否中断的方法,所以必须使用 Thread.interrupted() 方法。

cancelAcquire

如果在获取过程中出现异常导致获取失败,取消正在进行的获取尝试。

如何取消呢?是不是就是将此节点从队列中彻底删除?

其实不是的,该方法对应的情况比较多,基本上最终就是修改了节点的 waitStatus 以及 next 指针,将节点彻底从队列中删除是依赖后续的遍历 / setHead / p.next = null 操作。具体可见上篇文章 AQS-cancelAcquire 方法源码解析

cancelAcquire

unparkSuccessor

唤醒 node 的后继节点。在前文 cancelAcquire 方法中,如果 node 不是 tail && pred 是 head 则会调用此方法;另外在释放锁时也会调用此方法。

unparkSuccessor 方法执行逻辑:

  1. 如果 node.waitStatus < 0,即 node 需要给它的后继节点发信号或者说 node 的后继节点需要被唤醒,将 node 的 waitStatus 置为 0(清除预期的信号)
  2. 寻找要 unpark 的后继节点,如果找到,则唤醒它。具体逻辑为:先看 node 的 next 是否符合条件,如果符合就唤醒;如果不符合就从 tail 往前遍历寻找,如果找到就唤醒(这里也同前文提到的 Node 类的 next 指针的作用呼应上了)

unparkSuccessor

node.waitStatus < 0 的判断主要是为了清除 node 的 waitStatus,也就是说,此时,无论 node 后的节点是什么状态,无论 node 的后继节点是否被唤醒,只要调用完该方法,node 的状态肯定为 0 或 CANCELLED(node.waitStatus >= 0,即清除了预期的信号)。

selfInterrupt

当前线程在等待过程中被中断,中断当前线程。

selfInterrupt

这里可能有人会有疑问,当前线程已经在等待过程中被中断,为什么又要中断一次?

因为 parkAndCheckInterrupt 方法里 check interrupt 时用的是 Thread.interrupted() 方法,前文提到,该方法除了会返回当前线程是否被中断外,还会重置中断状态。因此,必须在这里补上这次中断。

release

该方法用来在独占模式下释放锁。该方法返回值和 tryRelease 方法的返回值一致。

release 方法执行逻辑:

  1. 调用 tryRelease 方法尝试设置同步状态 state,如果 tryRelease 返回 false,则 release 返回 false,否则执行步骤 2
  2. 这两种情况下不做操作,直接返回 true:当 head 节点为空,说明队列为空;当 head 节点不为空且 waitStatus 为 0,说明队列中此时只有一个 head 节点或有其它线程正在执行 unparkSuccessor 方法。否则去调用 unparkSuccessor 方法去唤醒 head 的后继节点,然后返回 true

release

head 节点的 waitStatus 为 0,此时可能有其它线程正在执行 unparkSuccessor 方法,是不是觉得不可思议,独占锁模式下竟然还会有两个线程同时调用 unparkSuccessor 方法?

这是有可能的。前文中说过,独占锁模式下,unparkSuccessor 方法可能会有两处调用,一处是在 cancelAcquire 方法中,一处是在 release 方法中。假设这种情况:

  1. A 线程获取到锁后执行逻辑(还未调用 release 方法)
  2. B、C 线程获取不到锁,相继入队
  3. B 线程执行 acquireQueued 方法结果抛异常走到了 cancelAcquire 方法中,执行 compareAndSetWaitStatus(node, ws, 0) 这句成功,然后在满足一系列条件后调用 unparkSuccessor 方法去唤醒 C 线程
  4. A 线程执行完逻辑后调用 release 方法,此时发现 headwaitStatus 等于 0,其实意味着 A 要唤醒后继节点的工作被 B 给做了,那 A 什么都不用做直接 return true 即可

tryRelease

tryRelease 方法主要是尝试设置同步状态 state。具体逻辑需要子类去实现,默认实现抛出 UnsupportedOperationException。此方法始终由执行 release 的线程调用。返回值 true 表示如果此对象现在处于完全 release 状态,任何等待线程都可以尝试获取;否则返回 false

因为要考虑多线程的情况,所以子类在具体实现中可能需要使用 getStatesetStatecompareAndSetState 等方法检查 和/或 修改同步状态 state

tryRelease

unparkSuccessor 的逻辑前文已介绍,此处不再赘述。另外,别忘了唤醒后还是会继续去获取锁的。

tryAcquireNanos

大体逻辑同 acquire 方法,如注释所说,和 acquire 方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquire 方法,在这其中做了超时和中断判断。成功则返回 true,超时则返回 false,中断则抛出 InterruptedException

tryAcquireNanos

doAcquireNanos

该方法相当于把前文中说的 addWaiter(Node.EXCLUSIVE) 方法揉进了 acquireQueued() 方法里,并做了超时和中断判断。注意:只有返回 true 时才不会调用 cancelAcquire 方法。

而且该方法在挂起线程之前做了个判断,当最长等待时间大于 spinForTimeoutThreshold 时,才会挂起。

spinForTimeoutThreshold 字段的作用前文说过,这里再来复述一遍:该值相当于一个阈值,在一些提供等待时间的操作中会使用该值来判断,当等待时间小于该值(即超时时间非常短)时直接自旋,这样可以提高程序的响应能力。

doAcquireNanos

acquireInterruptibly

大体逻辑同 acquire 方法,如注释所说,和 acquire 方法不同的是,会先检查中断状态,然后至少调用一次 tryAcquire 方法,在这其中做了中断判断。中断则抛出 InterruptedException

acquireInterruptibly

doAcquireInterruptibly

该方法相当于把前文中说的 addWaiter(Node.EXCLUSIVE) 方法揉进了 acquireQueued 方法里,并做了中断判断。

doAcquireInterruptibly

至此,AQS 独占锁源码就解析完了,下篇文章会继续解析共享锁源码。个人认为共享锁对应的情况更复杂一些,所以也更难理解一些。


前言

AQS 的 cancelAcquire 方法值得花一篇文章来写,因为虽然代码不多,但是涉及到的情况不少。

cancelAcquire

在 AQS 源码中该方法共被 6 个地方调用(3 个独占锁,3 个共享锁):
acquireQueued / doAcquireInterruptibly / doAcquireNanos
doAcquireShared / doAcquireSharedInterruptibly / doAcquireSharedNanos

该方法目的为:获取过程抛出异常后,取消正在进行的获取尝试。

不考虑 Condition 的前提下,该方法是唯一会将 sync queue 中 Node 的 waitStatus 置为 CANCELLED 的方法。

穷举所有 case

从该方法最后部分 if (node == tail && ...if (pred != head && ... 两句可以看出,该方法进行操作的三个不同的大前提条件为:

  1. node 是 tail
  2. node 不是 tail && pred 不是 head
  3. node 不是 tail && pred 是 head

我们先不考虑这些 case 如何达成,本文我们暂时只关注:穷举出所有 case 并看每种 case 的最终结果

注:下文中 normalNode 代表 waitStatus <= 0 的节点;cancelNode 代表 waitStatus 为 CANCELLED 的节点。

  1. node 是 tail + pred 是 head

    head <=> [node]
    head <=> cancelNode <=> [node]

  2. node 是 tail + pred 不是 head

    head <=> normalNode <=> [node]
    head <=> normalNode <=> cancelNode <=> [node]

  3. node 不是 tail && pred 不是 head + node 后为 normalNode

    head <=> normalNode <=> [node] <=> normalNode
    head <=> normalNode <=> cancelNode <=> [node] <=> normalNode

  4. node 不是 tail && pred 不是 head + node 后为 cancelNode

    head <=> normalNode <=> [node] <=> cancelNode
    head <=> normalNode <=> cancelNode <=> [node] <=> cancelNode

  5. node 不是 tail && pred 是 head + node 后为 normalNode

    head <=> [node] <=> normalNode
    head <=> cancelNode <=> [node] <=> normalNode

  6. node 不是 tail && pred 是 head + node 后为 cancelNode

    head <=> [node] <=> cancelNode
    head <=> cancelNode <=> [node] <=> cancelNode

(在下文中会看到,其实 5 和 6 是可以合并的)

node 是 tail + pred 是 head

case1.1

head <=> [node]

case1.1

case1.2

head <=> cancelNode <=> [node]

case1.2

何时清除 node

当 head 节点变更时,即前置线程释放锁,sync queue 中第一个排队线程获取锁后调用了 setHead 方法和 p.next = null 后。

node 是 tail + pred 不是 head

case2.1

head <=> normalNode <=> [node]

case2.1

case2.2

head <=> normalNode <=> cancelNode <=> [node]

case2.2

何时清除 node

当排队线程陆续获取锁直到 pred 变为 head,然后(同 case1)当 head 节点变更时,即前置线程释放锁,sync queue 中第一个排队线程获取锁后调用了 setHead 方法和 p.next = null 后。

node 不是 tail && pred 不是 head + node 后为 normalNode

case3.1

head <=> normalNode <=> [node] <=> normalNode

case3.1

case3.2

head <=> normalNode <=> cancelNode <=> [node] <=> normalNode

case3.2

何时清除 node

当别的线程调用 shouldParkAfterFailedAcquirecancelAcquire 方法时,会调整其遍历节点的指针,(同 case2)当排队线程陆续获取锁直到 node 后的 normalNode 变为 head,然后当 head 节点变更时,即前置线程释放锁,sync queue 中第一个排队线程获取锁后调用了 setHead 方法和 p.next = null 后。

node 不是 tail && pred 不是 head + node 后为 cancelNode

case4.1

head <=> normalNode <=> [node] <=> cancelNode

case4.1

case4.2

head <=> normalNode <=> cancelNode <=> [node] <=> cancelNode

case4.2

何时清除 node

当 node 后面的 cancelNode 后面有了 normalNode,(同 case3)当别的线程调用 shouldParkAfterFailedAcquirecancelAcquire 方法时,会调整其遍历节点的指针,当排队线程陆续获取锁直到 node 后的 normalNode 变为 head,然后当 head 节点变更时,即前置线程释放锁,sync queue 中第一个排队线程获取锁后调用了 setHead 方法和 p.next = null 后。

case5 + case6

case5:node 不是 tail && pred 是 head + node 后为 normalNode
case6:node 不是 tail && pred 是 head + node 后为 cancelNode

可以发现,其实该方法的操作不会涉及到 node 后所跟的节点,因此,无论 node 后跟的是 normalNode 或 cancelNode,结果其实都是一样的。

case5.1 + case6.1

head <=> [node] <=> normalNode
head <=> [node] <=> cancelNode

case5.1

case5.2 + case6.2

head <=> cancelNode <=> [node] <=> normalNode
head <=> cancelNode <=> [node] <=> cancelNode

case5.2

何时清除 node

  1. 当 node 后是 normalNode 时,会唤醒线程去获取锁,获取到锁后该节点变为头节点,注意此时 node 其实还没有彻底清除掉,还需要此后(同 case1)当 head 节点变更时,即前置线程释放锁,sync queue 中第一个排队线程获取锁后调用了 setHead 方法后
  2. 当 node 后是 cancelNode 时,啥也不会做,因为唤不醒该线程,只能等(同 case4)当 node 后面的 cancelNode 后面有了 normalNode,当别的线程调用 shouldParkAfterFailedAcquirecancelAcquire 方法时,会调整其遍历节点的指针,当排队线程陆续获取锁直到 node 后的 normalNode 变为 head,然后当 head 节点变更时,即前置线程释放锁,sync queue 中第一个排队线程获取锁后调用了 setHead 方法和 p.next = null

小结

该方法无论什么条件,都会做的事是:

  1. pred 指向从 node 往前遍历的第一个状态不为 CANCELLED 的节点
  2. node.prev 指向 pred
  3. node 的 waitStatus 置为 CANCELLED

此后,当条件为:

  1. node 是 tail:将 tail 指向 pred;将 pred.next 指向 null
  2. node 不是 tail && pred 不是 head:将 pred 的 waitStatus 置为 SIGNAL;如果 node.next 的 waitStatus 不为 CANCELLED,将 pred.next 指向 node.next
  3. node 不是 tail && pred 是 head:调用 unparkSuccessor 方法唤醒 node 的后继节点

可以发现该方法很多结果最终都是修改 next 指针而保留 prev 指针,我认为这是因为考虑到还有很多方法都采用从 tail 往前遍历,而遍历时会跳过 waitStatus 为 CANCELLED 的节点。因此在这里只要修改节点的 waitStatus 以及 next 指针就足够了,早晚都会有别的方法会通过遍历 / setHead / p.next = null 去将 waitStatus 为 CANCELLED 的节点从 sync queue 中完全断开!


前言

本文产生原因源于 AQS 中的 parkAndCheckInterrupt() 方法。

AQS-parkAndCheckInterrupt

如注释所说,该方法的作用为:挂起线程后检查是否中断,如果被中断则返回 true。

该方法被调用的时机为:获取锁失败,线程在同步队列中挂起自己,等待被唤醒后继续获取锁。

问题:我们只是需要知道线程是否在等待时中断而已,为什么返回值不用 Thread.currentThread().isInterrupted() 方法?

前置知识:

  1. 中断线程可以唤醒被挂起的线程
  2. Thread#isInterrupted() 和 Thread.interrupted() 的区别是:后者会重置中断状态

本文将扩展一下标题所述的范围,通过几个 demo 来探究一下 Thread#interrupt() / Thread.interrupted() / Thread.sleep() / LockSupport.park() / LockSupport.unpark() 之间的一些影响。

测试环境

java-version

注:为了保证结果严谨性,每个 demo 都会包含多线程和单线程的模拟。

case1: interrupt and park

A 线程多次调用 LockSupport.park() 挂起自己,B 线程调用 interrupt() 中断 A 线程 or
A 线程先调用 interrupt() 中断自己,再多次调用 LockSupport.park() 挂起自己

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* A 线程多次调用 LockSupport.park() 挂起自己,B 线程调用 interrupt() 中断 A 线程
*/
public class Test {
public static void main(String[] args) throws Exception {
Test1 t = new Test1();
t.start();
Thread.sleep(1000L);
t.interrupt();
}

static class Test1 extends Thread {
@Override
public void run() {
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park();
LockSupport.park();
LockSupport.park();
System.out.println("final park");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* A 线程先调用 interrupt() 中断自己,再多次调用 LockSupport.park() 挂起自己
*/
public class Test {
public static void main(String[] args) {
Test1 t = new Test1();
t.start();
}

static class Test1 extends Thread {
@Override
public void run() {
Thread.currentThread().interrupt();
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park();
LockSupport.park();
LockSupport.park();
System.out.println("final park");
}
}
}

结果

case1-result

结论

调用 interrupt() 方法中断线程前,线程会正常挂起;中断后,无论调用多少次 LockSupport.park(),线程都不会挂起,而是正常运行结束。

case2: interrupt and park and interrupted

(在 case1 的基础上)
A 线程多次调用 LockSupport.park() 挂起自己后调用 Thread.interrupted() 重置中断状态,最后再调用 LockSupport.park() 挂起自己,B 线程调用 interrupt() 中断 A 线程 or
A 线程先调用 interrupt() 中断自己,再多次调用 LockSupport.park() 挂起自己,然后调用 Thread.interrupted() 重置中断状态,最后再调用 LockSupport.park() 挂起自己

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* A 线程多次调用 LockSupport.park() 挂起自己后调用 Thread.interrupted() 重置中断状态,最后再调用 LockSupport.park() 挂起自己,B 线程调用 interrupt() 中断 A 线程
*/
public class Test {
public static void main(String[] args) throws Exception {
Test2 t = new Test2();
t.start();
Thread.sleep(1000L);
t.interrupt();
}

public static class Test2 extends Thread {
@Override
public void run() {
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park();
System.out.println("after second park");
LockSupport.park();
System.out.println("after third park");
System.out.println(Thread.interrupted()); // true
System.out.println(Thread.interrupted()); // false
LockSupport.park(); // 挂起
System.out.println("final park");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* A 线程先调用 interrupt() 中断自己,再多次调用 LockSupport.park() 挂起自己,然后调用 Thread.interrupted() 重置中断状态,最后再调用 LockSupport.park() 挂起自己
*/
public class Test {
public static void main(String[] args) {
Test2 t = new Test2();
t.start();
}

public static class Test2 extends Thread {
@Override
public void run() {
Thread.currentThread().interrupt();
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park();
System.out.println("after second park");
LockSupport.park();
System.out.println("after third park");
System.out.println(Thread.interrupted()); // true
System.out.println(Thread.interrupted()); // false
LockSupport.park(); // 挂起
System.out.println("final park");
}
}
}

结果

case2-result

结论

(结合 case1 的结论)
调用 interrupt() 方法中断线程前,线程会正常挂起;中断后,无论调用多少次 LockSupport.park(),线程都不会挂起,直到重置中断状态后,再次调用 LockSupport.park() 线程才会挂起。

case3 interrupt and park and interrupted 2

(在 case2 的基础上)
将调用 Thread.interrupted() 重置中断状态提前到 interrupt() 中断后,再多次调用 LockSupport.park()。即 Thread.interrupted() 后先立即进行重置中断状态操作。

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Test {
private static boolean flag = true;

public static void main(String[] args) {
Test3 t = new Test3();
t.start();
t.interrupt();
flag = false;
}

public static class Test3 extends Thread {
@Override
public void run() {
while (flag) {}
System.out.println(Thread.interrupted()); // true
System.out.println(Thread.interrupted()); // false
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park(); // 挂起
System.out.println("final park");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Test {
public static void main(String[] args) {
Test3 t = new Test3();
t.start();
}

public static class Test3 extends Thread {
@Override
public void run() {
Thread.currentThread().interrupt();
System.out.println(Thread.interrupted()); // true
System.out.println(Thread.interrupted()); // false
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park(); // 挂起
System.out.println("final park");
}
}
}

结果

case3-result

结论

调用 interrupt() 方法中断线程然后立即重置中断状态,之后,第一次调用 LockSupport.park() 不会挂起线程,再次调用 LockSupport.park() 线程才会挂起。

case4: sleep and park

A 线程 sleep 时被 B 线程中断后,再多次调用 LockSupport.park()。

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* A 线程 sleep 时被 B 线程中断后,再多次调用 LockSupport.park()
*/
public class Test {
public static void main(String[] args) {
Test4 t = new Test4();
t.start();
t.interrupt();
}

public static class Test4 extends Thread {
@Override
public void run() {
try {
Thread.sleep(3000L);
System.out.println("sleep end");
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("interrupt");
}
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park(); // 挂起
System.out.println("final park");
}
}
}

结果

case4-result

结论

(类似 case3 的结论)
线程 sleep 时被中断抛出 InterruptedException 重置中断状态,之后,第一次调用 LockSupport.park() 不会挂起线程,再次调用 LockSupport.park() 线程才会挂起。

case5: unpark and park

A 线程多次调用 LockSupport.unpark(threadB),B 线程多次调用 LockSupport.park() 挂起自己 or
A 线程多次调用 LockSupport.unpark(threadA),再多次调用 LockSupport.park() 挂起自己

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* A 线程多次调用 LockSupport.unpark(threadB),B 线程多次调用 LockSupport.park() 挂起自己
*/
public class Test {
public static void main(String[] args) {
Test5 t = new Test5();
t.start();
LockSupport.unpark(t);
LockSupport.unpark(t);
LockSupport.unpark(t);
}

public static class Test5 extends Thread {
@Override
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park(); // 挂起
LockSupport.park();
System.out.println("final park");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* A 线程多次调用 LockSupport.unpark(threadA),再多次调用 LockSupport.park() 挂起自己
*/
public class Test {
public static void main(String[] args) {
Test5 t = new Test5();
t.start();
}

public static class Test5 extends Thread {
@Override
public void run() {
LockSupport.unpark(Thread.currentThread());
LockSupport.unpark(Thread.currentThread());
LockSupport.unpark(Thread.currentThread());
System.out.println("before first park");
LockSupport.park();
System.out.println("after first park");
LockSupport.park(); // 挂起
LockSupport.park();
System.out.println("final park");
}
}
}

结果

case5-result

结论

无论调用多少次 LockSupport.unpark(thread),都只会提供给线程一个许可。

小结

至此,前言中问题的原因就比较清晰了:中断后,如果不清除中断状态,下次 park 是不生效的。

假如该线程下次仍然获取锁失败,再调用 parkAndCheckInterrupt() 就不能再挂起自己了。因此这里需要一个又能重置线程中断状态又能返回线程是否中断的方法,Thread.interrupted() 方法是再合适不过了。