0%

【Java】漫谈JUC中的 FurtureTask

人生若只如初见,何事秋风悲画扇。

这是本应该是一个很美妙的事情,但是所有的每秒都是在一瞬间。

Furture 的说明

关于 FutureTask 之前有讲过,是在《【多线程】拿到Java多线程里面的值》里面。FutureTask可以帮助我们在 N 个线程中去触发执行或者取消逻辑。这里就会牵扯到线程安全问题。在 FutureTask 里面是怎么把握这个线程安全的呢?进去看一下。

继承关系

我们先看一下这个继承关系:
FutureTask.png

FurtureTask 的运行中的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

这是当前任务的执行过程中的状态。可能的状态转化为:

1
2
3
4
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED

也就是说,初始的状态是 NEWCOMPLETING , INTERRUPTING 是里面的中间过渡状态,其他的都是 终止状态。

状态的改变

看这张图:
change-state

状态的改变只有三个: cancel , set , setException 。 在注释中也提到了,在我们的这个执行的过程中,我们的这个状态可以被 set , 从而中断了原来的执行。

我们看一下这几个执行方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
1
2
3
4
5
6
7
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
1
2
3
4
5
6
7
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

FutureTask 里面,怎么去根据我们的这个状态码去做到后续操作的。比如说,我们的 get() 方法里面:

1
2
3
4
5
6
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

我们先拿到我们的状态码,如果说,还处于 / 还没到 COMPLETING , 就先等待 完成,如果状态码被改变了:

1
2
3
4
5
6
7
8
9
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

这个判断的过程还是很简单也很好理解的。那我们再看这个 awaitDone() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

当用户代码向线程池提交任务后,一个最常见的后续操作就是阻塞(或带超时的阻塞)等待任务结果。

如果说,我们的任务没有完成,那这里就要把 cas 加入到 队列里面去,用到了parkNanos/park 来阻塞我们的线程,直至唤醒。这个唤醒的过程有三个:

  • 线程被中断了;
  • 超时阻塞
  • 完成任务

FurtureTask 的任务执行

任务执行,主要的就是 run 方法了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

大家可以看到,我们在运行结束后,他这里是有一个 把结果进行 set 的操作的。

到此,这个 FurtureTask 算是完了。基本原理也就是这些。目前正好再用,就在一边学习,一边实操。

这是打赏的地方...
---------Thanks for your attention---------