介绍
上个礼拜我们线上有个接口比较慢,这个接口在刚开始响应时间是正常的。但随着数据量的增多,响应时间变慢了。
这个接口里面顺序调用了2个服务,且2个服务之间没有数据依赖。我就用CompletableFuture把调用2个服务的过程异步化了一下,响应时间也基本上缩短为原来的一半,问题解决。
正好上次分享了函数式接口和Stream的使用,这次就分享一下CompletableFuture,里面也用到了大量的函数式接口
想方便的异步执行任务,就必须放到单独的线程中。继承Thread类,实现Runnable都不能拿到任务的执行结果,这时就不得不提创建线程的另一种方式了,实现Callable接口。
@FunctionalInterface public interface Callable <V > { V call () throws Exception ; }
Callable接口一般配合ExecutorService来使用
// ExecutorService.java <T> Future<T> submit (Callable<T> task) ;
ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> result = executor.submit(() -> { int sum = 0 ; for (int i = 0 ; i < 100 ; i++) { sum += i; } return sum; });// 4950 System.out.println(result.get());
我们从Future中获取结果
public interface Future <V > { // 取消任务的执行 boolean cancel (boolean mayInterruptIfRunning) ; // 任务是否已经取消 boolean isCancelled () ; // 任务是否已经完成 boolean isDone () ; // 获取任务执行结果,会阻塞线程 V get () throws InterruptedException, ExecutionException ; // 超时获取任务执行结果,会阻塞线程 V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ; }
对于简单的场景使用Future并没有什么不方便。但是一些复杂的场景就很麻烦, 如2个异步任务,其中一个有结果就直接返回。Future用起来就不方便,因为想获取结果时,要么执行future.get()方法,但是这样会阻塞线程,变成同步操作,要么轮询isDone()方法,但是比较耗费CPU资源。
Netty和Google guava为了解决这个问题,在Future的基础上引入了观察者模式(即在Future上addListener),当计算结果完成时通知监听者。
Java8新增的CompletableFuture则借鉴了Netty等对Future的改造,简化了异步编程的复杂性,并且提供了函数式编程的能力
创建CompletableFuture对象
方法名
描述
completedFuture(U value)
返回一个已经计算好的CompletableFuture
runAsync(Runnable runnable)
使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值
runAsync(Runnable runnable, Executor executor)
使用指定的线程池执行任务,没有返回值
supplyAsync(Supplier<U> supplier)
使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值
supplyAsync(Supplier<U> supplier, Executor executor)
使用指定的线程池执行任务,有返回值
@FunctionalInterface public interface Supplier <T > { T get () ; }
Supplier在《用好强大的Stream 》中已经介绍过了,是一个能获取返回值的函数式接口
CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100 );// 100 System.out.println(intFuture.get()); CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello" ));// null System.out.println(voidFuture.get()); CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello" );// hello System.out.println(stringFuture.get());
计算结果完成时
方法名
whenComplete(BiConsumer<? super T,? super Throwable> action)
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
因为入参是BiConsumer<? super T,? super Throwable>函数式接口,所以可以处理正常和异常的计算结果
whenComplete和whenCompleteAsync的区别如下
whenComplete:执行完当前任务的线程继续执行whenComplete的任务
whenCompleteAsync:把whenCompleteAsync这个任务提交给线程池来执行
CompletableFuture的所有方法的定义和whenComplete都很类似
方法以Async结尾意味着将任务提交到线程池来执行
方法以Async结尾时可以用ForkJoinPool.commonPool()作为线程池,也可以使用自己的线程池
后续介绍的所有方法都只写一种case
CompletableFuture future = CompletableFuture.supplyAsync(() -> { return "hello" ; }).whenComplete((v, e) -> { // hello System.out.println(v); });// hello System.out.println(future.get());
转换,消费,执行
方法名
描述
thenApply
获取上一个任务的返回,并返回当前任务的值
thenAccept
获取上一个任务的返回,单纯消费,没有返回值
thenRun
上一个任务执行完成后,开始执行thenRun中的任务
CompletableFuture.supplyAsync(() -> { return "hello " ; }).thenAccept(str -> { // hello world System.out.println(str + "world" ); }).thenRun(() -> { // task finish System.out.println("task finish" ); });
组合(两个任务都完成)
方法名
描述
thenCombine
组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth
组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
runAfterBoth
组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> { return "欢迎关注 " ; }).thenApply(t -> { return t + "微信公众号 " ; }).thenCombine(CompletableFuture.completedFuture("Java识堂" ), (t, u) -> { return t + u; }).whenComplete((t, e) -> { // 欢迎关注 微信公众号 Java识堂 System.out.println(t); });
组合(只需要一个任务完成)
方法名
描述
applyToEither
两个任务有一个执行完成,获取它的返回值,处理任务并返回当前任务的返回值
acceptEither
两个任务有一个执行完成,获取它的返回值,处理任务,没有返回值
runAfterEither
两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "欢迎关注微信公众号" ; }); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "Java识堂" ; }); CompletableFuture future = future1.applyToEither(future2, str -> str);// 欢迎关注微信公众号 Java识堂 随机输出 System.out.println(future.get());
sleepRandom()为我写的一个随机暂停的函数
多任务组合
方法名
描述
allOf
当所有的CompletableFuture完成后执行计算
anyOf
任意一个CompletableFuture完成后执行计算
allOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "欢迎关注" ; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "微信公众号" ; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "Java识堂" ; });// 欢迎关注 微信公众号 Java识堂 CompletableFuture.allOf(future1, future2, future3) .thenApply(v -> Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" " ))) .thenAccept(System.out::print);
anyOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "欢迎关注" ; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "微信公众号" ; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { sleepRandom(); return "Java识堂" ; }); CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);// 欢迎关注 微信公众号 Java识堂 随机输出 System.out.println(resultFuture.get());
异常处理
方法名
描述
exceptionally
捕获异常,进行处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 100 / 0 ; }).thenApply(num -> { return num + 10 ; }).exceptionally(throwable -> { return 0 ; });// 0 System.out.println(future.get());
当然有一些接口能捕获异常
CompletableFuture future = CompletableFuture.supplyAsync(() -> { String str = null ; return str.length(); }).whenComplete((v, e) -> { if (e == null ) { System.out.println("正常结果为" + v); } else { // 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException System.out.println("发生异常了" + e.toString()); } });
欢迎关注