您现在的位置是:首页 > 文章详情

并发编程-FutureTask解析 | 京东物流技术团队

日期:2023-07-27点击:240

1、FutureTask对象介绍

Future对象大家都不陌生,是JDK1.5提供的接口,是用来以阻塞的方式获取线程异步执行完的结果。

在Java中想要通过线程执行一个任务,离不开Runnable与Callable这两个接口。

Runnable与Callable的区别在于,Runnable接口只有一个run方法,该方法用来执行逻辑,但是并没有返回值;而Callable的call方法,同样用来执行业务逻辑,但是是有一个返回值的。

Callable执行任务过程中可以通过FutureTask获得任务的执行状态,并且可以在执行完成后通过Future.get()方式获取执行结果。

Future是一个接口,而FutureTask就是Future的实现类。并且FutureTask实现了 RunnableFuture(Runnable + Future),说明我们可以创建一个FutureTask并直接把它放到线程池执行,然后获取FutureTask的执行结果。

2、FutureTask源码解析

2.1 主要方法和属性

那么FutureTask是如何通过阻塞的方式来获取到异步线程执行的结果的呢?我们看下FutureTask中的属性。

// FutureTask的状态及其常量 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; // callable对象,执行完后置空 private Callable<V> callable; // 要返回的结果或要引发的异常来自 get() 方法 private Object outcome; // non-volatile, protected by state reads/writes // 执行Callable的线程 private volatile Thread runner; // 等待线程的一个链表结构 private volatile WaitNode waiters; 

FutureTask中几个比较重要的方法。

// 取消任务的执行 boolean cancel(boolean mayInterruptIfRunning); // 返回任务是否已经被取消 boolean isCancelled(); // 返回任务是否已经完成,任务状态不为NEW即为完成 boolean isDone(); // 通过get方法获取任务的执行结果 V get() throws InterruptedException, ExecutionException; // 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 

2.2 FutureTask执行

当我们在线程池中执行一个Callable方法时,其实是将Callable任务封装成一个RunnableFuture对象去执行,同时将这个RunnableFuture对象返回,这样我们就拿到了FutureTask的引用,可以随时获取到任务执行的状态,并且可以在任务执行完成后通过该对象获取执行结果。

以下为ThreadPoolExecutor线程池提交一个callable方法的源码。

public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } 

2.3 run方法介绍

RunnableFuture其实也是一个可以执行的runnable,我们看下他的run方法。其主要流程就是执行call方法,正常执行完毕后将result结果赋值到outcome属性上。

public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 将callable赋值到本地变量 Callable<V> c = callable; // 判断callable不为空并且FutureTask的状态必须为新创建 if (c != null && state == NEW) { V result; boolean ran; try { // 执行call方法(用户自己实现的call逻辑),并获取到result结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 如果执行过程出现异常,则将异常对象赋值到outcome上 setException(ex); } // 如果正常执行完毕,则将result赋值到outcome属性上 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 

以下逻辑为正常执行完成后赋值的逻辑。

// 如果任务没有被取消,将future执行完的返回值赋值给result结果 // FutureTask任务的执行状态是通过CAS的方式进行赋值的,并且由此可知,COMPLETING其实是一个瞬时状态 // 当将线程执行结果赋值给outcome后,状态会修改为对应的NORMAL,即正常结束 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } 

以下为执行异常时赋值逻辑,直接将Throwable对象赋值到outcome属性上。

protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } 

无论是正常执行还是异常执行,最终都会调用一个finishCompletion方法,用来做工作的收尾工作。

2.4 get方法介绍

Future的get方法有两个重载的方法,一个是get()获取结果,一个是get(long, TimeUnit)带有超时时间的获取结果,我们看下FutureTask中的这两个方法是如何实现的。

// 不带有超时时间,一直阻塞直到获取结果 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 等待结果完成,带有超时的get方法也是调用的awaitDone方法 s = awaitDone(false, 0L); // 返回结果 return report(s); } // 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; // 如果任务未中断,调用awaitDone方法等待任务结果 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); // 返回结果 return report(s); } 

我们主要看下awaitDone方法的执行逻辑。此方法会通过for循环的方式一直阻塞等待任务执行完成。如果带有超时时间,则超过截止时间后会直接返回。

// timed:是否需要超时获取 // nanos:超时时间单位纳秒 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 此方法会一直for循环判断任务状态是否已经完成,是Future.get阻塞的原因 for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 任务状态大于COMPLETING,则表明任务结束,直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet // Thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。 // COMPLETING状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为NORMAL或者EXCEPTIONAL Thread.yield(); else if (q == null) // 每调用一次get方法,都会创建一个WaitNode等待节点 q = new WaitNode(); else if (!queued) // 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是NEW else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } 

我们在看下report方法,在调用get方法时是如何返回结果的。

这里首先获取outcome的值,并判断任务是否已经执行完成,如果执行完成,则将outcome对象强转成泛型指定的类型;如果任务被取消了,则抛出一个CancellationException异常;如果都不是,则说明任务在执行过程中发生了异常,此时任务状态位EXCEPTIONAL,此时的outcome即为Throwable对象,所以将outcome强转为Throwable并抛出异常。

由此可以知道,我们将一个FutureTask任务submit到线程池中执行的时候,如果发生了异常,是会在调用get方法的时候抛出的。

private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } 

2.5 cancel方法介绍

cancel方法用于取消正在运行的任务,如果任务取消成功,则返回TRUE,如果取消失败则返回FALSE。

// mayInterruptIfRunning:允许中断正在运行的任务 public boolean cancel(boolean mayInterruptIfRunning) { // mayInterruptIfRunning如果为true则将状态置为INTERRUPTING,如果未false则将状态置为CANCELLED if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; // 如果状态修改成功后,判断是否允许中断线程,如果允许,则调用Thread的interrupt方法中断 try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 取消后的收尾工作 finishCompletion(); } return true; } 

2.6 isDone/isCancelled方法介绍

isDone方法用于判断FutureTask是否已经完成;isCancelled方法用来判断FutureTask是否已经取消,这两个方法都是通过状态位来判断的。

public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } 

2.7 finishCompletion方法介绍

我们看下finishCompletion方法都做了哪些工作。

// 删除所有等待线程并发出信号,最后执行done方法 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } 

我们看到done方法是一个受保护的空方法,此处没有任何逻辑,由其子类去根据自己的业务去实现相应的逻辑。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。

protected void done() { } 

3、总结

通过源码解读可以了解到Future的原理:

第一步:主线程将任务封装成一个Callable对象,通过submit方法提交到线程池去执行。

第二步:线程池执行任务的run方法,主线程则可以继续执行其他逻辑。

第三步:线程池中方法执行完成后将结果赋值到outcome属性上,并修改任务状态。

第四步:主线程在需要拿到异步任务结果的时候,主动调用fugure.get()方法来获取结果。

第五步:如果异步线程在执行过程中发生异常,则会在调用future.get()方法的时候抛出来。

以上就是对于FutureTask的分析,我们可以了解FutureTask任务执行的方式以及Future.get已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解FutureTask的任务执行状态以及状态的变化过程。

 

作者:京东物流 丁冬

来源:京东云开发者社区 自猿其说Tech

原文链接:https://my.oschina.net/u/4090830/blog/10091211
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章