0%

FutureTask 源码解析

FutureTask<V> implements RunnableFuture<V>
RunnableFuture<V> extends Runnable, Future<V>

所以 FutureTask 既是个 Runnable,也是个 Future。因此 FutureTask 可以由 Executor#execute 方法执行,也可以由 ExecutorService#submit 方法提交执行。

其实在后面线程池源码中可以看到,通过 submit 方法向线程池提交的任务,默认情况下,不管入参是什么类型,最终都是转为最终都是转为 FutureTask。
(准确来说,只要是 RunnableFuture 的实现类就可以,但是需要重写 AbstractExecutorService#newTaskFor 方法)

FutureTask 是可取消的异步计算,可用于包装 Callable 或 Runnable 对象。

除了作为一个独立的类外,FutureTask 还提供了一些 protected 的功能,这在需要创建自定义任务类的场景下会很有用。

不同实现

FutureTask 类一开始有一段 Revision notes 和 Style note 的注释,这是 Doug Lea 大神重构该类做的一些说明。

因为老版本的实现和当前版本并不一样,老版本是基于 AQS 的方式实现的,新版本是基于 CAS + Treiber stack 来实现的。

jdk6jdk7 是老版本的实现,jdk7u 开始为当前版本的实现。

state

FutureTask 共有 7 种状态,这 7 种状态我自己又将它划分为三类:初始值 / 瞬态值 / 终态值。

state

其可能的状态转换共有 4 种:

  1. NEW -> COMPLETING -> NORMAL(set(V))
  2. NEW -> COMPLETING -> EXCEPTIONAL(set(Throwable))
  3. NEW -> CANCELLED(cancel())
  4. NEW -> INTERRUPTING -> INTERRUPTED(cancel())

state-transitions

fields

fields

Constructor

FutureTask 有两个构造方法

  1. 一个参数:Callable

constructor-1

  1. 两个参数:Runnable + 成功后的返回结果

constructor-2

FutureTask 的构造方法主要干的两件事:给 callable 赋值;将初始状态置为 NEW。

fields 可以看出,FutureTask 内部有个 Callable,但是没有 Runnable,两个参数的构造方法传的参数为 Runnable,那底层势必是要将 Runnable 转为 Callable,而 Executors.callable 方法就是干这个事的。

怎么做的呢?其实就是通过适配器模式,底层最终返回了 Executors.RunnableAdapter 对象。

Executors.callable

从这里可以看到,FutureTask 最大的作用就是统一了 Runnable 和 Callable,更方便使用。

WaitNode

前文说,新版是基于 CAS + Treiber stack 实现的,WaitNode 类就是 Treiber stack 中的元素。

该类的作用是,封装等待线程(调用 get 方法检索结果时在 awaitDone 方法中阻塞),使其排队,其形式是个单向链表。

WaitNode

cancel

cancel 方法是 Future 接口中的方法,尝试取消正在执行的任务。

cancel 方法执行逻辑:

  1. 当 state 不为 NEW 时,直接返回 false,表示无法取消该任务;否则,通过 CAS 算法将 state 由 NEW 变为 INTERRUPTING(瞬态值,当 mayInterruptIfRunning 为 true) 或 CANCELLED(终态值,当 mayInterruptIfRunning 为 false),如果这一步失败,说明在执行该操作时其他线程改变了 state,直接返回 false,表示无法取消该任务
  2. 如果 mayInterruptIfRunning 为 true,则调用 interrupt() 方法打断运行当前任务的线程,打断后,将 state 设置为 INTERRUPTED(终态值)
  3. 以上都执行完成后,删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果。不过此时去拿结果,必然会抛出 CancellationException 或 ExecutionException(report 方法)

cancel

finishCompletion 方法中会调用一个扩展方法 done(),默认实现为空。

finishCompletion

run

run 方法是 RunnableFuture 接口中的方法。

run 方法执行逻辑:

  1. 当 state 不为 NEW 时,直接返回;否则,通过 CAS 算法将 runner 由 null 赋值为 currentThread,如果这一步失败,说明在执行该操作时其他线程已经设置了 runner,直接返回
  2. 调用执行 Callable#call(),获取结果。如果正常执行结束,将 result 赋值给 outcome;如果执行过程中抛出异常,将 Throwable 赋值给 outcome
  3. 执行完成后,将 runner 置为 null。还有一个不容易想到的小细节,就是如果此时有别的线程调用了 cancel(true) 方法,当前线程正处于 INTERRUPTING 状态时,我们应该等待该状态由 INTERRUPTING 变为 INTERRUPTED,到达一个终态值

run

handlePossibleCancellationInterrupt

runAndReset

runAndReset 和 run 方法的实现逻辑差不多,唯一的区别就是,该方法不设置结果,并在执行成功后将状态重置为 NEW。

这是专为本质上执行多次的任务而设计的。ScheduledThreadPoolExecutor 中就用到了该方法。

runAndReset

set

将正常执行结束的结果 result 赋值给 outcome。

set 方法执行逻辑:

  1. 将 state 由 NEW 变为 COMPLETING(瞬态值)
  2. 给 outcome 赋值
  3. 将 state 由 COMPLETING 变为 NORMAL(终态值,和 setException 唯一不同的地方)
  4. 删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果

set

setException

将执行过程中抛出的异常 Throwable 赋值给 outcome。

setException 方法执行逻辑:

  1. 将 state 由 NEW 变为 COMPLETING(瞬态值)
  2. 给 outcome 赋值
  3. 将 state 由 COMPLETING 变为 EXCEPTIONAL(终态值,和 set 唯一不同的地方)
  4. 删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果

setException

get

无参

无参的 get 方法是 Future 接口中的方法,阻塞等待直到计算完成,然后检索其结果。

无参 get 方法执行逻辑:

  1. 如果当前 state 是 NEW 或 COMPLETING 时,说明任务在执行中或在给 outcome 赋值中,则调用 awaitDone 方法阻塞等待
  2. 任务执行完成并给 outcome 赋值后,返回结果或抛出的异常

get-1

两个参数

两个参数的 get 方法是 Future 接口中的方法,最多阻塞等待给定的计算完成时间,然后检索其结果(如果计算完成)。

两个参数的 get 方法执行逻辑:

  1. 如果当前 state 是 NEW 或 COMPLETING 时,说明任务在执行中或在给 outcome 赋值中,则调用 awaitDone 方法阻塞等待给定时间
  2. 如果任务经过阻塞等待给定时间后状态仍为 NEW 或 COMPLETING,则抛出超时异常 TimeoutException;否则,返回结果或抛出的异常

get-2

report

根据状态返回结果或抛出异常。

report

awaitDone

该方法支持一直阻塞等待(timed 为 false)或阻塞等待给定时间(timed 为 true)。

该方法里填补前文 Treiber stack 中埋的坑:

  1. 入栈过程在 awaitDone 方法中
  2. 出栈过程在 removeWaiter 方法中

awaitDone

removeWaiter

isCancelled

isCancelled 方法是 Future 接口中的方法,任务是否被取消。

isCancelled

isDone

isDone 方法是 Future 接口中的方法,任务是否完成。

isDone

protected methods

done

扩展方法,finishCompletion 方法中调用,默认实现为空,子类可以重写此方法来做回调或记录。

done

other protected methods

set / setException / runAndReset 前文均已介绍。


欢迎关注我的其它发布渠道