【并发编程】Future模式添加Callback及Promise 模式
Future
Future是Java5增加的类,它用来描述一个异步计算的结果。你可以使用 isDone 方法检查计算是否完成,或者使用 get 方法阻塞住调用线程,直到计算完成返回结果。你也可以使用 cancel 方法停止任务的执行。下面来一个栗子:
public class FutureDemo { public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(10); Future<Integer> f = es.submit(() ->{ Thread.sleep(10000); // 结果 return 100; }); // do something Integer result = f.get(); System.out.println(result); // while (f.isDone()) { // System.out.println(result); // } } }
在这个例子中,我们往线程池中提交了一个任务并立即返回了一个Future对象,接着可以做一些其他操作,最后利用它的 get 方法阻塞等待结果或 isDone 方法轮询等待结果(关于Future的原理可以参考之前的文章:【并发编程】Future模式及JDK中的实现)
虽然这些方法提供了异步执行任务的能力,但是对于结果的获取却还是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时的得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如Node.js,采用Callback的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future 接口,提供了 addListener 等多个扩展方法。Google的guava也提供了通用的扩展Future:ListenableFuture 、 SettableFuture 以及辅助类 Futures 等,方便异步编程。为此,Java终于在JDK1.8这个版本中增加了一个能力更强的Future类:CompletableFuture 。它提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果。下面来看看这几种方式。
Netty-Future
引入Maven依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.29.Final</version> </dependency>
public class NettyFutureDemo { public static void main(String[] args) throws InterruptedException { EventExecutorGroup group = new DefaultEventExecutorGroup(4); System.out.println("开始:" + DateUtils.getNow()); Future<Integer> f = group.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("开始耗时计算:" + DateUtils.getNow()); Thread.sleep(10000); System.out.println("结束耗时计算:" + DateUtils.getNow()); return 100; } }); f.addListener(new FutureListener<Object>() { @Override public void operationComplete(Future<Object> objectFuture) throws Exception { System.out.println("计算结果:" + objectFuture.get()); } }); System.out.println("结束:" + DateUtils.getNow()); // 不让守护线程退出 new CountDownLatch(1).await(); } }
输出结果:
开始:2019-05-16 08:25:40:779 结束:2019-05-16 08:25:40:788 开始耗时计算:2019-05-16 08:25:40:788 结束耗时计算:2019-05-16 08:25:50:789 计算结果:100
从结果可以看出,耗时计算结束后自动触发Listener的完成方法,避免了主线程无谓的阻塞等待,那么它究竟是怎么做到的呢?下面看源码
DefaultEventExecutorGroup 实现了 EventExecutorGroup 接口,而 EventExecutorGroup 则是实现了JDK ScheduledExecutorService 接口的线程组接口,所以它拥有线程池的所有方法。然而它却把所有返回 java.util.concurrent.Future 的方法重写为返回 io.netty.util.concurrent.Future ,把所有返回 java.util.concurrent.ScheduledFuture 的方法重写为返回 io.netty.util.concurrent.ScheduledFuture 。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> { /** * 返回一个EventExecutor */ EventExecutor next(); Iterator<EventExecutor> iterator(); Future<?> submit(Runnable task); <T> Future<T> submit(Runnable task, T result); <T> Future<T> submit(Callable<T> task); ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
EventExecutorGroup 的submit方法因为 newTaskFor 的重写导致返回了netty的 Future 实现类,而这个实现类正是 PromiseTask 。
@Override public <T> Future<T> submit(Callable<T> task) { return (Future<T>) super.submit(task); } @Override protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new PromiseTask<T>(this, callable); }
PromiseTask 的实现很简单,它缓存了要执行的 Callable 任务,并在run方法中完成了任务调用和Listener的通知。
@Override public void run() { try { if (setUncancellableInternal()) { V result = task.call(); setSuccessInternal(result); } } catch (Throwable e) { setFailureInternal(e); } } @Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } @Override public Promise<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this, cause); }
任务调用成功或者失败都会调用 notifyListeners 来通知Listener,所以大家得在回调的函数里调用 isSuccess 方法来检查状态。
这里有一个疑惑,会不会 Future 在调用 addListener 方法的时候任务已经执行完成了,这样子会不会通知就会失败了啊?
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
可以发现,在Listener添加成功之后,会立即检查状态,如果任务已经完成立刻进行回调,所以这里不用担心啦。OK,下面看看Guava-Future的实现。
Guava-Future
首先引入guava的Maven依赖:
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>22.0</version> </dependency>
public class GuavaFutureDemo { public static void main(String[] args) throws InterruptedException { System.out.println("开始:" + DateUtils.getNow()); ExecutorService executorService = Executors.newFixedThreadPool(10); ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService); ListenableFuture<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("开始耗时计算:" + DateUtils.getNow()); Thread.sleep(10000); System.out.println("结束耗时计算:" + DateUtils.getNow()); return 100; } }); future.addListener(new Runnable() { @Override public void run() { System.out.println("调用成功"); } }, executorService); System.out.println("结束:" + DateUtils.getNow()); new CountDownLatch(1).await(); } }
ListenableFuture 可以通过 addListener 方法增加回调函数,一般用于不在乎执行结果的地方。如果需要在执行成功时获取结果或者执行失败时获取异常信息,需要用到 Futures 工具类的 addCallback 方法:
Futures.addCallback(future, new FutureCallback<Integer>() { @Override public void onSuccess(@Nullable Integer result) { System.out.println("成功,计算结果:" + result); } @Override public void onFailure(Throwable t) { System.out.println("失败"); } }, executorService);
前面提到除了 ListenableFuture 外,还有一个 SettableFuture 类也支持回调能力。它实现自 ListenableFuture ,所以拥有 ListenableFuture 的所有能力。
public class GuavaFutureDemo { public static void main(String[] args) throws InterruptedException { System.out.println("开始:" + DateUtils.getNow()); ExecutorService executorService = Executors.newFixedThreadPool(10); ListenableFuture<Integer> future = submit(executorService); Futures.addCallback(future, new FutureCallback<Integer>() { @Override public void onSuccess(@Nullable Integer result) { System.out.println("成功,计算结果:" + result); } @Override public void onFailure(Throwable t) { System.out.println("失败:" + t.getMessage()); } }, executorService); Thread.sleep(1000); System.out.println("结束:" + DateUtils.getNow()); new CountDownLatch(1).await(); } private static ListenableFuture<Integer> submit(Executor executor) { SettableFuture<Integer> future = SettableFuture.create(); executor.execute(new Runnable() { @Override public void run() { System.out.println("开始耗时计算:" + DateUtils.getNow()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("结束耗时计算:" + DateUtils.getNow()); // 返回值 future.set(100); // 设置异常信息 // future.setException(new RuntimeException("custom error!")); } }); return future; } }
看起来用法上没有太多差别,但是有一个很容易被忽略的重要问题。当 SettableFuture 的这种方式最后调用了 cancel 方法后,线程池中的任务还是会继续执行,而通过 submit 方法返回的 ListenableFuture 方法则会立即取消执行,这点尤其要注意。下面看看源码:
和Netty的Future一样,Guava也是通过实现了自定义的 ExecutorService 实现类 ListeningExecutorService 来重写了 submit 方法。
public interface ListeningExecutorService extends ExecutorService { <T> ListenableFuture<T> submit(Callable<T> task); ListenableFuture<?> submit(Runnable task); <T> ListenableFuture<T> submit(Runnable task, T result); }
同样的,newTaskFor 方法也被进行了重写,返回了自定义的Future类:TrustedListenableFutureTask
@Override protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return TrustedListenableFutureTask.create(runnable, value); } @Override protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return TrustedListenableFutureTask.create(callable); }
任务调用会走 TrustedFutureInterruptibleTask 的run方法:
@Override public void run() { TrustedFutureInterruptibleTask localTask = task; if (localTask != null) { localTask.run(); } } @Override public final void run() { if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) { return; // someone else has run or is running. } try { // 抽象方法,子类进行重写 runInterruptibly(); } finally { if (wasInterrupted()) { while (!doneInterrupting) { Thread.yield(); } } } }
最终还是调用到 TrustedFutureInterruptibleTask 的 runInterruptibly 方法,等待任务完成后调用 set 方法。
@Override void runInterruptibly() { if (!isDone()) { try { set(callable.call()); } catch (Throwable t) { setException(t); } } } protected boolean set(@Nullable V value) { Object valueToSet = value == null ? NULL : value; // CAS设置值 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { complete(this); return true; } return false; }
在 complete 方法的最后会获取到Listener进行回调。
上面提到的 SettableFuture 和 ListenableFuture 的 cancel 方法效果不同,原因在于一个重写了 afterDone 方法而一个没有。
下面是 ListenableFuture 的 afterDone 方法:
@Override protected void afterDone() { super.afterDone(); if (wasInterrupted()) { TrustedFutureInterruptibleTask localTask = task; if (localTask != null) { localTask.interruptTask(); } } this.task = null; }
wasInterrupted 用来判断是否调用了 cancel (cancel方法会设置一个取消对象Cancellation到value中)
protected final boolean wasInterrupted() { final Object localValue = value; return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; }
interruptTask 方法通过线程的 interrupt 方法真正取消线程任务的执行:
final void interruptTask() { Thread currentRunner = runner; if (currentRunner != null) { currentRunner.interrupt(); } doneInterrupting = true; }
由 Callback Hell 引出 Promise 模式
如果你对 ES6 有所接触,就不会对 Promise 这个模式感到陌生,如果你对前端不熟悉,也不要紧,我们先来看看回调地狱(Callback Hell)是个什么概念。
回调是一种我们推崇的异步调用方式,但也会遇到问题,也就是回调的嵌套。当需要多个异步回调一起书写时,就会出现下面的代码(以 js 为例):
asyncFunc1(opt, (...args1) => { asyncFunc2(opt, (...args2) => { asyncFunc3(opt, (...args3) => { asyncFunc4(opt, (...args4) => { // some operation }); }); }); });
虽然在 JAVA 业务代码中很少出现回调的多层嵌套,但总归是个问题,这样的代码不易读,嵌套太深修改也麻烦。于是 ES6 提出了 Promise 模式来解决回调地狱的问题。可能就会有人想问:java 中存在 Promise 模式吗?答案是肯定的。
前面提到了 Netty 和 Guava 的扩展都提供了 addListener 这样的接口,用于处理 Callback 调用,但其实 jdk1.8 已经提供了一种更为高级的回调方式:CompletableFuture。首先尝试用 CompletableFuture 来重写上面回调的问题。
public class CompletableFutureTest { public static void main(String[] args) throws InterruptedException { System.out.println("开始:" + DateUtils.getNow()); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("开始耗时计算:" + DateUtils.getNow()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("结束耗时计算:" + DateUtils.getNow()); return 100; }); completableFuture.whenComplete((result, e) -> { System.out.println("回调结果:" + result); }); System.out.println("结束:" + DateUtils.getNow()); new CountDownLatch(1).await(); } }
使用CompletableFuture耗时操作没有占用主线程的时间片,达到了异步调用的效果。我们也不需要引入任何第三方的依赖,这都是依赖于 java.util.concurrent.CompletableFuture 的出现。CompletableFuture 提供了近 50 多个方法,大大便捷了 java 多线程操作,和异步调用的写法。
使用 CompletableFuture 解决回调地狱问题:
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException { long l = System.currentTimeMillis(); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("在回调中执行耗时操作..."); Thread.sleep(10000); return 100; }); completableFuture = completableFuture.thenCompose(i -> { return CompletableFuture.supplyAsync(() -> { System.out.println("在回调的回调中执行耗时操作..."); Thread.sleep(10000); return i + 100; }); }); completableFuture.whenComplete((result, e) -> { System.out.println("计算结果:" + result); }); System.out.println("主线程运算耗时:" + (System.currentTimeMillis() - l) + " ms"); new CountDownLatch(1).await(); } }
输出:
在回调中执行耗时操作...主线程运算耗时:58 ms在回调的回调中执行耗时操作...计算结果:200
使用 thenCompose 或者 thenComposeAsync 等方法可以实现回调的回调,且写出来的方法易于维护。
总的看来,为Future模式增加回调功能就不需要阻塞等待结果的返回并且不需要消耗无谓的CPU资源去轮询处理状态,JDK8之前使用Netty或者Guava提供的工具类,JDK8之后则可以使用自带的 CompletableFuture 类。Future 有两种模式:将来式和回调式。而回调式会出现回调地狱的问题,由此衍生出了 Promise 模式来解决这个问题。这才是 Future 模式和 Promise 模式的相关性。
作者注:欢迎关注笔者公众号(ID:weknow619,或扫头像),定期分享IT互联网、金融等工作经验心得、人生感悟,欢迎订阅交流,目前就职阿里-移动事业部,需要大厂内推的也可到公众号砸简历。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
LINUX之Bash-Script
最近把Linux Bash编程的知识复习了一遍,大概梳理了一下,做个记录。 第一个Bash-Script LINUX shell的种类非常之多,但是目前用得最为广泛的还是Bash,本文也是基于Bash的Shell环境。 下面是一个简单的示例: #! /bin/sh echo 'hello world!' 这就是一个最简单的shell脚本了。 第一行的#!用来告诉系统,这个脚本用什么解释器来执行(说明:sh和bash本身是不同的Shell,但是在我目前用得CentOS7版本sh和bash是等价的,sh是一个指向bash的符号链接)。 echo命令输出文本到屏幕 如何运行脚本 一种方式就是将脚本作为解释器的参数,如: sh test.sh 第二种方式就是授予文件可执行权限 chmod +x test.sh 或者 chmod 755 test.sh 执行脚本 ./test.sh 变量与参数 变量 Bash是一种弱类型的语言,你只需要直接定义变量名=value即可。当需要引用这个变量的时候使用$var_name或者${var_name}即可。 $var_name是${var_name}的一种简...
- 下一篇
由for update引发的血案
微信公众号「后端进阶」,专注后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。 老司机倾囊相授,带你一路进阶,来不及解释了快上车! 公司的某些业务用到了数据库的悲观锁 for update,但有些同事没有把 for update 放在 Spring 事务中执行,在并发场景下发生了严重的线程阻塞问题,为了把这个问题吃透,秉承着老司机的职业素养,我决定要给同事们一个交代。 案发现场 最近公司的某些 Dubbo 服务之间的 RPC 调用过程中,偶然性地发生了若干起严重的超时问题,导致了某些模块不能正常提供服务。我们的数据库用的是 Oracle,经过 DBA 排查,发现了一些 sql 的执行时间特别长,对比发现这些执行时间长的 sql 都带有 for update 悲观锁,于是相关开发人员查看 sql 对应的业务代码,发现 for update 没有放在 Spring 事务中执行,但是按照常理来说,如果 for update 没有加 Spring 事务,每次执行完 Mybatis 都会帮我们 commit 释放掉资源,并发时出现的问题应该是没有锁住对应资源产生脏数据...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2配置默认Tomcat设置,开启更多高级功能