0%

本文转载自公众号 “源码笔记”。

什么是自旋锁和互斥锁?

由于CLH锁是一种自旋锁,那么我们先来看看自旋锁是什么?

自旋锁说白了也是一种互斥锁,只不过没有抢到锁的线程会一直自旋等待锁的释放,处于busy-waiting的状态,此时等待锁的线程不会进入休眠状态,而是一直忙等待浪费CPU周期。因此自旋锁适用于锁占用时间短的场合。

这里谈到了自旋锁,那么我们也顺便说下互斥锁。这里的互斥锁说的是传统意义的互斥锁,就是多个线程并发竞争锁的时候,没有抢到锁的线程会进入休眠状态即sleep-waiting,当锁被释放的时候,处于休眠状态的一个线程会再次获取到锁。缺点就是这一些列过程需要线程切换,需要执行很多CPU指令,同样需要时间。如果CPU执行线程切换的时间比锁占用的时间还长,那么可能还不如使用自旋锁。因此互斥锁适用于锁占用时间长的场合。

什么是CLH锁?

CLH锁其实就是一种是基于逻辑队列非线程饥饿的一种自旋公平锁,由于是 Craig、Landin 和 Hagersten三位大佬的发明,因此命名为CLH锁。

CLH锁原理如下:

  1. 首先有一个尾节点指针,通过这个尾结点指针来构建等待线程的逻辑队列,因此能确保线程线程先到先服务的公平性,因此尾指针可以说是构建逻辑队列的桥梁;此外这个尾节点指针是原子引用类型,避免了多线程并发操作的线程安全性问题;
  2. 通过等待锁的每个线程在自己的某个变量上自旋等待,这个变量将由前一个线程写入。由于某个线程获取锁操作时总是通过尾节点指针获取到前一线程写入的变量,而尾节点指针又是原子引用类型,因此确保了这个变量获取出来总是线程安全的。

这么说肯定很抽象,有些小伙伴可能不理解,没关系,我们心中可以有个概念即可,后面我们会一步一图来彻彻底底把CLH锁弄明白。

为什么要学习CLH锁?

好了,前面我们对CLH锁有了一个概念后,那么我们为什么要学习CLH锁呢?

研究过AQS源码的小伙伴们应该知道,AQS是JUC的核心,而CLH锁又是AQS的基础,说核心也不为过,因为AQS就是用了变种的CLH锁。如果要学好Java并发编程,那么必定要学好JUC;学好JUC,必定要先学好AQS;学好AQS,那么必定先学好CLH。因此,这就是我们为什么要学习CLH锁的原因。

CLH锁详解

那么,下面我们先来看CLH锁实现代码,然后通过一步一图来详解CLH锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// CLHLock.java

public class CLHLock {
/**
* CLH锁节点
*/
private static class CLHNode {
// 锁状态:默认为false,表示线程没有获取到锁;true表示线程获取到锁或正在等待
// 为了保证locked状态是线程间可见的,因此用volatile关键字修饰
volatile boolean locked = false;
}
// 尾结点,总是指向最后一个CLHNode节点
// 【注意】这里用了java的原子系列之AtomicReference,能保证原子更新
private final AtomicReference<CLHNode> tailNode;
// 当前节点的前继节点
private final ThreadLocal<CLHNode> predNode;
// 当前节点
private final ThreadLocal<CLHNode> curNode;

// CLHLock构造函数,用于新建CLH锁节点时做一些初始化逻辑
public CLHLock() {
// 初始化时尾结点指向一个空的CLH节点
tailNode = new AtomicReference<>(new CLHNode());
// 初始化当前的CLH节点
curNode = ThreadLocal.withInitial(CLHNode::new);
// 初始化前继节点,注意此时前继节点没有存储CLHNode对象,存储的是null
predNode = new ThreadLocal<>();
}

/**
* 获取锁
*/
public void lock() {
// 取出当前线程ThreadLocal存储的当前节点,初始化值总是一个新建的CLHNode,locked状态为false。
CLHNode currNode = curNode.get();
// 此时把lock状态置为true,表示一个有效状态,
// 即获取到了锁或正在等待锁的状态
currNode.locked = true;
// 当一个线程到来时,总是将尾结点取出来赋值给当前线程的前继节点;
// 然后再把当前线程的当前节点赋值给尾节点
// 【注意】在多线程并发情况下,这里通过AtomicReference类能防止并发问题
// 【注意】哪个线程先执行到这里就会先执行predNode.set(preNode);语句,因此构建了一条逻辑线程等待链
// 这条链避免了线程饥饿现象发生
CLHNode preNode = tailNode.getAndSet(currNode);
// 将刚获取的尾结点(前一线程的当前节点)赋给当前线程的前继节点ThreadLocal
// 【思考】这句代码也可以去掉吗,如果去掉有影响吗?
predNode.set(preNode);
// 【1】若前继节点的locked状态为false,则表示获取到了锁,不用自旋等待;
// 【2】若前继节点的locked状态为true,则表示前一线程获取到了锁或者正在等待,自旋等待
while (preNode.locked) {
System.out.println("线程 " + Thread.currentThread().getName() + " 没能获取到锁,进行自旋等待。。。");
Thread.yield();
}
// 能执行到这里,说明当前线程获取到了锁
System.out.println("线程 " + Thread.currentThread().getName() + " 获取到了锁!!!");
}

/**
* 释放锁
*/
public void unLock() {
// 获取当前线程的当前节点
CLHNode node = curNode.get();
// 进行解锁操作
// 这里将locked至为false,此时执行了lock方法正在自旋等待的后继节点将会获取到锁
// 【注意】而不是所有正在自旋等待的线程去并发竞争锁
node.locked = false;
System.out.println("线程 " + Thread.currentThread().getName() + " 释放了锁!!!");
// 小伙伴们可以思考下,下面两句代码的作用是什么??
CLHNode newCurNode = new CLHNode();
curNode.set(newCurNode);

// 【优化】能提高GC效率和节省内存空间,请思考:这是为什么?
// curNode.set(predNode.get());
}
}

