异步神器CompletableFuture
 
  介绍
上个礼拜我们线上有个接口比较慢,这个接口在刚开始响应时间是正常的。但随着数据量的增多,响应时间变慢了。
这个接口里面顺序调用了2个服务,且2个服务之间没有数据依赖。我就用CompletableFuture把调用2个服务的过程异步化了一下,响应时间也基本上缩短为原来的一半,问题解决。
 
  正好上次分享了函数式接口和Stream的使用,这次就分享一下CompletableFuture,里面也用到了大量的函数式接口
想方便的异步执行任务,就必须放到单独的线程中。继承Thread类,实现Runnable都不能拿到任务的执行结果,这时就不得不提创建线程的另一种方式了,实现Callable接口。
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
 
  Callable接口一般配合ExecutorService来使用
// ExecutorService.java
<T> Future<T> submit(Callable<T> task);
 
  ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(() -> {
    int sum = 0;
    for (int i = 0; i < 100; i++) {
        sum += i;
    }
    return sum;
});
// 4950
System.out.println(result.get());
 
  我们从Future中获取结果
public interface Future<V> {
 // 取消任务的执行
    boolean cancel(boolean mayInterruptIfRunning);
 // 任务是否已经取消
    boolean isCancelled();
 // 任务是否已经完成
    boolean isDone();
 // 获取任务执行结果,会阻塞线程
    V get() throws InterruptedException, ExecutionException;
 // 超时获取任务执行结果,会阻塞线程
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
 
  对于简单的场景使用Future并没有什么不方便。但是一些复杂的场景就很麻烦, 如2个异步任务,其中一个有结果就直接返回。Future用起来就不方便,因为想获取结果时,要么执行future.get()方法,但是这样会阻塞线程,变成同步操作,要么轮询isDone()方法,但是比较耗费CPU资源。
Netty和Google guava为了解决这个问题,在Future的基础上引入了观察者模式(即在Future上addListener),当计算结果完成时通知监听者。
Java8新增的CompletableFuture则借鉴了Netty等对Future的改造,简化了异步编程的复杂性,并且提供了函数式编程的能力
创建CompletableFuture对象
| 方法名 | 描述 | 
|---|---|
| completedFuture(U value) | 返回一个已经计算好的CompletableFuture | 
| runAsync(Runnable runnable) | 使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值 | 
| runAsync(Runnable runnable, Executor executor) | 使用指定的线程池执行任务,没有返回值 | 
| supplyAsync(Supplier<U> supplier) | 使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值 | 
| supplyAsync(Supplier<U> supplier, Executor executor) | 使用指定的线程池执行任务,有返回值 | 
@FunctionalInterface
public interface Supplier<T> {
    T get();
}
 
  Supplier在《用好强大的Stream》中已经介绍过了,是一个能获取返回值的函数式接口
CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
// 100
System.out.println(intFuture.get());
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
// null
System.out.println(voidFuture.get());
CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
// hello
System.out.println(stringFuture.get());
 
   
  计算结果完成时
| 方法名 | 
|---|
| whenComplete(BiConsumer<? super T,? super Throwable> action) | 
| whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | 
| whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | 
因为入参是BiConsumer<? super T,? super Throwable>函数式接口,所以可以处理正常和异常的计算结果
whenComplete和whenCompleteAsync的区别如下
-  
    
whenComplete:执行完当前任务的线程继续执行whenComplete的任务  -  
    
whenCompleteAsync:把whenCompleteAsync这个任务提交给线程池来执行  
CompletableFuture的所有方法的定义和whenComplete都很类似
-  
    
方法不以Async结尾意味着使用相同的线程执行  -  
    
方法以Async结尾意味着将任务提交到线程池来执行  -  
    
方法以Async结尾时可以用ForkJoinPool.commonPool()作为线程池,也可以使用自己的线程池  
后续介绍的所有方法都只写一种case
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    return "hello";
}).whenComplete((v, e) -> {
    // hello
    System.out.println(v);
});
// hello
System.out.println(future.get());
 
   
  转换,消费,执行
| 方法名 | 描述 | 
|---|---|
| thenApply | 获取上一个任务的返回,并返回当前任务的值 | 
| thenAccept | 获取上一个任务的返回,单纯消费,没有返回值 | 
| thenRun | 上一个任务执行完成后,开始执行thenRun中的任务 | 
CompletableFuture.supplyAsync(() -> {
    return "hello ";
}).thenAccept(str -> {
    // hello world
    System.out.println(str + "world");
}).thenRun(() -> {
    // task finish
    System.out.println("task finish");
});
 
   
  组合(两个任务都完成)
