Android:随笔——RxJava的线程切换
近期用到 RxJava ,线程切换的时候出了点小插曲,首先先上理论,在上实践,不喜理论可跳过,此篇文章适合会使用 RxJava 的人群,如果还没有接触过可以自学过后再来读这篇文章,这篇文章这几个例子其实代码都是基本都是一样的,我也不知道这样写是不是更清晰
理论
总所周知 RxJava 在切换线程时用到了两个方法 subscribeOn()
和 observeOn()
下面来分别解释一下这两个方法
- subscribeOn() : 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
- observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;
我之前还看到有人说 subscribeOn()
必须在 observeOn()
的前面,不过经过我测试他两个的位置并没有什么联系,就如上面所说 第一次出现 subscribeOn()
的地方是有效的,其他的无效
实践
实践一下,先来个基本的栗子
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===create: " + Thread.currentThread().getName()); subscriber.onNext("1"); } }) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName()); return Integer.valueOf(s); } }) .flatMap(new Func1<Integer, Observable<String>>() { @Override public Observable<String> call(final Integer integer) { Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName()); return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName()); for (int i = 0; i < integer; i++) { subscriber.onNext(i + ""); } subscriber.onCompleted(); } }); } }) .map(new Func1<String, Long>() { @Override public Long call(String s) { Log.e(TAG, "===String->Long: " + Thread.currentThread().getName()); return Long.parseLong(s); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { Log.e(TAG, "===onNext: " + Thread.currentThread().getName()); } });
这个例子呢就是简单的将 String 转为 Integer 然后转换发射源 发射 String 在然后 将 String 转换为 Long 然后打印
这个例子呢没有什么实际意义,只作为个示例
有没有看到
.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
这两句代码 我切换了一下线程 ,接下来看一下我的运行结果
===create: RxIoScheduler-2 ===String -> Integer: RxIoScheduler-2 ===Integer->Observable: RxIoScheduler-2 ===Observable<String> call: RxIoScheduler-2 ===String->Long: RxIoScheduler-2 ===onNext: main
接下来解释一下,因为subscribeOn(Schedulers.io())
它指定了最开始的被观察者所在的线程所以后面的操作都是根据最开始的被观察者制定的线程运行的,又因为 .observeOn(AndroidSchedulers.mainThread())
它指定了都面的操作符使用主线程运行。
接下来再写一个多 subscribeOn()
observeOn()
的情况
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===create: " + Thread.currentThread().getName()); subscriber.onNext("1"); } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.io()) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName()); return Integer.valueOf(s); } }) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1<Integer, Observable<String>>() { @Override public Observable<String> call(final Integer integer) { Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName()); return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName()); for (int i = 0; i < integer; i++) { subscriber.onNext(i + ""); } subscriber.onCompleted(); } }); } }) .observeOn(Schedulers.io()) .map(new Func1<String, Long>() { @Override public Long call(String s) { Log.e(TAG, "===String->Long: " + Thread.currentThread().getName()); return Long.parseLong(s); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { Log.e(TAG, "===onNext: " + Thread.currentThread().getName()); } });
运行结果
===create: main ===String -> Integer: RxIoScheduler-4 ===Integer->Observable: main ===Observable<String> call: main ===String->Long: RxIoScheduler-3 ===onNext: main
下面应该不用我解释就能知道结果为什么是这样的
简单解释一下 因为有两个subscribeOn()
所以取第一个,所以最开始的被观察者所在的线程为主线程,接着使用observeOn()
使用制定后面的操作符为 io 线程,接着observeOn()
又指定后面的操作符为主线程...以此类推不再赘述
接下来到了我要说的重点了也是我受到迷惑的地方,我就简单的写一段栗子来重现一下,直接上代码
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===create: " + Thread.currentThread().getName()); subscriber.onNext("1"); } }) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName()); return Integer.valueOf(s); } }) .flatMap(new Func1<Integer, Observable<String>>() { @Override public Observable<String> call(final Integer integer) { Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName()); return forEach(integer); } }) .map(new Func1<String, Long>() { @Override public Long call(String s) { Log.e(TAG, "===String->Long: " + Thread.currentThread().getName()); return Long.parseLong(s); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { Log.e(TAG, "===onNext: " + Thread.currentThread().getName()); } });
可以看到 flatMap
那个操作符哪里使用了一个方法 forEach(int)
代码如下
public Observable<String> forEach(final int integer) { return Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName()); for (int i = 0; i < integer; i++) { subscriber.onNext(i + ""); } subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }
再贴一下结果
===create: RxIoScheduler-2 ===String -> Integer: RxIoScheduler-2 ===Integer->Observable: RxIoScheduler-2 ===Observable<String> call: RxIoScheduler-3 ===String->Long: main ===onNext: main
解释一下结果:我想大家肯定发现了端倪,为什么 String -> Long 这个步骤为什么会在主线程里运行呢,原因呢很简单 flatMap 变换被监听者,这个被监听者使用observeOn
切换了后边操作符的线程,影响到了 flatMap 后面的 map 操作符所以导致了如此结果
紧接着我们在看一个例子
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===create: " + Thread.currentThread().getName()); subscriber.onNext("1"); } }) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName()); return Integer.valueOf(s); } }) .flatMap(new Func1<Integer, Observable<String>>() { @Override public Observable<String> call(final Integer integer) { Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName()); return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> subscriber) { forEach(integer, new CallBack() { @Override public void call(String s) { Log.e(TAG, "===Subscriber: " + Thread.currentThread().getName()); subscriber.onNext(s); } }); } }); } }) .map(new Func1<String, Long>() { @Override public Long call(String s) { Log.e(TAG, "===String->Long: " + Thread.currentThread().getName()); return Long.parseLong(s); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { Log.e(TAG, "===onNext: " + Thread.currentThread().getName()); } });
public void forEach(final int integer, final CallBack back) { Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { Log.e(TAG, "===Observable<String> call: " + Thread.currentThread() .getName()); for (int i = 0; i < integer; i++) { subscriber.onNext(i + ""); } subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { back.call(s); } }); } interface CallBack { void call(String s); }
结果
===create: RxIoScheduler-4 ===String -> Integer: RxIoScheduler-4 ===Integer->Observable: RxIoScheduler-4 ===Observable<String> call: RxIoScheduler-5 ===Subscriber: main ===String->Long: main ===onNext: main
大家发现的现在还是 和上一个例子相仿,主要的改变还是在 flatMap()
这个方法里,现在我的做法是创建一个被监听者,然后里面是调用forEach()
等待接口返回值,再往下发射数据。
注意以下几点:
- 在
forEach()
方法里面切换了线程,这个回调接口的线程为 主线程 - 我们在
flatMap()
这里新建了一个发射源(被监听者)、在这里我们并没有指定它的线程,所以flatMap()
是什么线程,这个被监听者就是什么线程
看清以上两点,我们分析一下,flatMap()
以上的结果大家肯定都能想出来,我们就直接从flatMap()
开始分析了,都知道了回调的接口里面的线程为主线程,那么它作为被观察者它下面的操作符按照 RxJava 线程切换的基本原理来说,肯定也是主线程。
上个图理解一下
所以大家如果像我这样使用 flatMap()
的时候一定注意下面操作符的线程
总结
今天去调一个 Js 的方法,然后有一个接口回调,无意中看见一个 名字奇怪的 线程名称,一想 RxJava 的 io 线程名 应该是 RxIoScheduler 开头的啊,突然有点蒙,感觉自己一点都不了解 RxJava 的线程 切换了,结果分析一波 也是万变不离其宗,还是挺有意思的一个小插曲
欢迎各位前来拍砖

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Bezier曲线在Android动画中的应用
Android动画的开发中,为了达到更加酷炫的效果,常常需要自定义运动轨迹,或者绘制花式复杂的曲线,这正是Bezier曲线大显神通的地方,本文将带你了解Bezier曲线在Android开发中的一些应用。 1. Bezier曲线简介 贝塞尔曲线(Bézier curve),又称贝兹曲线或贝济埃曲线,是应用于二维图形应用程序的数学曲线。一般的矢量图形软件通过它来精确画出曲线,贝兹曲线由线段与节点组成,节点是可拖动的支点,线段像可伸缩的皮筋,我们在绘图工具上看到的钢笔工具就是来做这种矢量曲线的。主要结构:起始点、终止点(也称锚点)、控制点。通过调整控制点,贝塞尔曲线的形状会发生变化。 根据控制点数目的不同,Bezier曲线可以分为很多种,控制点越多曲线就更复杂,一般常用的是二阶和三阶Bezier曲线,Bezier曲线的具体信息可以参看百科:Constructing Bézier curves 1.1 一阶Bezier曲线 为了便于理解,我们先从一阶Bezier曲线开始。由两个点控制的曲线,也就是一条直线. 一阶Bezier曲线 1.2 二阶Bezier曲线 由三个点控制的曲线。P0是起点,P...
- 下一篇
.Net 转战 Android 4.4 日常笔记(1)--工具及环境搭建
闲来没事做,还是想再学习一门新的技术,无论何时Android开发比Web的开发工资应该高40%,我也建议大家面对移动开发,我比较喜欢学习最新版本的,我有java的基础,但是年久,已经淡忘,以零基础学习,希望没有很多的语言闲话,爽快进入Android开发,只留下一个日常学习笔记,并不发布主页,相信这块在cnblog也是没人看的,如果你有兴趣,我们将共同学习,现在进入学习... 一、工欲善其事 必先利其器 先安装JDK7版本 下载集成开发工具ADT Bundle for Windows 二、配置系统变量 新建系统变量SDK_HOME 值:F:\Android\adt-bundle-windows-x86_64-20140321\sdk (值为ADT Bundle for Windows解压后的SDK目录) 修改系统变量path的值 在后面加入;%SDK_HOME%\tools;%SDK_HOME%\platform-tools; 三、运行Eclipse配置Android Virtual Devices Manager的模拟器,用来运行开发的App 四、启动配置的Android模拟器 ...
相关文章
文章评论
共有0条评论来说两句吧...