CLH锁的初始化逻辑

通过上面代码,我们缕一缕CLH锁的初始化逻辑先:

  1. 定义了一个CLHNode节点,里面有一个locked属性,表示线程线程是否获得锁,默认为falsefalse表示线程没有获取到锁或已经释放锁;true表示线程获取到了锁或者正在自旋等待。

    注意,为了保证locked属性线程间可见,该属性被volatile修饰。

  2. CLHLock有三个重要的成员变量尾节点指针tailNode,当前线程的前继节点preNode和当前节点curNode。其中tailNodeAtomicReference类型,目的是为了保证尾节点的线程安全性;此外,preNodecurNode都是ThreadLocal类型即线程本地变量类型,用来保存每个线程的前继CLHNode和当前CLHNode节点。
  3. 最重要的是我们新建一把CLHLock对象时,此时会执行构造函数里面的初始化逻辑。此时给尾指针tailNode和当前节点curNode初始化一个locked状态为falseCLHNode节点,此时前继节点preNode存储的是null

CLH锁的加锁过程

我们再来看看CLH锁的加锁过程,下面再贴一遍加锁lock方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// CLHLock.java

/**
* 获取锁
*/
public void lock() {
// 取出当前线程ThreadLocal存储的当前节点,初始化值总是一个新建的CLHNode,locked状态为false。
CLHNode currNode = curNode.get();
// 此时把lock状态置为true,表示一个有效状态,
// 即获取到了锁或正在等待锁的状态
currNode.locked = true;
// 当一个线程到来时,总是将尾结点取出来赋值给当前线程的前继节点;
// 然后再把当前线程的当前节点赋值给尾节点
// 【注意】在多线程并发情况下,这里通过AtomicReference类能防止并发问题
// 【注意】哪个线程先执行到这里就会先执行predNode.set(preNode);语句,因此构建了一条逻辑线程等待链
// 这条链避免了线程饥饿现象发生
CLHNode preNode = tailNode.getAndSet(currNode);
// 将刚获取的尾结点(前一线程的当前节点)赋给当前线程的前继节点ThreadLocal
// 【思考】这句代码也可以去掉吗,如果去掉有影响吗?
predNode.set(preNode);
// 【1】若前继节点的locked状态为false,则表示获取到了锁,不用自旋等待;
// 【2】若前继节点的locked状态为true,则表示前一线程获取到了锁或者正在等待,自旋等待
while (preNode.locked) {
System.out.println("线程 " + Thread.currentThread().getName() + " 没能获取到锁,进行自旋等待。。。");
Thread.yield();
}
// 能执行到这里,说明当前线程获取到了锁
System.out.println("线程 " + Thread.currentThread().getName() + " 获取到了锁!!!");
}

虽然代码的注释已经很详细,我们还是缕一缕线程加锁的过程:

  1. 首先获得当前线程的当前节点curNode,这里每次获取的CLHNode节点的locked状态都为false
  2. 然后将当前CLHNode节点的locked状态赋值为true,表示当前线程的一种有效状态,即获取到了锁或正在等待锁的状态;
  3. 因为尾指针tailNode的总是指向了前一个线程的CLHNode节点,因此这里利用尾指针tailNode取出前一个线程的CLHNode节点,然后赋值给当前线程的前继节点predNode,并且将尾指针重新指向最后一个节点即当前线程的当前CLHNode节点,以便下一个线程到来时使用;
  4. 根据前继节点(前一个线程)的locked状态判断,若lockedfalse,则说明前一个线程释放了锁,当前线程即可获得锁,不用自旋等待;若前继节点的locked状态为true,则表示前一线程获取到了锁或者正在等待,自旋等待。

