FutureTask<V> implements RunnableFuture<V>
RunnableFuture<V> extends Runnable, Future<V>
所以 FutureTask 既是个 Runnable,也是个 Future。因此 FutureTask 可以由 Executor#execute 方法执行,也可以由 ExecutorService#submit 方法提交执行。
其实在后面线程池源码中可以看到,通过 submit 方法向线程池提交的任务,默认情况下,不管入参是什么类型,最终都是转为最终都是转为 FutureTask。
(准确来说,只要是 RunnableFuture 的实现类就可以,但是需要重写 AbstractExecutorService#newTaskFor 方法)
FutureTask 是可取消的异步计算,可用于包装 Callable 或 Runnable 对象。
除了作为一个独立的类外,FutureTask 还提供了一些 protected 的功能,这在需要创建自定义任务类的场景下会很有用。
不同实现
FutureTask 类一开始有一段 Revision notes 和 Style note 的注释,这是 Doug Lea 大神重构该类做的一些说明。
因为老版本的实现和当前版本并不一样,老版本是基于 AQS 的方式实现的,新版本是基于 CAS + Treiber stack 来实现的。
jdk6 和 jdk7 是老版本的实现,jdk7u 开始为当前版本的实现。
state
FutureTask 共有 7 种状态,这 7 种状态我自己又将它划分为三类:初始值 / 瞬态值 / 终态值。
其可能的状态转换共有 4 种:
- NEW -> COMPLETING -> NORMAL(set(V))
- NEW -> COMPLETING -> EXCEPTIONAL(set(Throwable))
- NEW -> CANCELLED(cancel())
- NEW -> INTERRUPTING -> INTERRUPTED(cancel())
fields
Constructor
FutureTask 有两个构造方法
- 一个参数:Callable
- 两个参数:Runnable + 成功后的返回结果
FutureTask 的构造方法主要干的两件事:给 callable 赋值;将初始状态置为 NEW。
从 fields 可以看出,FutureTask 内部有个 Callable,但是没有 Runnable,两个参数的构造方法传的参数为 Runnable,那底层势必是要将 Runnable 转为 Callable,而 Executors.callable 方法就是干这个事的。
怎么做的呢?其实就是通过适配器模式,底层最终返回了 Executors.RunnableAdapter 对象。
从这里可以看到,FutureTask 最大的作用就是统一了 Runnable 和 Callable,更方便使用。
WaitNode
前文说,新版是基于 CAS + Treiber stack 实现的,WaitNode 类就是 Treiber stack 中的元素。
该类的作用是,封装等待线程(调用 get 方法检索结果时在 awaitDone 方法中阻塞),使其排队,其形式是个单向链表。
cancel
cancel 方法是 Future 接口中的方法,尝试取消正在执行的任务。
cancel 方法执行逻辑:
- 当 state 不为 NEW 时,直接返回 false,表示无法取消该任务;否则,通过 CAS 算法将 state 由 NEW 变为 INTERRUPTING(瞬态值,当 mayInterruptIfRunning 为 true) 或 CANCELLED(终态值,当 mayInterruptIfRunning 为 false),如果这一步失败,说明在执行该操作时其他线程改变了 state,直接返回 false,表示无法取消该任务
- 如果 mayInterruptIfRunning 为 true,则调用 interrupt() 方法打断运行当前任务的线程,打断后,将 state 设置为 INTERRUPTED(终态值)
- 以上都执行完成后,删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果。不过此时去拿结果,必然会抛出 CancellationException 或 ExecutionException(report 方法)
finishCompletion 方法中会调用一个扩展方法 done(),默认实现为空。
run
run 方法是 RunnableFuture 接口中的方法。
run 方法执行逻辑:
- 当 state 不为 NEW 时,直接返回;否则,通过 CAS 算法将 runner 由 null 赋值为 currentThread,如果这一步失败,说明在执行该操作时其他线程已经设置了 runner,直接返回
- 调用执行 Callable#call(),获取结果。如果正常执行结束,将 result 赋值给 outcome;如果执行过程中抛出异常,将 Throwable 赋值给 outcome
- 执行完成后,将 runner 置为 null。还有一个不容易想到的小细节,就是如果此时有别的线程调用了 cancel(true) 方法,当前线程正处于 INTERRUPTING 状态时,我们应该等待该状态由 INTERRUPTING 变为 INTERRUPTED,到达一个终态值
runAndReset
runAndReset 和 run 方法的实现逻辑差不多,唯一的区别就是,该方法不设置结果,并在执行成功后将状态重置为 NEW。
这是专为本质上执行多次的任务而设计的。ScheduledThreadPoolExecutor 中就用到了该方法。
set
将正常执行结束的结果 result 赋值给 outcome。
set 方法执行逻辑:
- 将 state 由 NEW 变为 COMPLETING(瞬态值)
- 给 outcome 赋值
- 将 state 由 COMPLETING 变为 NORMAL(终态值,和 setException 唯一不同的地方)
- 删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果
setException
将执行过程中抛出的异常 Throwable 赋值给 outcome。
setException 方法执行逻辑:
- 将 state 由 NEW 变为 COMPLETING(瞬态值)
- 给 outcome 赋值
- 将 state 由 COMPLETING 变为 EXCEPTIONAL(终态值,和 set 唯一不同的地方)
- 删除并唤醒所有在 Treiber stack 中排队的等待线程去拿结果
get
无参
无参的 get 方法是 Future 接口中的方法,阻塞等待直到计算完成,然后检索其结果。
无参 get 方法执行逻辑:
- 如果当前 state 是 NEW 或 COMPLETING 时,说明任务在执行中或在给 outcome 赋值中,则调用 awaitDone 方法阻塞等待
- 任务执行完成并给 outcome 赋值后,返回结果或抛出的异常
两个参数
两个参数的 get 方法是 Future 接口中的方法,最多阻塞等待给定的计算完成时间,然后检索其结果(如果计算完成)。
两个参数的 get 方法执行逻辑:
- 如果当前 state 是 NEW 或 COMPLETING 时,说明任务在执行中或在给 outcome 赋值中,则调用 awaitDone 方法阻塞等待给定时间
- 如果任务经过阻塞等待给定时间后状态仍为 NEW 或 COMPLETING,则抛出超时异常 TimeoutException;否则,返回结果或抛出的异常
report
根据状态返回结果或抛出异常。
awaitDone
该方法支持一直阻塞等待(timed 为 false)或阻塞等待给定时间(timed 为 true)。
该方法里填补前文 Treiber stack 中埋的坑:
- 入栈过程在 awaitDone 方法中
- 出栈过程在 removeWaiter 方法中
isCancelled
isCancelled 方法是 Future 接口中的方法,任务是否被取消。
isDone
isDone 方法是 Future 接口中的方法,任务是否完成。
protected methods
done
扩展方法,finishCompletion 方法中调用,默认实现为空,子类可以重写此方法来做回调或记录。
other protected methods
set / setException / runAndReset 前文均已介绍。