| 方法名 | 描述 | 
|---|---|
| thenCombine | 组合两个future,获取两个future的返回结果,并返回当前任务的返回值 | 
| thenAcceptBoth | 组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值 | 
| runAfterBoth | 组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务 | 
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    return "欢迎关注 ";
}).thenApply(t -> {
    return t + "微信公众号 ";
}).thenCombine(CompletableFuture.completedFuture("Java识堂"), (t, u) -> {
    return t + u;
}).whenComplete((t, e) -> {
    // 欢迎关注 微信公众号 Java识堂
    System.out.println(t);
});
 
   
  组合(只需要一个任务完成)
| 方法名 | 描述 | 
|---|---|
| applyToEither | 两个任务有一个执行完成,获取它的返回值,处理任务并返回当前任务的返回值 | 
| acceptEither | 两个任务有一个执行完成,获取它的返回值,处理任务,没有返回值 | 
| runAfterEither | 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值 | 
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "欢迎关注微信公众号";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "Java识堂";
});
CompletableFuture future = future1.applyToEither(future2, str -> str);
// 欢迎关注微信公众号 Java识堂 随机输出
System.out.println(future.get());
 
  sleepRandom()为我写的一个随机暂停的函数
多任务组合
| 方法名 | 描述 | 
|---|---|
| allOf | 当所有的CompletableFuture完成后执行计算 | 
| anyOf | 任意一个CompletableFuture完成后执行计算 | 
allOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "欢迎关注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "微信公众号";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "Java识堂";
});
// 欢迎关注 微信公众号 Java识堂
CompletableFuture.allOf(future1, future2, future3)
        .thenApply(v ->
                Stream.of(future1, future2, future3)
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
        .thenAccept(System.out::print);
 
  anyOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "欢迎关注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "微信公众号";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "Java识堂";
});
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);
// 欢迎关注 微信公众号 Java识堂 随机输出
System.out.println(resultFuture.get());
 
   
  异常处理
| 方法名 | 描述 | 
|---|---|
| exceptionally | 捕获异常,进行处理 | 
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100 / 0;
}).thenApply(num -> {
    return num + 10;
}).exceptionally(throwable -> {
    return 0;
});
// 0
System.out.println(future.get());
 
  当然有一些接口能捕获异常
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    String str = null;
    return str.length();
}).whenComplete((v, e) -> {
    if (e == null) {
        System.out.println("正常结果为" + v);
    } else {
        // 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException
        System.out.println("发生异常了" + e.toString());
    }
});
 
   
  欢迎关注
 
  本文分享自微信公众号 - Java识堂(erlieStar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
关注公众号
					低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 
							
								
								    上一篇
								    
								
								就靠这几段代码,带你玩转rpc通信协议,不信你学不明白
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。 2.基本概念 RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务 本地过程调用:如果需要将本地student对象的age+1,可以实现一个addAge()方法,将student对象传入,对年龄进行更新之后返回即可,本地方法调用的函数体通过函数指针来指定。 远程过程调用:上述操作的过程中,如果addAge()这个方法在服务端,执行函数的函数体在远程机器上,如何告诉机器需要调用这个方法呢? 今天,我们就通过一个实例代码进行演示,一步步的查看,rpc的通信是如何进行的,有兴趣的朋友可以...
 - 
							
								
								    下一篇
								    
								
								窥探 Netty 源码!FastThreadLocal 究竟快在哪里?
本文选自 Doocs 开源社区旗下“源码猎人”项目,作者 tydhot。项目将会持续更新,欢迎 Star 关注。项目地址:https://github.com/doocs/source-code-hunter 本文涉及到的 Netty 源码版本为 4.1.6。 Netty 的 FastThreadLocal 是什么 简而言之,FastThreadLocal 是在 ThreadLocal 实现上的一种变种,相比 ThreadLocal 内部通过将自身 hash 的方式在 hashTable 上定位需要的变量存储位置,FastThreadLocal 选择在数组上的一个固定的常量位置来存放线程本地变量,这样的操作看起来并没有太大区别,但是相比 ThreadLocal 的确体现了性能上的优势,尤其是在读操作频繁的场景下。 如何使用 FastThreadLocal 如果想要得到 FastThreadLocal 的速度优势,必须通过 FastThreadLocalThread 或者其子类的线程,才可以使用,因为这个原因,Netty 的 DefaultThreadFactory,其内部默认线程工厂的...
 
相关文章
文章评论
共有0条评论来说两句吧...

			
				
				
				
				
				
				
				
微信收款码
支付宝收款码