为了更通俗易懂,我们用一个图来说明。

假如有这么一个场景:有四个并发线程同时启动执行lock操作,假如四个线程的实际执行顺序为:threadA<–threadB<–threadC<–threadD

第一步,线程A过来,执行了lock操作,获得了锁,此时locked状态为true,如下图:

4.2-1

第二步,线程B过来,执行了lock操作,由于线程A还未释放锁,此时自旋等待,locked状态也为true,如下图:

4.2-2

第三步,线程C过来,执行了lock操作,由于线程B处于自旋等待,此时线程C也自旋等待(因此CLH锁是公平锁),locked状态也为true,如下图:

4.2-3

第四步,线程D过来,执行了lock操作,由于线程C处于自旋等待,此时线程D也自旋等待,locked状态也为true,如下图:

4.2-4

这就是多个线程并发加锁的一个过程图解,当前线程只要判断前一线程的locked状态如果是true,那么则说明前一线程要么拿到了锁,要么也处于自旋等待状态,所以自己也要自旋等待。而尾指针tailNode总是指向最后一个线程的CLHNode节点。

CLH锁的释放锁过程

前面用图解结合代码说明了CLH锁的加锁过程,那么,CLH锁的释放锁的过程又是怎样的呢?同样,我们先贴下释放锁的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// CLHLock.java

/**
* 释放锁
*/
public void unLock() {
// 获取当前线程的当前节点
CLHNode node = curNode.get();
// 进行解锁操作
// 这里将locked至为false,此时执行了lock方法正在自旋等待的后继节点将会获取到锁
// 【注意】而不是所有正在自旋等待的线程去并发竞争锁
node.locked = false;
System.out.println("线程 " + Thread.currentThread().getName() + " 释放了锁!!!");
// 小伙伴们可以思考下,下面两句代码的作用是什么??
CLHNode newCurNode = new CLHNode();
curNode.set(newCurNode);

// 【优化】能提高GC效率和节省内存空间,请思考:这是为什么?
// curNode.set(predNode.get());
}

可以看到释放CLH锁的过程代码比加锁简单多了,下面同样缕一缕:

  1. 首先从当前线程的线程本地变量中获取出当前CLHNode节点,同时这个CLHNode节点被后面一个线程的preNode变量指向着;
  2. 然后将locked状态置为false即释放了锁;

    注意locked因为被volitile关键字修饰,此时后面自旋等待的线程的局部变量preNode.locked也为false,因此后面自旋等待的线程结束while循环即结束自旋等待,此时也获取到了锁。这一步骤也在异步进行着。

  3. 然后给当前线程的表示当前节点的线程本地变量重新赋值为一个新的CLHNode

    思考:这一步看上去是多余的,其实并不是。请思考下为什么这么做?我们后续会继续深入讲解。

我们还是用一个图来说说明CLH锁释放锁的场景,接着前面四个线程加锁的场景,假如这四个线程加锁后,线程A开始释放锁,此时线程B获取到锁,结束自旋等待,然后线程C和线程D仍然自旋等待,如下图:

4.3-1

以此类推,线程B释放锁的过程也跟上图类似,这里不再赘述。

同个线程加锁释放锁再次正常获取锁

在前面4.3小节讲到释放锁unLock方法中有下面两句代码:

1
2
CLHNode newCurNode = new CLHNode();
curNode.set(newCurNode);

这两句代码的作用是什么?这里先直接说结果:若没有这两句代码,若同个线程加锁释放锁后,然后再次执行加锁操作,这个线程就会陷入自旋等待的状态。这是为啥,可能有些下伙伴也没明白,劲越也是搞了蛮久才搞明白,嘿嘿。

下面我们同样通过一步一图的形式来分析这两句代码的作用。假如有下面这样一个场景:线程A获取到了锁,然后释放锁,然后再次获取锁。

第一步: 线程A执行了lock操作,获取到了锁,如下图:

4.4-1

上图的加锁操作中,线程A的当前CLHNode节点的locked状态被置为true;然后tailNode指针指向了当前线程的当前节点;最后因为前继节点的locked状态为false,不用自旋等待,因此获得了锁。

第二步: 线程A执行了unLock操作,释放了锁,如下图:

4.4-2

上图的释放锁操作中,线程A的当前CLHNode节点的locked状态被置为false,表示释放了锁;然后新建了一个新的CLHNode节点newCurNode,线程A的当前节点线程本地变量值重新指向了newCurNode节点对象。

第三步: 线程A再次执行lock操作,重新获得锁,如下图:

4.4-3

上图的再次获取锁操作中,首先将线程A的当前CLHNode节点的locked状态置为true;然后首先通过tailNode尾指针获取到前继节点即第一,二步中的curNode对象,然后线程A的前继节点线程本地变量的值重新指向了重新指向了curNode对象;然后tailNode尾指针重新指向了新创建的CLHNode节点newCurNode对象。最后因为前继节点的locked状态为false,不用自旋等待,因此获得了锁。

