RxJava2 入门详细笔记(2)
六、过滤操作符
6.1、filter()
通过一定逻辑来过滤被观察者发送的事件,如果返回 true
则会发送事件,否则不会发送
Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 2 == 0; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 2 10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 4
6.2、ofType()
过滤不符合该类型的事件
Observable.just(1, 2, "Hi", 3, 4, "Hello").ofType(Integer.class).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 1 10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 2 10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 3 10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 4
6.3、skip()
以正序跳过指定数量的事件
Observable.just(1, 2, 3, 4).skip(2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 3 10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 4
6.4、skipLast()
以反序跳过指定数量的事件
Observable.just(1, 2, 3, 4).skipLast(2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 1 10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 2
6.5、distinct()
过滤事件序列中的重复事件
Observable.just(1, 2, 1, 2, 3, 4, 3).distinct().subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 1 10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 2 10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 3 10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 4
6.6、distinctUntilChanged()
过滤掉连续重复的事件
Observable.just(1, 2, 2, 1, 3, 4, 3, 3).distinctUntilChanged().subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 08:04:44.531 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1 10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 2 10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1 10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3 10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 4 10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3
6.7、take()
控制观察者接收事件的数量
Observable.just(1, 2, 2, 1, 3, 4, 3, 3).take(3).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 1 10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2 10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2
6.8、debounce()
如果两个事件发送的时间间隔小于设定的时间间隔,则前一件事件不会发送给观察者
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); Thread.sleep(900); emitter.onNext(2); } }).debounce(1, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
10-06 08:08:59.337 13509-13523/leavesc.hello.rxjavademo E/MainActivity: accept : 2
6.9、firstElement() && lastElement()
firstElement()
取事件序列的第一个元素,lastElement()
取事件序列的最后一个元素
Observable.just(1, 2, 3, 4, 5).firstElement().subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
6.10、elementAt() & elementAtOrError()
elementAt()
可以指定取出事件序列中事件,但是输入的 index
超出事件序列的总数的话就不会触发任何调用,想触发异常信息的话就用 elementAtOrError()
Observable.just(1, 2, 3, 4, 5).elementAt(5).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
以上代码不会触发任何
改用为 elementAtOrError()
,则会抛出异常
Observable.just(1, 2, 3, 4, 5).elementAtOrError(5).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept : " + integer); } });
Process: leavesc.hello.rxjavademo, PID: 13948 io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | null at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:46) at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115) at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:111) at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:37) at io.reactivex.Observable.subscribe(Observable.java:12090) at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37) at io.reactivex.Single.subscribe(Single.java:3438) at io.reactivex.Single.subscribe(Single.java:3424)
七、条件操作符
7.1、all()
判断事件序列是否全部满足某个事件,如果都满足则返回 true
,反之则返回 false
Observable.just(1, 2, 3, 4, 5).all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 2 == 0; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.e(TAG, "accept: " + aBoolean); } });
10-06 08:16:10.212 14043-14043/leavesc.hello.rxjavademo E/MainActivity: accept: false
7.2、takeWhile()
发射原始 Observable
,直到指定的某个条件不成立的那一刻,它停止发射原始 Observable
,并终止自己的 Observable
Observable.just(1, 2, 3, 4, 5, 1, 2).takeWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 4; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept: " + integer); } });
10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 1 10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 2 10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 3
7.3、skipWhile()
订阅原始的 Observable
,但是忽略它的发射物,直到指定的某个条件变为 false 时才开始发射原始 Observable
Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5) .skipWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 3; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "integer " + integer); } });
10-06 13:59:40.583 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4 10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1 10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 3 10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4 10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5 10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1 10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5
7.4、takeUntil()
用于设置一个条件,当事件满足此条件时,此事件会被发送,但之后的事件就不会被发送了
Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5) .takeUntil(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 3; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "integer " + integer); } });
10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 1 10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 2 10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 4
7.5、skipUntil()
当 skipUntil()
中的 Observable
发送事件了,原始的 Observable
才会发送事件给观察者
Observable.intervalRange(1, 6, 0, 1, TimeUnit.SECONDS) .skipUntil(Observable.intervalRange(10, 3, 1, 1, TimeUnit.SECONDS)) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe"); } @Override public void onNext(Long along) { Log.e(TAG, "onNext : " + along); } @Override public void onError(Throwable e) { Log.e(TAG, "onError"); } @Override public void onComplete() { Log.e(TAG, "onComplete"); } });
10-06 08:51:16.926 16877-16877/leavesc.hello.rxjavademo E/MainActivity: onSubscribe 10-06 08:51:17.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 2 10-06 08:51:18.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 3 10-06 08:51:19.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 4 10-06 08:51:20.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 5 10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 6 10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onComplete
7.6、sequenceEqual()
判断两个 Observable
发送的事件是否相同,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射 true,否则发射 false
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3)) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.e(TAG, "accept aBoolean : " + aBoolean); } });
10-06 08:46:59.369 16492-16492/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true
7.7、contains()
判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false
Observable.just(1, 2, 3, 4).contains(2).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.e(TAG, "accept aBoolean : " + aBoolean); } });
10-06 08:45:58.100 16386-16386/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true
7.8、isEmpty()
判断事件序列是否为空
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).isEmpty().subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.e(TAG, "accept aBoolean: " + aBoolean); } });
10-06 08:43:43.201 16278-16278/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean: true
7.9、amb()
amb()
接收一个 Observable
集合,但是只会发送最先发送事件的 Observable
中的事件,不管发射的是一项数据还是一个 onError
或 onCompleted
通知,其余 Observable
将会被丢弃
List<Observable<Long>> list = new ArrayList<>(); list.add(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS)); list.add(Observable.intervalRange(10, 3, 0, 1, TimeUnit.SECONDS)); Observable.amb(list).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.e(TAG, "accept: " + aLong); } });
10-06 08:41:45.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 10 10-06 08:41:46.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 11 10-06 08:41:47.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 12
7.10、defaultIfEmpty()
如果 Observable 没有发射任何值,则可以利用这个方法发送一个默认值
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).defaultIfEmpty(100).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "accept: " + integer); } });
10-06 08:40:04.754 15945-15945/leavesc.hello.rxjavademo E/MainActivity: accept: 100
更多的学习笔记看这里:Java_Android_Learn
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
C++雾中风景12:聊聊C++中的Mutex,以及拯救生产力的Boost
笔者近期在工作之中编程实现一个Cache结构的封装,需要使用到C++之中的互斥量Mutex,于是花了一些时间进行了调研。(结果对C++标准库很是绝望....)最终还是通过利用了Boost库的shared_mutex解决了问题。借这个机会来聊聊在C++之中的多线程编程的一些“坑”。 1.C++多线程编程的困扰 C++从11开始在标准库之中引入了线程库来进行多线程编程,在之前的版本需要依托操作系统本身提供的线程库来进行多线程的编程。(其实本身就是在标准库之上对底层的操作系统多线程API统一进行了封装,笔者本科时进行操作系统实验是就是使用的pthread或来进行多线程编程的)提供了统一的多线程固然是好事,但是标准库给的支持实在是有限,具体实践起来还是让人挺困扰的: C++本身的STL并不是线程安全的。所以缺少了类似与Java并发库所提供的一些高性能的线程安全的数据结构。(Doug Lea大神亲自操刀完成的并发编程库,让JDK5成为Java之中里程碑式的版本) 如果没有线程安全的数据结构,退而求其次,可以自己利用互斥量Mutex来实现。C++的标准库支持如下的互斥量的实现: 互斥量 版本 作用...
- 下一篇
Java 获取指定主机IP
import java.net.InetAddress; import java.net.UnknownHostException; public class GetIP { public static void main(String[] args) { InetAddress address = null; try { address = InetAddress.getByName("www.baidu.com"); } catch (UnknownHostException e) { e.printStackTrace(); } System.out.println(address.getHostName()+":"+address.getHostAddress()); System.exit(0); } }
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- Hadoop3单机部署,实现最简伪集群
- CentOS8编译安装MySQL8.0.19
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Mario游戏-低调大师作品