RxJava简析
rxjava文档地址https://mcxiaoke.gitbooks.io/rxdocs/content/ 这个是中文版的
android studio 添加依赖 implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
首先,打印helloworld:
public void hello(String args){
Flowable.fromArray(args).subscribe(s -> System.out.println("hello " + s + "!"));
}
跟以前其他语言不大一样,看上去很麻烦,我们一步步来看
Flowable.fromArray(args)
这个方法最重要的就是里面的最后一句
new FlowableFromArray<>(items)
果然FlowableFromArray是Flowable的子类,所以真正的实现在子类里面
Flowable.fromArray(args).subscribe
subscribe进到里面的是
public final Disposable subscribe( Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
Objects.requireNonNull(onNext, "onNext is null");
Objects.requireNonNull(onError, "onError is null");
Objects.requireNonNull(onComplete, "onComplete is null");
LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
return ls;
}
看上去最重要的就是这两句了
LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
先进到subscribe(ls)中,发现这句
subscribeActual(flowableSubscriber)
跳进去发现是个抽象方法,那么实现肯定在子类啦,进到子类FlowableFromArray
public void subscribeActual(Subscriber<? super T> s) {
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new ArrayConditionalSubscription<>(
(ConditionalSubscriber<? super T>)s, array));
} else {
s.onSubscribe(new ArraySubscription<>(s, array));
}
}
跳进去又发现onSubscribe是个抽象方法,那么实现方法在哪呢,对啦,就是之前看到的LambdaSubscriber
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this, s)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
}
这个onSubscribe.accept(this)跳过去就是接口Consumer的accept方法了
所以一开始的helloworld代码也可以改成
FlowableFromArray flowableFromArray = new FlowableFromArray(new String[]{args});
flowableFromArray.subscribe(new Consumer<String>() {
public void accept(String s) throws Throwable {
System.out.println("hello " + s + "!");
}
});
是不是很麻烦,饶了一大圈,没关系,我们继续往下看
这里给出一些名词的翻译
Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式
Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念
Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者
Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现
emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射
items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项
下面是常用的操作符列表:
创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
错误处理 Catch和Retry
辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
转换操作 To
连接操作 Connect, Publish, RefCount, Replay
反压操作,用于增加特殊的流程控制策略的操作符
下面我们来看第一个操作符:Create
Observable.create(new Observable.OnSubscribe<Integer>() {
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<Integer>() {
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
我们一起来看源码
首先是Observable的create方法
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
这里没什么,就是返回创建一个Observable对象,但是要注意里面的参数OnSubscribe
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
public interface Action1<T> extends Action {
void call(T t);
}
这个参数是一个接口,它的父类里有个抽象待实现的方法call,而且call方法被传了Subscriber进去
我们来看Subscriber这个类,原来是个接口,而且它的父类Observer有三个很重要的方法
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
第一个create方法算是完成了,我们可以拆分来看
Observable<Integer> integerObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
第二个方法subscribe,它的参数也是Subscriber,即intergerObservable.subscribe(Subscriber)
所以我们就看出来了,Observable这个被观察者先是通过call增加一系列的监听,然后通过subscribe订阅监听。这样,当call里的内容开始执行后,触发监听回调
下面我要放大招了,我把源码简化了一下
public interface MyOnSubscribe {
void call(MySubscriber subscriber);
}
public interface MySubscriber {
void onNext();
void onCompleted();
void onError();
}
public class MyObservable {
MyOnSubscribe onSubscribe;
public MyObservable(MyOnSubscribe onSubscribe) {
this.onSubscribe = onSubscribe;
}
public final static MyObservable create(MyOnSubscribe onSubscribe) {
return new MyObservable(onSubscribe);
}
public final void subscribe(MySubscriber subscriber) {
onSubscribe.call(subscriber);
}
}
测试代码
public void hello() {
MyObservable.create(new MyOnSubscribe() {
public void call(MySubscriber subscriber) {
try {
for (int i = 0; i < 5; i++) {
subscriber.onNext();
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError();
}
}
}).subscribe(new MySubscriber() {
public void onNext() {
System.out.println(1);
}
public void onCompleted() {
System.out.println("onCompleted");
}
public void onError() {
System.out.println("onError");
}
});
}
得到的结果是一样的。所以说,代码万变不离其中,只要灵活运用接口,接口就是用来监听的
第二个操作符from
Integer[] items = {0, 1, 2, 3, 4, 5};
Observable.from(items).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
先看Observable的from方法
public final static <T> Observable<T> from(T[] array) {
return from(Arrays.asList(array));
}
其实就是把数组转成list,但是再点from进去就很重要
public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(new OnSubscribeFromIterable<T>(iterable));
}public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
if (iterable == null) {
throw new NullPointerException("iterable must not be null");
}
this.is = iterable;
}
OnSubscribeFromIterable是继承自OnSubscribe的,所以后面调的call方法,实际上是调的OnSubscribeFromIterable里的call方法,我们来看一下
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
if (!it.hasNext() && !o.isUnsubscribed())
o.onCompleted();
else
o.setProducer(new IterableProducer<T>(o, it));
}
真相大白了,在这里做了迭代。还有一个操作符just,其实底层里面调的就是from,只不过还限制了参数个数,而且参数类型必须相同,感觉用处不大
第三个操作符repeat
Observable.just(1, 2).repeat(4).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
repeat点进去是OnSubcribRedo.repeat,紧追着count这个参数,会看到一个RedoFinite类
public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> {
private final long count;
public RedoFinite(long count) {
this.count = count;
}
public Observable<?> call(Observable<? extends Notification<?>> ts) {
return ts.map(new Func1<Notification<?>, Notification<?>>() {
int num=0;
public Notification<?> call(Notification<?> terminalNotification) {
if(count == 0) {
return terminalNotification;
}
num++;
if(num <= count) {
return Notification.createOnNext(num);
} else {
return terminalNotification;
}
}
}).dematerialize();
}
}
这里就看到了,有个num++和num<=count判断,就知道是怎么重复的了
第4个操作符Map和flapMap
这两个变换操作符可谓非常重要,经常用到,我写了4个例子,请仔细区别,就可以知道它们到底做了什么
Student student1 = new Student("stark", new Course[]{new Course("Chinese"), new Course("English")});
Student student2 = new Student("adam", new Course[]{new Course("Math"), new Course("Physical")});
Student[] students = new Student[]{student1, student2};
Observable.from(students).subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
System.out.println(student.getName());
}
});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
}).subscribe(new Action1<String>() {
@Override
public void call(String name) {
System.out.println(name);
}
});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, Course[]>() {
@Override
public Course[] call(Student student) {
return student.getCourses();
}
}).subscribe(new Action1<Course[]>() {
@Override
public void call(Course[] courses) {
System.out.println(courses[0].getName());
System.out.println(courses[1].getName());
}
});
System.out.println("-------------");
Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
}).subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
System.out.println(course.getName());
}
});
输出:
stark
adam
-------------
stark
adam
-------------
Chinese
English
Math
Physical
-------------
Chinese
English
Math
Physical
如果你仔细看代码,就会发现map就是一对一的转换,flatMap是一对多的转换,转换的前后类型在方法Func1中已经标的很清楚。例子:Func1(Student,String)就代表传参是Student,返回类型是String,具体的实现在call里面student.getName()
map和flatMap可以看作是将我们经常用到的嵌套循环for(i){for(j)...}...给解耦了,看起来更清楚一些,中间可以插入更多的操作
源码里面的实现就是迭代,没什么好说
第5个操作符filter:
Observable.just(1,2,3,4,5).filter(new Func1<Integer, Boolean>() {
public Boolean call(Integer integer) {
return integer<4;
}
}).subscribe(new Action1<Integer>() {
public void call(Integer integer) {
System.out.println(integer);
}
});
先过滤再循环输出
第6个组合操作符and/then/when
implementation 'io.reactivex:rxjava-joins:0.22.0'
Observable<String> just1 = Observable.just("A", "B");
Observable<Integer> just2 = Observable.just(1, 2, 3);
Pattern2<String, Integer> pattern = JoinObservable.from(just1).and(just2);
Plan0<String> plan = pattern.then(new Func2<String, Integer, String>() {
public String call(String s, Integer integer) {
return s + integer;
}
});
JoinObservable.when(plan).toObservable().subscribe(new Action1<String>() {
public void call(String s) {
System.out.println(s);
}
});
输出:
A1
B2
第7个组合操作符merge:
Observable<Integer> odds = Observable.just(1, 3, 5);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds,evens).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
输出:
1
3
5
2
4
6
第8个操作符doOnNext:
Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
public void call(Integer integer) {
if (integer > 1) {
throw new RuntimeException("item exceeds maximum value");
}
}
}).subscribe(new Subscriber<Integer>() {
public void onCompleted() {
System.out.println("onCompleted");
}
public void onError(Throwable e) {
System.out.println("onError");
}
public void onNext(Integer integer) {
System.out.println("next:" + integer);
}
});
输出:
next:1
onError
第9个操作符SubscribeOn(Scheduler):即申明在哪个调度器工作
第10个:android例子:
Observable.from(new String[]{"one", "two", "three", "four", "five"})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
public void call(String s) {
System.out.println(s);
}
});
大致了解了rxjava的使用和基本原理之后,在后续的使用中遇到不懂的再看文档https://mcxiaoke.gitbooks.io/rxdocs/content/,还有一定要看源码,然后自己亲自尝试,才能加深理解
欢迎关注我的微信公众号:安卓圈
本文分享自微信公众号 - 安卓圈(gh_df75572d44e4)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【翻译】CommonJS 是如何导致打包后体积增大的?
“ 原文链接:https://web.dev/commonjs-larger-bundles 今天的文章,将介绍什么是 CommonJS,以及它为什么会导致我们打包后的文件体积增大。 什么是 CommonJS? CommonJS 是 2009 年发布的 JavaScript模块化的一项标准,最初它只打算在浏览器之外的场景使用,主要用于服务器端的应用程序。 你可以使用 CommonJS 来定义模块,并从中导出部分模块。例如,下面的代码定义了一个模块,该模块导出了五个函数:add、 subtract、 multiply、 divide、max: //utils.jsconst{maxBy}=require('lodash-es');constfns={add:(a,b)=>a+b,subtract:(a,b)=>a-b,multiply:(a,b)=>a*b,divide:(a,b)=>a/b,max:arr=>maxBy(arr)};Object.keys(fns).forEach(fnName=>module.exports[fnName]=fns[...
- 下一篇
Rainbond ServiceMesh架构组件端口冲突处理方式
在我们部署具有多个服务的分布式业务时,必须要考虑的一点就是如何处理服务之间的通信问题,那么当我们将业务部署到Rainbond 上时,又是如何去处理的呢? Rainbond 开箱即用的ServiceMesh架构默认通过 Sidecar 代理的方式,为我们透明的解决了分布式场景下组件间的通讯问题。 例如A组件需要访问B组件,可以让A组件依赖B组件,这样A组件启动时会同时以插件方式启动一个 envoy 服务,而 envoy 服务会将B组件的对内端口映射到A组件 Pod 网络空间的本地回环地址127.0.0.1的相同端口,也就是说B组件开通了对内的8080端口,那么在建立了A到B的依赖关系后,在A组件内访问127.0.0.1:8080会由 envoy 将相关请求转发到B组件的8080端口。 但是我们实际的业务中经常会出现一种情况,那就是一个组件需要和多个其他组件通信,而这些组件使用的服务端口有可能会相同,这就会导致 envoy 在本地回环地址127.0.0.1起监听时出现端口冲突。 我们可以通过以下两种方式解决这个问题: 方式一:通过HTTP 7层网络治理进行端口复用 这一类型的组件,通过Ra...
相关文章
文章评论
共有0条评论来说两句吧...