扩展: 注意到以上图片的preNode对象此时没有任何引用,所以当下一次会被GC掉。前面是通过每次执行unLock操作都新建一个新的CLHNode节点对象newCurNode,然后让线程A的当前节点线程本地变量值重新指向newCurNode。因此这里完全不用重新创建新的CLHNode节点对象,可以通过curNode.set(predNode.get());这句代码进行优化,提高GC效率和节省内存空间。

考虑同个线程加锁释放锁再次获取锁异常的情况

现在我们把unLock方法的CLHNode newCurNode = new CLHNode();curNode.set(newCurNode);这两句代码注释掉,变成了下面这样:

1
2
3
4
5
6
7
8
9
// CLHLock.java

public void unLock() {
CLHNode node = curNode.get();
node.locked = false;
System.out.println("线程 " + Thread.currentThread().getName() + " 释放了锁!!!");
/*CLHNode newCurNode = new CLHNode();
curNode.set(newCurNode);*/
}

那么结果就是线程A通过加锁,释放锁后,再次获取锁时就会陷入自旋等待的状态,这又是为什么呢?我们下面来详细分析。

第一步: 线程A执行了lock操作,获取到了锁,如下图:

4.5-1

上图的加锁操作中,线程A的当前CLHNode节点的locked状态被置为true;然后tailNode指针指向了当前线程的当前节点;最后因为前继节点的locked状态为false,不用自旋等待,因此获得了锁。这一步没有什么异常。

第二步: 线程A执行了unLock操作,释放了锁,如下图:

4.5-2

现在已经把unLock方法的CLHNode newCurNode = new CLHNode();curNode.set(newCurNode);这两句代码注释掉了,因此上图的变化就是线程A的当前CLHNode节点的locked状态置为false即可。

第三步: 线程A再次执行lock操作,此时会陷入一直自旋等待的状态,如下图:

4.5-3

通过上图对线程A再次获取锁的lock方法的每一句代码进行分析,得知虽然第二步中将线程A的当前CLHNodelocked状态置为false了,但是在第三步线程A再次获取锁的过程中,将当前CLHNodelocked状态又置为true了,且尾指针tailNode指向的依然还是线程A的当前当前CLHNode节点。又因为每次都是将尾指针tailNode指向的CLHNode节点取出来给当前线程的前继CLHNode节点,之后执行while(predNode.locked) {}语句时,此时因为predNode.locked = true,因此线程A就永远自旋等待了。

测试CLH锁

下面我们通过一个Demo来测试前面代码实现的CLH锁是否能正常工作,直接上测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// CLHLockTest.java

/**
* 测试 CLHLocke
*
* 定义一个静态成员变量 cnt,然后开 10 个线程跑起来,看能是否会有线程安全问题
*/
public class CLHLockTest {
private static int cnt = 0;
private static final int COUNT = 10;

public static void main(String[] args) throws Exception {
final CLHLock lock = new CLHLock();
CountDownLatch latch = new CountDownLatch(COUNT);

for (int i = 0; i < COUNT; i++) {
new Thread(() -> {
try {
lock.lock();
cnt++;
} finally {
lock.unLock();
latch.countDown();
}
}).start();
}

latch.await();
System.out.println("cnt----------->>> " + cnt);
}
}

下面附运行结果截图:

result

PS: 这里为了截图全面,因此只开了10个线程。经过劲越测试,开100个线程,1000个线程也不会存在线程安全问题。

小结

好了,前面我们通过多图详细说明了CLH锁的原理与实现,那么我们再对前面的知识进行一次小结:

  1. 首先我们学习了自旋锁和互斥锁的概念与区别;
  2. 然后我们学习了什么是CLH锁以及为什么要学习CLH锁;
  3. 最后我们通过图示+代码实现的方式来学习CLH锁的原理,从而为学习后面的AQS打好坚实的基础。

FutureTask<V> implements RunnableFuture<V>
RunnableFuture<V> extends Runnable, Future<V>

所以 FutureTask 既是个 Runnable,也是个 Future。因此 FutureTask 可以由 Executor#execute 方法执行,也可以由 ExecutorService#submit 方法提交执行。

