解构反应式编程——Java 8, RxJava, Reactor之比较
如果你熟悉Java 8,同时又了解反应式编程(Reactive Programming)框架,例如RxJava和Reactor等,你可能会问:
“如果我可以用Java 8 的Stream, CompletableFuture, 以及Optional完成同样的事情,为什么还要用RxJava 或者 Reactor呢?”
问题在于,大多数时候你在处理的是简单的任务,这个时候你确实不需要那些反应式编程的库。但是,当系统越来越复杂,或者你处理的本身就是个复杂的任务,你恐怕就得写一些让自己头皮发麻的代码。随着时间的推移,这些代码会变得越来越复杂和难以维护。
RxJava和Reactor提供了很多非常趁手的功能,能够支持你在未来更轻松地维护你的代码,实现新需求。但是这个优势到底有多大,具体体现在哪些方面?没有标准无法比较,让我们定义8个比较的维度,来帮助我们理解Java 8的API以及反应式编程的库之间的差别。
- Composable(可组装)
- Lazy(延迟执行)
- Reusable(可重用)
- Asynchronous(异步)
- Cacheable(可缓存)
- Push or Pull(推还是拉)
- Backpressure(反压)
- Operator fusion(操作融合)
针对上面这些维度,我们比较以下的这些类:
- CompletableFuture
- Stream
- Optional
- Observable (RxJava 1)
- Observable (RxJava 2)
- Flowable (RxJava 2)
- Flux (Reactor Core)
准备好了吗?我们开始!
Composable(可组装)
上面所有的这7个类都是可组装的,支持函数式的编程方式,这是我们喜欢它们的原因。
- CompletableFuture - 提供很多的.then*()方法,这些方法允许我们构建一个流水线,在不同的执行阶段之间传递一个单一的值(或者没有值),以及传递异常对象。
- Stream - 提供很多的可以链式编程方式连接起来的操作,不同的操作阶段之间可以传递N个值。
- Optional - 提供一些中间操作,如: .map(), .flatMap(), .filter().
- Observable, Flowable, Flux - 跟Stream相同
Lazy(延迟执行)
- CompletableFuture - 非延迟执行,它本质上只是一个异步结果的持有者。这些对象创建出来是为了代表对应的工作,CompletableFuture创建的时候,对应的工作已经开始执行了。它不知道任何的关于工作的具体内容,只是关心结果。所以,没有办法能走到上游去从上到下执行整个流水线。当结果被塞到CompletableFuture对象的时候,下一个阶段开始执行。
- Stream - 所有的中间操作都是延迟执行的。所有的终端操作,会触发整个计算。
- Optional - 非延迟执行,所有的操作会马上发生。
- Observable, Flowable, Flux - 延迟执行,没有订阅者的话,什么都不会做,只有当有订阅者的时候才会执行。
Reusable(可重用)
- CompletableFuture - 可以重用,它只是在一个值外面做了一层包装。但需要注意一点,这个包装是可更改的。.obtrude*()方法会更改它的内容,如果你确定没有人会调用到这类方法,那么重用它还是安全的。
- Stream - 不能重用。Java Doc已经说了:
A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.
翻译过来就是:Stream只能被操作(调用中间操作或者终端操作)一次。如果一个stream的实现检测到流被重复使用了,它可以抛出一个IllegalStateException。但是因为某些流操作会返回他们的receiver,而不是一个新的stream对象,并不是在所有的情况下都能够检测出重用。
- Optional - 完全可重用,因为它是不可变对象,而且所有工作都是立即执行的。
- Observable, Flowable, Flux - 就是设计来可重用的。所有的执行会从初始点开始,走过所有阶段,前提是有订阅者。
Asynchronous(异步)
- CompletableFuture - 嗯...这个类存在的目的就是异步的把多个操作链接起来。CompletableFuture代表一个工作,后面跟一个Executor关联起来。如果你不明确指定一个executor,那么系统会使用公共的ForkJoinPool线程池来执行。这个线程池可以用ForkJoinPool.commonPool()获取到。默认的设置下它会创建系统硬件支持的线程数一样多的线程(通常就是跟CPU的核心数,如果你的CPU支持超线程,那么可能再翻一倍)。不过你也可以设置ForkJoinPool线程池的线程数,用以下JVM option:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=?
或者每次调用的时候提供一个定制的Executor。
- Stream - 不支持创建异步过程,但是可以支持并行的计算——通过stream.parallel()等方式创建并行流。
- Optional - 不支持,它只是一个容器。
- Observable, Flowable, Flux - 目标就是为了构建异步的系统,但是默认情况下还是同步的。subscribeOn和observeOn允许你来控制消息的订阅以及消息的接收(指定当你的observer的 onNext / onError / onCompleted 被调用的时候做什么事情)。
subscribeOn让你决定用哪个Scheduler来执行Observable.create。即便你自己没有调用create,系统内部也会做类似的事情。示例:
Observable .fromCallable(() -> { log.info("Reading on thread: " + currentThread().getName()); return readFile("input.txt"); }) .map(text -> { log.info("Map on thread: " + currentThread().getName()); return text.length(); }) .subscribeOn(Schedulers.io()) // <-- setting scheduler .subscribe(value -> { log.info("Result on thread: " + currentThread().getName()); });
输出:
Reading file on thread: RxIoScheduler-2 Map on thread: RxIoScheduler-2 Result on thread: RxIoScheduler-2
相反的,observeOn()决定在observeOn()之后,用哪个Scheduler来运行下游的执行阶段。示例:
Observable .fromCallable(() -> { log.info("Reading on thread: " + currentThread().getName()); return readFile("input.txt"); }) .observeOn(Schedulers.computation()) // <-- setting scheduler .map(text -> { log.info("Map on thread: " + currentThread().getName()); return text.length(); }) .subscribeOn(Schedulers.io()) // <-- setting scheduler .subscribe(value -> { log.info("Result on thread: " + currentThread().getName()); });
输出:
Reading file on thread: RxIoScheduler-2 Map on thread: RxComputationScheduler-1 Result on thread: RxComputationScheduler-1
Cacheable(可缓存)
可缓存和可重用之间的区别是什么?举个例子,我们有一个流水线A,并且使用这个流水线两次,创建两个新的流水线 B = A + 以及 C = A + 。
- 如果B和C都能成功完成,那么这个A是可重用的。
- 如果B和C都能成功完成,并且A的每一个阶段只被调用了一次,那么这个A是可缓存的。
可以看出,一个类如果是可缓存的,必然得是可重用的。
- CompletableFuture - 跟可重用的答案一样。
- Stream - 不能缓存中间操作的结果,除非调用了终端操作。
- Optional - ‘可缓存’,实际上,所有工作立即执行,并且做完后就保存了一个不变值,自然‘可缓存’。
- Observable, Flowable, Flux - 默认情况下是不可缓存的,但是你可以把一个这些类转变成缓存,只要调用.cache()就可以。示例:
Observable<Integer> work = Observable.fromCallable(() -> { System.out.println("Doing some work"); return 10; }); work.subscribe(System.out::println); work.map(i -> i * 2).subscribe(System.out::println);
输出:
Doing some work 10 Doing some work 20
如果用.cache():
Observable<Integer> work = Observable.fromCallable(() -> { System.out.println("Doing some work"); return 10; }).cache(); // <- apply caching work.subscribe(System.out::println); work.map(i -> i * 2).subscribe(System.out::println);
输出:
Doing some work 10 20
Push or Pull(推模式还是拉模式)
- Stream 和 Optional - 是拉模式的。你调用不同的方法(.get(), .collect() 等)从流水线拉取结果。拉模式经常与阻塞、同步是相关联的,而这也合理。你调用一个方法,然后线程等待数据。线程会阻塞直到数据到达。
- CompletableFuture, Observable, Flowable, Flux - 是推模式的。你订阅一个流水线,然后当有东西可以处理的时候你会得到通知。推模式通常意味着非阻塞、异步。当流水线在某个线程上执行的时候,你可以做任何事情。你已经定义了一段待执行的代码,作为下一个阶段的任务,当通知到达的时候,这个代码就会被执行。
Backpressure(反压)
要做到支持反压,流水线必须是推模式的。
Backpressure(反压) 描述的是在流水线中会发生的一种场景:某些异步的阶段处理速度跟不上,需要告诉上游生产者放慢速度。直接失败是不可接受的,因为会丢失太多数据。
- Stream & Optional - 不支持反压,因为他们是拉模式。
- CompletableFuture - 不需要面对这个问题,因为它只产生0个或者1个结果。
- Observable(RxJava 1), Flowable, Flux - 提供一组方案解决这个问题。常用的策略是:
- Buffering(缓冲) - 把所有的onNext的值保存到缓冲区,直到下游消费它们。
- Drop Recent - 如果下游处理跟不上的话,丢弃最近的onNext值。
- Use Latest - 如果下游处理跟不上的话,只提供最近的onNext值,之前的值会被覆盖。
- None - onNext事件直接被触发,不带任何缓冲或丢弃处理。
- Exception - 如果下游处理跟不上的话,触发一个异常。
- Observable(RxJava 2) - 不解决这个问题。很多RxJava 1的使用者用Observable来处理不适用反压的事件,或者是使用Observable的时候不用任何策略处理反压,这会导致不可预知的异常。所以,RxJava 2明确地区分两种情况,提供支持反压的Flowable和不支持反压的Observable。
Operator fusion(操作融合)
操作融合背后的想法是,在生命周期的不同点上,改变执行阶段的链条,从而消除库的架构因素所造成的额外开销。所有这些优化都是在内部处理掉的,对外部用户来说是透明的。
只有RxJava 2 和 Reactor 支持这个特性,但支持的方式不同。总的来说,有两种类型的优化:
- Macro-fusion - 用一个操作替换2个或更多的相继的操作
- Micro-fusion - 在一个输出队列中结束的操作,和在一个前驱队列中开始的操作,能够共用同一个队列的实例。比如说,与其调用request(1)然后处理onOnext(),我们可以:
订阅者可以向父observable轮询值。
更多的详细信息可以参考Operator-fusion (part 1) 和 Operator-fusion (part 2)
总结
上面的内容可以总结为一个表:
总的来说,Stream, CompletableFuture, 和 Optional创建出来是为了解决特定的问题。它们解决这些问题很好用。如果它们满足你的要求,你继续用它们就好了。
但是,不同的问题有不同的复杂性。某些问题需要新的技术。RxJava 和 Reactor是一组通用的工具,帮助你用一种声明式的方式解决你面对的问题,而不是用一些并非为这种问题而提供的工具,来创建一种“hack”的解决方案。
本文主要内容翻译自:http://alexsderkach.io/comparing-java-8-rxjava-reactor/

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
业务代码的救星——Java对象转换框架MapStruct
介绍 在业务项目的开发中,我们经常需要将Java对象进行转换,比如从外部HSF服务得到的对象转换为本域的业务对象domain object,将domain object转为数据持久层的data object,将domain object 转换为DTO以便返回给外部调用方等。在转换时大部分属性都是相同的,只有少部分的不同,如果手工编写转换代码,会很繁琐。这时我们可以通过一些对象转换框架来更方便的做这件事情。 这样的对象转换框架有不少,比较有名的有ModelMapper和MapStruct。它们所使用的实现技术不同,ModelMapper是基于反射的,通过反射来查找实体对象的字段,并读取或写入值,这样的方式实现原理简单,但性能很差。与ModelMapper框架不同的是,MapStruct是基于编译阶段代码生成的,生成的转换代码在运行的时候跟一般的代码一样,没有额外的性能损失。本文重点介绍MapStruct。 原理 使用方法 1. Mapper声明 通过类似下面的代码声明一个Mapper,MapStruct会在编译时自动生成实现代码。除了基本的功能外,MapStruct提供很多高级的功能,全...
- 下一篇
Jersey Java RESTful API on an Alibaba Cloud ECS Instance
By Aditya, Alibaba Cloud Tech Share Author. Tech Share is Alibaba Cloud's incentive program to encourage the sharing of technical knowledge and best practices within the cloud community. With the increased adoption of digital technology, we see that same applications are typically available across different devices, notably on laptops, mobile phones, and TV. Most of th
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程