Future 表示一个异步计算,提供了用于检查异步计算是否完成,等待异步计算结果,取消异步计算任务等方法。等待获取异步计算结果会阻塞当前线程,直到超时或者获取到异步计算的结果。如果异步计算已经完成,取消计算任务的方法将不会生效。如果希望利用 Future 来处理异步但是没有计算结果的任务,可以使用 Future<?>,然后返回 null 实现。
Future 的使用例子如下:
interface ArchiveSearcher { String search(String target); } class App { ExecutorService executor = ... ArchiveSearcher searcher = ... void showSearch(final String target) throws InterruptedException { Future<String> future = executor.submit(new Callable<String>() { public String call() { return searcher.search(target); }}); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; }
- 自定义 FutureTask,利用线程类执行异步任务,利用 FutureTask 获取异步任务结果
public class FutureTaskTest { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); Callable<String> callable = () -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return "任务执行结果"; FutureTask<String> futureTask = new FutureTask<>(callable); Thread thread = new Thread(futureTask); thread.start(); System.out.println(futureTask.get());
Future 的方法列表:
* 取消异步任务。 * 该方法尝试取消异步任务,但是不一定能够取消成功。如果任务已经完成,获取已经被取消过,或者由于其他原因取消了,那么都会取消失败。 * 如果异步任务在取消的时候还未开始,那么异步任务将不会开始运行; * 如果任务已经开始运行,那么 mayInterruptIfRunning 参数将决定是否通过中断线程来取消异步任务。 * 如果取消成功,后续调用 isDone 和 isCancelled 都将会返回 true。 * @param mayInterruptIfRunning 参数决定运行中的任务是否通过中断线程来取消任务。如果设置 false,那么运行中的任务无法取消。 * @return 如果取消成功则返回 true,否则返回 false。 boolean cancel(boolean mayInterruptIfRunning); * 如果异步任务取消成功,那么返回 true,否则返回 false * @return 如果任务取消成功返回 true,否则返回 false boolean isCancelled(); * 如果任务执行完毕(包括被取消成功)返回 true,否则返回 false。 * 任务正常结束,任务被取消成功,任务执行中发生异常而结束都是任务执行完毕。 * @return 任务执行完毕则返回 true,否则返回 false boolean isDone(); * 等待获取任务计算结果。该方法或中断当前线程,知道异步任务结束。 * @return 任务计算结果 * @throws CancellationException 任务如果取消则抛出该异常 * @throws ExecutionException 任务执行过程中发生异常 * @throws InterruptedException 执行异步任务的线程被中断 V get() throws InterruptedException, ExecutionException; * 可设置超时时间的等待获取异步任务计算结果。该方法会中断当前线程直到获取异步任务结果或者超时。 * @param timeout 超时时间 * @param unit 时间单位 * @return 异步任务结果 * @throws CancellationException 任务被取消 * @throws ExecutionException 异步任务执行过程中发生异常 * @throws InterruptedException 执行异步任务的线程被中断 * @throws TimeoutException 等待时间超过设置的超时时间 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;``` <a id="org58d0423"></a> # RunnableFuture Future 本身是一个异步计算结果,为了让 Future 成为一个任务,所以定义了一个 RunnableFuture。RunnableFuture 即继承 Runnable,有继承 Future,即表示是一个异步计算结果,也表示一个可执行的一个任务。 ```java * RunnableFuture 同时表现成一个异步计算结果和可通过线程执行的任务。 public interface RunnableFuture<V> extends Runnable, Future<V> { * 如果任务未取消,那么将任务的计算结果交给异步计算结果(Future) void run();
FutureTask
FutureTask 是 RunnableFuture 的具体实现,也是线程池框架执行异步任务的结算结果。
FutureTask 的类继承关系如下:
FutureTask 有 Callable callable 属性,用于存储外部任务。当初始化 FutureTask 时,需要指定一个具体的外部任务初始化 callable 属性。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW;
FutureTask 本身是一个任务,提供 run 方法的具体实现。run 方法会执行 callable.call()拿到任务结果,并将结果赋值给另一个属性 Object outcome。
当使用 get()方法获取异步任务结果时,其实就是等待任务 run 方法执行结束:在异步任务未结束前,生成等待节点,加入到等待队列,线程被挂起。如果异步任务执行结束后会给outcome赋值异步任务计算结果,并唤醒等待队列中的线程。
get()方法通过 FutureTask 的状态判断任务是否执行完成,如果是执行完成的状态,那么返回 outcome,如果线程被中断或者超时或者执行任务过程中抛出异常则 get 方法抛出对应的异常。
FutureTask 的状态:
任务执行成功的状态变化示意图:
* state 表示 FutureTask 的状态。可能的状态一空有 7 种,初始化时是 NEW。状态变化的路径为: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** 代表一个外部任务。任务执行完毕后会置为 null */ private Callable<V> callable; /** 存储异步任务执行结果 */ private Object outcome; /** 用于存储执行异步任务的线程的引用, */ private volatile Thread runner; /** 单向链表,用来存储调用 get 方法阻塞等待异步任务执行结果的线程 */ private volatile WaitNode waiters;``` <a id="org00f89d1"></a> ### 构造方法 FutureTask 初始化需要外部指定的参数是 callable 对象,需要在执行任务时执行 callable.call 获取异步任务结果。所以构造方法都是围绕初始化 callable 对象而进行 ```java * 根据指定的 callable 对象生成一个 FutureTask 对象。 * @param callable callable 对象 * @throws NullPointerException 如果 callable 对象是空对象,那么抛出 NullPointerException 异常 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // FutureTask 初始化状态是 NEW this.state = NEW; * 根据指定的 Runnable 对象生成 FutureTask。 * 借助 Executors public static <T> Callable<T> callable(Runnable task, T result)方法,利用 runnable 对象和泛型生成一个 callable 对象。 * @param runnable 可执行任务 * @param result callable 的执行结果的类型(执行 callable.call 的返回类型) * @throws NullPointerException 如果 runnable 是空对象,那么抛出空指针异常 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }``` <a id="org224cb8b"></a> ### Future 接口实现 ```java * 取消异步任务 * @param mayInterruptIfRunning 参数决定运行中的任务是否通过中断线程来取消任务。如果设置 false,那么运行中的任务无法取消。 * @return 如果取消成功则返回 true,否则返回 false。 public boolean cancel(boolean mayInterruptIfRunning) { // 如果 FutureTask 状态是 NEW,并且通过中断线程来取消任务,那么将状态改成 INTERRUPTING,即 NEW -> INTERRUPTING -> INTERRUPTED 的状态变化 // 如果 FutureTask 状态是 NEW,并且不通过中断线程来取消,那么将状态改成 CANCELLED,即 NEW -> CANCELLED 的状态变化流程 // 如果判断条件不成功,说明此事 FutureTask 已经不是 NEW 的状态,那么直接返回取消失败。 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果是通过中断线程来取消任务,那么将线程置为中断状态,并且将 FutureTask 的状态改成 INTERRUPTED if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } finally { // 唤醒所有等待获取结果的线程 finishCompletion(); return true; * FutureTask 的异步任务是否已经被取消 public boolean isCancelled() { // 如果 FutureTask 的状态是 CANCELLED,INTERRUPTING,INTERRUPTED 的状态,那么 FutureTask 就是被取消的。INTERRUPTING 或者 INTERRUPTED 的状态可以通过 FutureTask.cancel(true)产生。 // EXCEPTIONAL 是在执行异步任务过程中抛出的异常产生并非是取消任务产生。 return state >= CANCELLED; * 返回异步是否已经结束。非开始的状态都是结束的装填。 public boolean isDone() { // 在 callable.call 方法结束前,FutureTask 的状态都是 NEW 的状态,call 结束后 FutureTask 的状态才会改成其他状态,所以非 NEW 的状态都是结束状态。 return state != NEW; * 获取异步任务的计算结果 public V get() throws InterruptedException, ExecutionException { int s = state; // 如果 s <= COMPLETING 说明任务还没执行完成那么等待异步任务完成 if (s <= COMPLETING) s = awaitDone(false, 0L); // 如果正常获取到异步任务计算结果,那么返回异步计算任务结果;如果异步任务被取消,那么抛出 CancellationException;如果异步任务执行过程中发生异常,那么抛出 ExecutionException 异常 return report(s); * 可自定义超时时间的等待获取异步任务计算结果 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); * 唤醒所有等待获取异步任务结果的线程 private void finishCompletion() { // 如果等待队列中有节点,那么唤醒节点上的线程,并将节点提出等待队列 for (WaitNode q; (q = waiters) != null;) { // 利用 CAS 比较修改,将等待队列置为 null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { // 唤醒节点上的线程 Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); // 遍历下一个节点 WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; break; // 桩代码,FutureTask 没有任何功能,留给子类做完成任务后的定制化处理 done(); callable = null; // to reduce footprint * 等待异步任务结束 * @param timed 是否设置了超时时间 * @param nanos 超时时间 * @return state FutureTask 结束 awaitDone 时的的状态 private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 计算等待的毫秒数 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 等待获取异步任务的节点 WaitNode q = null; // 是否进入等待队列 boolean queued = false; for (;;) { // 此处应该是是线程阻塞(park 方法能够响应线程中断)时,线程被中断。如果线程被中断,那么从队列中删除对应的等待节点,抛出 InterruptedException if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); // FutureTask 当前状态 int s = state; // 如果不是 NEW 那么没有必要继续等待异步任务执行结果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; // 如果是 COMPLETING,FutureTask 的异步任务即将完成或者被中断结束,那么将 CPU 让渡出去继续等待最终完成。 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 需要线程阻塞等待,先构建等待节点 else if (q == null) q = new WaitNode(); // 如果等待节点未入队,那么将等待节点添加到队列队首, else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果设置超时时间,那么先判断是否已经超过了超时时间,如果超过则删除等待节点并退出;否则 LockSupport.parkNanos 阻塞线程。 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; LockSupport.parkNanos(this, nanos); LockSupport.park(this); * 从阻塞队列中删除指定的等待节点 private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; break;
Runnable 的实现
* 任务的实现细节 public void run() { // 如果 FutureTask 不是初始化状态或者 runner 已经指向了具体的线程,说明 FutureTask 已经被取消或者已经被执行(run 方法已经被调用过),那么直接退出该方法 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // c != null && state == NEW 说明任务未被执行并且未被取消 if (c != null && state == NEW) { V result; boolean ran; try { // 执行异步任务并获取任务结果 result = c.call(); ran = true; // 如果执行异步任务过程中抛出异常,那么捕获并将 FutureTask 的状态置为 COMPLETING,将异常赋值给 outcome,之后将状态置为 EXCEPTIONAL,即 NEW -> COMPLETING -> EXCEPTIONAL 的变化流程。 } catch (Throwable ex) { result = null; ran = false; setException(ex); // 如果正常执行结束,将 FutureTask 状态从 NEW 置为 COMPLETING,将结果赋值给 outcome,将状态置为 NORMAL,即 NEW -> COMPLETING -> NORMAL 状态变化流程。 if (ran) set(result); } finally { // 清空 runner runner = null; int s = state; // 再次确认是否有执行通过线程中断来取消操作,如果有那么等待取消完成。 // s >= INTERRUPTING 可能产生的情况是:在执行 set 操作前 FutureTask 被取消,导致状态从 NEW 编程变成 INTERRUPTING。那么 run 方法需要等待取消操作结束再结束。这是为了确保运行时的 cancel(true)的操作传递过来的取消操作都能在 run()方法结束前处理。 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); * 如果在执行异步任务过程中捕获到问题,那么将捕获到的异常的原因交给 outcome,并唤醒所有等待获取结果的线程。 * @param t 异常的原因 protected void setException(Throwable t) { // 如果 FutureTask 的状态是 NEW,那么改成 COMPLETING。 // 如果在处理异常前一刻异步任务被取消,那么会走取消流程,因为 CAS 修改只针对 NEW 的状态。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state // 唤醒等待获取异步任务结果的线程 finishCompletion(); * 将任务的执行结果赋值给 outcome,并修改 FutureTask 的状态 * @param v the value protected void set(V v) { // CAS 比较修改 FutureTask 的状态,从 NEW 改成 COMPLETING. // 如果修改成功则将结果赋值给 outcome // 再将 FutureTask 的状态改成 NORMAL // 唤醒等待获取异步任务计算结果的线程 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); 2019-08-07 工作中遇到的问题 大概是: 有个线程池满了,然后新的任务使用CompletableFuture.supplyAsync执行,用future1.get(1, TimeUnit.SECONDS)) 去获取的时候报错java.util.concurrent.TimeoutException 报错java.util.concurrent.TimeoutException觉得很奇怪;随... 获取request对象的请求头时报了空指针异常,意思就是主线程中的请求头并没有带过来。在主线程里调用子线程时将request传递过去,并设置子线程的请求头。查看feign配置FeignConfig.java。 转载:https://blog.csdn.net/Mynah886/article/details/107005410?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-3.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCo CompletableFuture优化接口性能案例需求背景解决方案 在教育中心所有课程详情页都大致包含下面这么些,每种卡片的数据可能来自不同的数据源。数据源可能就是缓存数据,也可能是第三方的接口返回数据。因此对于这个接口一次性返回这么多卡片信息还是有点重的,我们尽量要使得每个卡片请求的数据要够快。对于头部影藏卡片而言,在数据适配阶段串行调用了三个第三方接口,而且每个接口最大超时时间outTime=200~300ms,对整个接口的性能还是有很大影响。 课程详情头部隐藏卡片,课程详情头部促销卡片, 在call()未执行完毕之前,调用get()的线程(假定此时是主线程)会被阻塞,直到call方法返回了结果后,此时future.get才会得到结果,然后主线程才会切换到runnable状态 在jdk5中,我们通过使用Future和Callable,可以在任务执行完毕后得到任务执行结果。可以使用isDone检测计算是否完成,使用cancle停止执行任务,使用阻塞方法get阻塞住调用线程来获取返回结果,使用阻塞方式获取执行结果,有违异步编程的初衷,而且Future的异常只能自己内部处理。 jdk8中加入了实现类CompletableFuture<T>,用于异步编程。底层做任务使用的是ForkJoin, 顾名思义,是将任务的数据集分为多个子数据集,而每个子集,都可以由独立的子任 Feign接口的Request 默认是主线程和子线程不共享的,当异步调用Feign接口会因为获取不到ServletRequestAttributes报空指针。我们先获取到当前请求,再分享给子线程。首先获取attributes。 在Java应用中,绝大多数情况下都是通过同步的方式来实现交互处理的;但是在处理与第三方系统交互的时候,容易造成响应迟缓的情况,之前大部分都是使用多线程来完成此类任务,其实,在spring3.x之后,就已经内置了@Async来完美解决这个问题。 1.@Async介绍 在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无... * @throws CancellationException {@inheritDoc} public V get() throws InterruptedException, ExecutionException { int s = state; FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行(FutureTask.run())。根据 FutureTask.run()方法被执行的时机,FutureTask 可以处于下面 3 种状态。1) 未启动: 当创建一个 FutureTask,且没有执行 FutureTask.run()方法之前,这个 FutureTask 处于未启动状态。 记录问题,暂时还没得到解决 最近在工作中遇到这个问题,自己找了很多资料,始终没得到解决,所以发出来看看是否有人跟我遇到同样的问题,一起讨论 关于这个Future接口有以下几个方法 public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedE 目录FutureTask 简介get 方法cancel 方法FutureTask 的基本使用 FutureTask 简介 在 Executors 框架体系中,FutureTask 用来表示可获取结果的异步任务。FutureTask 实现了 Future 接口,FutureTask 提供了启动和取消异步任务,查询异步任务是否计算结束以及获取最终的异步任务的结果的一些常用的方法。通过get()方法来获取异步任务的结果,但是会阻塞当前线程直至异步任务执行结束。一旦任务执行结束,任务不能重新启动或取消,除非调用ru 文章目录前言FutrueTask为 FutrueTask 增加自定义代码逻辑实现 Callable 接口并对象传入 FutureTask 构造函数FutureTask 的启动使用 Thread.start() 执行 FutureTask使用线程池执行 FutureTask线程池和 FutureTask 类线程池和 Future 接口FutureTask.get()FutureTask.get() ..................