其实在后面线程池源码中可以看到,通过 submit 方法向线程池提交的任务,默认情况下,不管入参是什么类型,最终都是转为最终都是转为 FutureTask。
(准确来说,只要是 RunnableFuture 的实现类就可以,但是需要重写 AbstractExecutorService#newTaskFor 方法)

FutureTask 是可取消的异步计算,可用于包装 Callable 或 Runnable 对象。

除了作为一个独立的类外,FutureTask 还提供了一些 protected 的功能,这在需要创建自定义任务类的场景下会很有用。

不同实现

FutureTask 类一开始有一段 Revision notes 和 Style note 的注释,这是 Doug Lea 大神重构该类做的一些说明。

因为老版本的实现和当前版本并不一样,老版本是基于 AQS 的方式实现的,新版本是基于 CAS + Treiber stack 来实现的。

jdk6jdk7 是老版本的实现,jdk7u 开始为当前版本的实现。

state

FutureTask 共有 7 种状态,这 7 种状态我自己又将它划分为三类:初始值 / 瞬态值 / 终态值。

state

其可能的状态转换共有 4 种:

  1. NEW -> COMPLETING -> NORMAL(set(V))
  2. NEW -> COMPLETING -> EXCEPTIONAL(set(Throwable))
  3. NEW -> CANCELLED(cancel())
  4. NEW -> INTERRUPTING -> INTERRUPTED(cancel())

state-transitions

fields

fields

Constructor

FutureTask 有两个构造方法

  1. 一个参数:Callable

constructor-1

  1. 两个参数:Runnable + 成功后的返回结果

constructor-2

FutureTask 的构造方法主要干的两件事:给 callable 赋值;将初始状态置为 NEW。

fields 可以看出,FutureTask 内部有个 Callable,但是没有 Runnable,两个参数的构造方法传的参数为 Runnable,那底层势必是要将 Runnable 转为 Callable,而 Executors.callable 方法就是干这个事的。

怎么做的呢?其实就是通过适配器模式,底层最终返回了 Executors.RunnableAdapter 对象。

Executors.callable

从这里可以看到,FutureTask 最大的作用就是统一了 Runnable 和 Callable,更方便使用。

WaitNode

前文说,新版是基于 CAS + Treiber stack 实现的,WaitNode 类就是 Treiber stack 中的元素。

该类的作用是,封装等待线程(调用 get 方法检索结果时在 awaitDone 方法中阻塞),使其排队,其形式是个单向链表。

WaitNode

cancel

cancel 方法是 Future 接口中的方法,尝试取消正在执行的任务。

cancel 方法执行逻辑:

  1. 当 state 不为 NEW 时,直接返回 false,表示无法取消该任务;否则,通过 CAS 算法将 state 由 NEW 变为 INTERRUPTING(瞬态值,当 mayInterruptIfRunning 为 true) 或 CANCELLED(终态值,当 mayInterruptIfRunning 为 false),如果这一步失败,说明在执行该操作时其他线程改变了 state,直接返回 false,表示无法取消该任务
  2. 如果 mayInterruptIfRunning 为 true,则调用 interrupt() 方法打断运行当前任务的线程,打断后,将 state 设置为 INTERRUPTED(终态值)
  3. 以上都执行完成后,删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果。不过此时去拿结果,必然会抛出 CancellationException 或 ExecutionException(report 方法)

cancel

finishCompletion 方法中会调用一个扩展方法 done(),默认实现为空。

finishCompletion

run

run 方法是 RunnableFuture 接口中的方法。

run 方法执行逻辑:

  1. 当 state 不为 NEW 时,直接返回;否则,通过 CAS 算法将 runner 由 null 赋值为 currentThread,如果这一步失败,说明在执行该操作时其他线程已经设置了 runner,直接返回
  2. 调用执行 Callable#call(),获取结果。如果正常执行结束,将 result 赋值给 outcome;如果执行过程中抛出异常,将 Throwable 赋值给 outcome
  3. 执行完成后,将 runner 置为 null。还有一个不容易想到的小细节,就是如果此时有别的线程调用了 cancel(true) 方法,当前线程正处于 INTERRUPTING 状态时,我们应该等待该状态由 INTERRUPTING 变为 INTERRUPTED,到达一个终态值

run

handlePossibleCancellationInterrupt

runAndReset

runAndReset 和 run 方法的实现逻辑差不多,唯一的区别就是,该方法不设置结果,并在执行成功后将状态重置为 NEW。

这是专为本质上执行多次的任务而设计的。ScheduledThreadPoolExecutor 中就用到了该方法。

runAndReset

set

将正常执行结束的结果 result 赋值给 outcome。

set 方法执行逻辑:

  1. 将 state 由 NEW 变为 COMPLETING(瞬态值)
  2. 给 outcome 赋值
  3. 将 state 由 COMPLETING 变为 NORMAL(终态值,和 setException 唯一不同的地方)
  4. 删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果

set

setException

将执行过程中抛出的异常 Throwable 赋值给 outcome。

setException 方法执行逻辑:

  1. 将 state 由 NEW 变为 COMPLETING(瞬态值)
  2. 给 outcome 赋值
  3. 将 state 由 COMPLETING 变为 EXCEPTIONAL(终态值,和 set 唯一不同的地方)
  4. 删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果

setException

get

无参

无参的 get 方法是 Future 接口中的方法,阻塞等待直到计算完成,然后检索其结果。

无参 get 方法执行逻辑:

  1. 如果当前 state 是 NEW 或 COMPLETING 时,说明任务在执行中或在给 outcome 赋值中,则调用 awaitDone 方法阻塞等待
  2. 任务执行完成并给 outcome 赋值后,返回结果或抛出的异常

get-1

两个参数

两个参数的 get 方法是 Future 接口中的方法,最多阻塞等待给定的计算完成时间,然后检索其结果(如果计算完成)。

两个参数的 get 方法执行逻辑:

  1. 如果当前 state 是 NEW 或 COMPLETING 时,说明任务在执行中或在给 outcome 赋值中,则调用 awaitDone 方法阻塞等待给定时间
  2. 如果任务经过阻塞等待给定时间后状态仍为 NEW 或 COMPLETING,则抛出超时异常 TimeoutException;否则,返回结果或抛出的异常

get-2

report

根据状态返回结果或抛出异常。

report

awaitDone

该方法支持一直阻塞等待(timed 为 false)或阻塞等待给定时间(timed 为 true)。

该方法里填补前文 Treiber stack 中埋的坑:

  1. 入栈过程在 awaitDone 方法中
  2. 出栈过程在 removeWaiter 方法中

awaitDone

removeWaiter

isCancelled

isCancelled 方法是 Future 接口中的方法,任务是否被取消。

isCancelled

isDone

isDone 方法是 Future 接口中的方法,任务是否完成。

isDone

protected methods

done

扩展方法,finishCompletion 方法中调用,默认实现为空,子类可以重写此方法来做回调或记录。

done

other protected methods

set / setException / runAndReset 前文均已介绍。


本文重点分析 FutureTask 相关的所有接口和类,为后面 FutureTask 源码解析做个准备。

本文涉及到的接口:Callable / Runnable / Future / RunnableFuture
本文涉及到的类:Executors.RunnableAdapter

Runnable

Runnable 接口定义了一个无参无返回值的 run 方法。如果某个类的实例计划由一个线程来执行,则可以去实现 Runnable 接口。

Runnable 接口的设计,是为了给希望在活动状态执行代码逻辑的对象提供通用协议。例如,Thread 类实现了 Runnable。处于活动状态仅意味着线程已启动且尚未停止。

Runnable 接口提供了使类处于活动状态而不用作为 Thread 子类的方法。通过实例化 Thread 实例并将自身作为 target 传入,实现 Runnable 的类可以在不继承 Thread 的情况下运行(see Thread 源码解析-实现 Runnable 接口)。

大多数情况下,如果只需要重写 run() 方法而不是其他 Thread 方法,就应该使用 Runnable 接口。

Runnable

Callable

Callable 接口定义了一个无参有返回值的 call 方法。

Callable 接口类似于 Runnable 接口,因为两者都是为实例可以由另一个线程执行的类而设计。两者的区别是:Callable 可以返回任务的计算结果或任务运行时抛出的异常。

Callable

Future

Future 表示异步计算的结果。提供了 5 个管理任务的方法。只能在计算完成后通过 get 方法检索结果,该方法必要时会阻塞直到任务完成。

如果想使用 Future 但不提供可用结果,可以声明为 Future<?> 并返回 null 作为底层任务结果。

看下 Future 提供的 5 个方法:

cancel(boolean mayInterruptIfRunning);

mayInterruptIfRunning 参数决定在取消任务时是否应该中断执行此任务的线程来尝试停止该任务

cancel

isCancelled();

任务是否被取消。

isCancelled

isDone();

任务是否完成。

isDone

get()

等待计算完成,然后检索其结果。

get-1

get(long timeout, TimeUnit unit)

等待给定的计算完成时间,然后检索其结果(如果计算完成)。

get-2

RunnableFuture

RunnableFuture<V> extends Runnable, Future<V>

RunnableFuture 接口可以让 Future 对 Runnable 进行管理。

RunnableFuture

Executors.RunnableAdapter

RunnableAdapter<T> implements Callable<T>

典型的适配器模式,将 Runnable 适配为 Callable:RunnableAdapter 实现 Callable 接口,接着在 Callable 的 call 方法里面调用 Runnable 的 run 方法。

Executors.RunnableAdapter

boring question

Q1

为什么 RunnableFuture extends Runnable,但是 RunnableFuture 里还定义了一个 run() 方法?

真有人讨论这个问题:
https://stackoverflow.com/questions/25092787/why-does-java-util-concurrent-runnablefuture-have-a-run-method

大意是,从理论上来讲,确实没有什么必要(no technical significance),但是这么做,可以便于文档展示(specific JavaDoc to it)。

Q2

为什么 Runnable 里的 run() 方法是 public abstract 的?

也真有人讨论这个问题:
https://stackoverflow.com/questions/3289767/why-run-method-defined-with-abstract-keyword-in-runnable-interface

大意是,其实这种接口声明风格是不好的,这么做,也许是历史遗留,或者是作者的即兴创作。

其实这两个问题并没有标准答案,只是我看源码的时候突然想到的,就像生活,有趣且无聊。。


Introduction

Treiber stack 是线程安全的,基于 CAS 的无锁并发栈。由 R. Kent Treiber 于 1986 年首次在文章 “Systems Programming: Coping with Parallelism” 中发表。

基本原理:

  1. 入栈时,通过 CAS 算法,将当前栈顶元素(old head)放在新元素之后以创建 new head 来完成

  2. 出栈时,在返回元素之前,必须检查自该操作开始以来,是否有另一个线程往栈中添加了新元素

Java example

用一个例子就能很容易看明白。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* ConcurrentStack
*
* Nonblocking stack using Treiber's algorithm
*
* @author Brian Goetz and Tim Peierls
*/
@ThreadSafe
public class ConcurrentStack <E> {
AtomicReference<Node<E>> top = new AtomicReference<>();

public void push(E item) {
Node<E> newHead = new Node<>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}

public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}

private static class Node<E> {
public final E item;
public Node<E> next;

public Node(E item) {
this.item = item;
}
}
}

real case

Hystrix

在限流框架 Hystrix 中就使用到了 Treiber stack。其主要实现和上面的 demo 差不多。

源码位置:com.netflix.hystrix.Hystrix.ConcurrentStack

Hystrix-ConcurrentStack

FutureTask

FutureTask 用到了 Treiber stack,但是是将其作为了简单链表来使用,使调用 get() 方法来检索结果的等待线程排队,并不是像上面的那样实现。

先挖个坑,后面 FutureTask 的文章会分析其入栈和出栈过程。

源码位置:java.util.concurrent.FutureTask.WaitNode

FutureTask-WaitNode


守护线程

每个线程可以被标记为守护线程,当创建新线程的线程是守护线程时,新线程也是守护线程(初始化时,see init),默认为非守护线程。与之对应的是用户线程。

daemon

守护线程的优先级很低,当 JVM 退出时,不会关心是否还存在守护线程。即使还存在守护线程,JVM 仍会退出。而用户线程如果还在运行,会阻止 JVM 进程退出。

当 JVM 启动时,通常会有一个非守护线程(通常是类的 main 方法)。当发生以下任一情况时 JVM 才会停止执行:

  1. Runtime 类的 exit 方法被调用(即终止当前运行的 Java 虚拟机),且安全管理器已允许退出操作的发生
  2. 所有不是守护线程的线程都已经死亡(调用 run 方法后返回 或 抛出异常)

有一个注意的点是,如果要调用 setDaemon 设置守护线程,必须在线程启动之前调用此方法,否则会抛出 IllegalThreadStateException。

setDaemon

场景

  1. 垃圾回收线程

  2. 一些监控工具
    当 JVM 需要退出时无需关注监控是否正在运行,直接退出不会对业务产生任何影响。

优先级

每个线程都有一个优先级,优先级高的线程优先于优先级低的线程执行。当创建一个新的 Thread 对象时,新线程的优先级最初设置为与创建线程的优先级相等(初始化时,see init)。

优先级最低 1,最高 10,默认 5。

priority

如果要让新线程的优先级与创建线程的优先级相等,则创建线程的 setPriority 应该在 new Thread() 之前调用;

如果只想改变新线程的优先级,则新线程的 setPriority 应该在 start() 之前调用。

setPriority

默认名称

默认名称为(构造方法中,see Constructor):”Thread-“ + 一个整数

该整数计算逻辑为:

nextThreadNum

所以第一个新线程默认名称为:Thread-0。

线程状态

NEW

尚未启动的线程处于 NEW 状态。

RUNNABLE

在 JVM 中执行的线程处于 RUNNABLE 状态。但它可能正在等待来自操作系统的其他资源。

BLOCKED

被阻塞等待监视器锁(monitor lock)的线程处于 BLOCKED 状态。处于阻塞状态的线程正在等待监视器锁进入同步 阻塞/方法 或在调用 Object#wait() 后重入同步 阻塞/方法。

WAITING

无限期等待另一个线程执行特定操作的线程处于 WAITING 状态。

调用以下方法之一:

  1. Object#wait()
  2. Thread#join()
  3. LockSupport#park()

例如,一个线程在一个对象上调用了 Object.wait() 方法,正在等待另一个线程在那个对象上调用 Object.notify() 方法或 Object.notifyAll() 方法;调用 Thread.join() 的线程正在等待指定的线程终止。

TIMED_WAITING

等待另一个线程执行操作直到指定等待时间的线程处于 TIMED_WAITING 状态。

调用以下方法之一:

  1. Thread#sleep
  2. Object#wait(long)
  3. Thread#join(long)
  4. LockSupport#parkNanos
  5. LockSupport#parkUntil

TERMINATED

已退出的线程处于 TERMINATED 状态。线程已完成执行。

状态流转图

status

两种方式创建新线程

继承 Thread 并重写 run 方法

demo:计算大于规定值的素数(质数)的线程

1
2
3
4
5
6
7
8
9
class PrimeThread extends Thread {
long minPrime;
PrimeThread(long minPrime) {
this.minPrime = minPrime;
}
public void run() {
// compute primes larger than minPrime
}
}

启动运行

1
2
PrimeThread p = new PrimeThread(143);
p.start();

实现 Runnable 接口并实现 run 方法

demo(同上):计算大于规定值的素数(质数)的线程

1
2
3
4
5
6
7
8
9
class PrimeRun implements Runnable {
long minPrime;
PrimeRun(long minPrime) {
this.minPrime = minPrime;
}
public void run() {
// compute primes larger than minPrime
}
}

启动运行

1
2
PrimeRun p = new PrimeRun(143);
new Thread(p).start();

Other Source Code

Constructor

一共 9 个构造函数,其中有一个不是 public 的,也就是说,用户可用的构造函数一共有 8 个。

constructor

一个一个看:

constructor-1

constructor-2

该构造函数不是 public 的。

constructor-3

constructor-4

constructor-5

constructor-6

constructor-7

constructor-8

constructor-9

init

Constructor 可以看到,所有的构造函数都是调用了 init 方法。一共有两个 init 方法,其中一个是另一个的重载。

init-1

init-2

终于到了完整的 init 逻辑,其中有一些 Java Security 的代码可以不用过多关注。

init-3

start

中间会调用一个 native method start0()。

start

可以看出:如果多次调用 start 方法,会抛出 IllegalThreadStateException。

run

run 方法比较简单,其中的 target 是在 init 方法中赋值的 Runnable(如果有的话),如果 target 为 null 则什么也不做(这也是继承 Thread 后需要重写 run 方法的原因)。

run

sleep

sleep 有 2 个方法

一个参数的方法

是 native method。

sleep-1

两个参数的方法

根据注释是为了实现纳秒级别的睡眠。

sleep-2

但有意思的是,两个参数的方法在实现上和注释有点不太相符。。

注释中说,休眠时间 = 毫秒数 + 纳秒数,但其实真实情况是:

  1. 当 nanos 小于 0.5ms时,millis 不变;
  2. 当 nanos 大于等于 0.5ms时,millis 加 1;
  3. 当 nanos 不为 0 且 millis 为 0 时,millis 加 1
    也就是说该方法并没有实现 ns 级别的睡眠,最小的睡眠时间为 1ms。

主要代码在这里

sleep-3

所以这个方法存在的意义是什么呢?想不明白,从注释上也没看出来。

发现 stackoverflow 上也有一些讨论,具体的意义大家还是自行体会吧:
https://stackoverflow.com/questions/6553225/whats-the-purpose-of-sleeplong-millis-int-nanos

join

join 有 3 个方法

无参数的方法

是为了永远等待。

join-1

两个参数的方法

根据注释是为了实现纳秒级别的等待。

同两个参数的 sleep 方法,该方法也并没有实现 ns 级别的等待,最小的等待时间为 1ms。

join-2

一个参数的方法

是真正的实现逻辑。

实现方式是 while 循环。参数为 0 意味着永远等待。isAlive() / wait(Long) 都是 native method。

join-3

yield

是一个 native method,表示当前线程愿意放弃对 CPU 的使用,可以防止过度使用 CPU。

但是调度程序可以随意忽略此提示,另外重新竞争时,CPU 也有可能再次选中自己。

yield

中断线程

常见 API 有三个

interrupt

中间会调用一个 native method interrupt0()。

当调用 Object#wait / Thread#join / Thread#sleep 方法时,如果调用此方法,会抛出 InterruptedException。

要注意的是,当抛出 InterruptedException 时,线程的中断状态会被清除。

interrupt

isInterrupted

isInterrupted 有 2 个方法

无参数的方法

是测试此线程是否已被中断。

isInterrupted-1

一个参数的方法

是 native method,可以根据传入的参数决定是否重置中断状态。该方法也是 interrupted 的底层实现。

isInterrupted-2

interrupted

底层实现依赖上面 isInterrupted 一个参数的方法。

interrupted

ThreadLocal & InheritableThreadLocal 变量

这两个变量也必须强行漏个脸。可以看到每个线程维护自己的变量。

ThreadLocal & InheritableThreadLocal

Deprecated Methods

一共 6 个 deprecated methods

  1. Thread#stop() / Thread#stop(Throwable) / Thread#destroy() / Thread#suspend() / Thread#resume()

deprecated 原因:
https://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html。

  1. 还有一个 native method: Thread#countStackFrames()

该方法作用主要是:计算线程中的堆栈帧数,而此时线程必须被挂起,否则会抛出 IllegalThreadStateException。

deprecated 原因:此调用的定义取决于 deprecated method Thread#suspend()。此外,此调用的结果从未明确定义。