Rxjava深入理解之自己动手编写Rxjava
Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看.
从观察者模式说起
观察者模式,是我们在平时使用的比较多的一种设计模式.
观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己. 它的特点是 一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。
本文的重点不是在介绍 观察者模式,因此这个话题就不展开讨论.
Rxjava 是一种 函数式编程,或者称为链式调用模型,也是使用观察者模式来实现事件的传递与监听.
下面我们来看,Rxjava 和 普通观察者的点区别.
- 普通的观察者模式通常是 一个主题,多个观察者配合,基本上是属于 一对多的情况.
- Rxjava的观察者模式通常是 一对一的关系.
- 普通的观察者模式是 主题数据改变时,通知观察者数据的变动
- Rxjava的观察者模式是 在被观察者(主题)调用
subscribe
方法后,触发数据流动和观察者接收事件.
基础知识介绍到这里,接下来我们来自己动手编写Rxjava
编写Rxjava
看一个常见的rxjava的使用示例,(原始数据,数据转换,线程切换,数据接收处理一系列功能):
public static void main(String[] args) throws InterruptedException { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }) .observeOn(Schedulers.io()) .map(s -> Integer.parseInt(s) * 10) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(1); } // 10 // 20
接下来,我会一步一步带领大家实现上述所有的功能.
一个简单的观察者模式
// Observer.java // 观察者 public interface Observer<T> { void onUpdate(T t); } // ObservableSource.java // 被观察者(主题)接口 public interface ObservableSource<T> { void subscribe(Observer<? super T> observer); } // Observable.java // 具体的被观察者(主题) public class Observable<T> implements ObservableSource<T> { private T t; public Observable(T t) { this.t = t; } @Override public void subscribe(Observer<? super T> observer) { // 调用订阅时,触发观察者更新 observer.onUpdate(t); } }
使用:
public static void main(String[] args) { // 观察者 Observer<String> observer = s -> System.out.println(s); // 被观察者(主题) Observable<String> observable = new Observable<>("hello"); // 调用 observable.subscribe(observer); } // hello
这样,算是一个简单的观察模式了,但是这种方式很不灵活,数据在构造中直接传入了.
接下来我们来改造一下 Observable.java
类. 可以传入一个接口来定义数据的传递规则,并且为Observable
写一个适配器和一个事件分发器,为原始事件的产生提供支持.
- 添加
Emitter
接口,它是一个事件分发的接口; - 添加
ObservableOnSubscribe
接口,它是创建Observable
实例的桥梁,并且有生产事件的功能,支持lambda方式调用; - 添加
ObservableCreate
类,它是Observable
的适配器,能够根据ObservableOnSubscribe
接口,快速创建一个Observable
实例;并且内部类CreateEmitter
实现了Emitter
接口,用于事件的分发; - 修改
Observable
类,添加工厂方法,能够根据ObservableOnSubscribe
接口,快速构建Observable
实例;
// Emitter.java // 事件分发器接口 public interface Emitter<T> { void onUpdate(T t); } // ObservableOnSubscribe.java // Observable的事件分发接口 public interface ObservableOnSubscribe<T> { void subscribe(Emitter<T> emitter) throws Exception; } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { // 工厂方法,生产出一个Observable实例 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return new ObservableCreate<>(source); } // 真正进行事件分发处理的方法 abstract void subscribeActual(Observer<? super T> observer) throws Exception; @Override // 交给subscribeActual实现,需要子类实现 public void subscribe(Observer<? super T> observer) throws Exception { subscribeActual(observer); } } // ObservableCreate.java // Observable的一个适配器,用于快速创建一个可以发送事件的Observable class ObservableCreate<T> extends Observable<T> { // 事件分发接口 private final ObservableOnSubscribe<T> source; ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override // 分发逻辑的具体代码 void subscribeActual(Observer<? super T> observer) throws Exception { CreateEmitter<T> emitter = new CreateEmitter<>(observer); source.subscribe(emitter); } // 内部分发器 static class CreateEmitter<T> implements Emitter<T> { private final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override // 这里只是简单的将observer观察者的事件直接分发出去 public void onUpdate(T t) { observer.onUpdate(t); } } }
ObservableSource.java
和Observer.java
没有修改,固没有贴出,未贴出代码请查看上一步.使用 :
public static void main(String[] args) throws Exception { Observable.create(emitter -> { emitter.onUpdate("hello"); emitter.onUpdate("world"); }) .subscribe(System.out::println); } // hello // world
哇! 你会发现,到此为止,你已经使用观察模式实现了一个简易的函数式编程的代码了.
如果你完全理解了上述代码是怎么产生的,那么恭喜你,你已经理解rxjava最最核心的原理了.
添加事件结束和异常捕获
上诉的代码,还不能够捕获异常和结束事件,这样使用起来很不方便,接下来我们来改造实现它.
仿造rxjava,我们也将事件分为onNext
,onError
,onComplete
三个事件.
需要修改 分发器接口和观察者接口,以及Observable
的适配器.
- 修改
Observer
接口和Emitter
接口, 改为onNext
,onError
,onComplete
方法; - 修改
ObservableCreate
类,添加异常处理和结束的逻辑;
// Observer.java public interface Observer<T> { void onNext(T t); void onError(Throwable e); void onComplete(); } // Emitter.java public interface Emitter<T> { void onNext(T t); void onError(Throwable e); void onComplete(); } // ObservableCreate.java class ObservableCreate<T> extends Observable<T> { // 事件分发接口 private final ObservableOnSubscribe<T> source; ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override // 分发逻辑代码 void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> emitter = new CreateEmitter<>(observer); try { source.subscribe(emitter); } catch (Exception e) { // 异常接收和处理 emitter.onError(e); } } // 内部分发器 static class CreateEmitter<T> implements Emitter<T> { private final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { observer.onNext(t); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onComplete() { observer.onComplete(); } } }
修改过这是三个类后,我们就能接收异常和结束了.使用:
public static void main(String[] args) { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }) .subscribe(new Observer<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("end..."); } }); } // 1 // 2 // end...
嗯!? 虽然实现了接收异常和结束的功能,但是有时我们只需要onNext
事件时,这样的代码写起来不够优雅.
接下来我们编写一个观察者的适配器,让它能够使用 lambda
表达式来优雅的编写代码.
- 添加
Consumer
接口,它是接收一个参数,无返回值的接口,用途是进行lambda方式进行参数传递; - 添加
Action
接口,它是一个不接受参数,无返回值和的接口,用途也是进行lambda方式进行参数传递; - 添加
Functions
类,它是一个辅助类,能获取空Consumer
和空Action
实现; - 添加
LambdaObserver
类,它会将lambda参数形式,转化为Observer
实例,进而实现lambda式的调用; - 修改
Observable
类, 添加void subscribe(Consumer<? super T> next, Action complete, Consumer<? super Throwable> error)
系列的方法,让subscribe
方法真正支持 lambda式调用.
// Consumer.java // 接受一个参数,无返回的接口 public interface Consumer<T> { void apply(T t) throws Exception; } // Action.java // 不接受参数,无返回的接口 public interface Action { void apply() throws Exception; } // Functions.java public class Functions { public static final Action EMPTY_ACTION = () -> {}; public static <T> Consumer<T> emptyConsumer() { return t -> {}; } } // LambdaObserver.java public class LambdaObserver<T> implements Observer<T> { private final Consumer<? super T> onNext; private final Consumer<? super Throwable> onError; private final Action onComplete; public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) { this.onNext = onNext; this.onError = onError; this.onComplete = onComplete; } @Override public void onNext(T t) { try { onNext.apply(t); } catch (Exception e) { onError(e); } } @Override public void onError(Throwable e) { try { onError.apply(e); } catch (Exception e1) { e1.printStackTrace(); } } @Override public void onComplete() { try { onComplete.apply(); } catch (Exception e) { e.printStackTrace(); } } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { // 工厂方法,生产出一个Observable实例 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return new ObservableCreate<>(source); } // 真正进行事件分发处理的方法 abstract void subscribeActual(Observer<? super T> observer); @Override // 交给subscribeActual实现,需要子类实现 public void subscribe(Observer<? super T> observer) { subscribeActual(observer); } // 接受3个lambda表达式的方法参数 public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) { LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete); subscribe(lambdaObserver); } // 接受2个lambda表达式的方法参数 public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) { subscribe(next, error, Functions.EMPTY_ACTION); } // 接受1个lambda表达式的方法参数 public void subscribe(Consumer<? super T> next) { subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION); } }
接下来是调用:
public static void main(String[] args) { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }).subscribe(System.out::println); } // 1 // 2
啧啧!?有点激动了,已经看起来有模有样了,但是功能远远还不够.
添加 是否消费事件的功能(中断事件功能)
在上诉的代码中,我们无法主动中断整个事件发生的过程.接下来我们就需要编写 Disposable
来实现onComplete
和onError
的自中断,以及主动取消事件.
Rxjava中,Disposable
是使用 枚举类型加上原子引用(AtomicReference
)类来实现线程安全(具体可查看DisposableHelper
类).这种方式比较繁琐,这里就不用这种方式来演示,而使用 volatile
声明的状态变量来做同步安全.
- 添加
Disposable
接口,提供中断和是否中断的方法; - 修改
Observer
接口, 添加onSubscribe
方法,让观察者可以在事件传递前,获取Disposable
,进而可以在事件传递的任意阶段中断事件; - 修改
ObservableCreate
类,添加观察者回调onSubscribe
,此回调需在事件分发前才能起到作用;内部类CreateEmitter
实现Disposable
接口,在事件分发前先判断是否被中断了;使用volatile
变量实现中断判断; - 修改
LambdaObserver
类,让它实现Disposable
接口,添加是否中断的判断; - 修改
Observable
类,添加onSubscribe
lambda调用;以及返回Disposable
实例;
// Disposable.java public interface Disposable { void dispose(); boolean isDisposed(); } // Observer.java public interface Observer<T> { void onSubscribe(Disposable d); void onNext(T t); void onError(Throwable e); void onComplete(); } // ObservableCreate.java // Observable的一个适配器,用于快速创建一个可以发送事件的Observable final class ObservableCreate<T> extends Observable<T> { // 事件分发接口 private final ObservableOnSubscribe<T> source; ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override // 分发逻辑代码 void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> emitter = new CreateEmitter<>(observer); // 将中断器回调给observer observer.onSubscribe(emitter); try { source.subscribe(emitter); } catch (Exception e) { // 异常接收和处理 emitter.onError(e); } } // 内部分发器 static class CreateEmitter<T> implements Emitter<T>, Disposable { private final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { // 如果事件没被消费,则进行操作 if (!isDisposed()) observer.onNext(t); } @Override public void onError(Throwable e) { if (!isDisposed()) { try { observer.onError(e); } finally { // 触发消费,后续不再处理事件 dispose(); } } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { // 触发消费,后续不再处理事件 dispose(); } } } private volatile boolean isDisposed = false; @Override public void dispose() { isDisposed = true; } @Override public boolean isDisposed() { return isDisposed; } } } // LambdaObserver.java public class LambdaObserver<T> implements Observer<T>, Disposable { private final Consumer<? super T> onNext; private final Consumer<? super Throwable> onError; private final Action onComplete; private final Consumer<? super Disposable> onSubscribe; public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { this.onNext = onNext; this.onError = onError; this.onComplete = onComplete; this.onSubscribe = onSubscribe; } @Override public void onSubscribe(Disposable d) { try { onSubscribe.apply(d); } catch (Exception e) { onError(e); } } @Override public void onNext(T t) { if (!isDisposed()) try { onNext.apply(t); } catch (Exception e) { onError(e); } } @Override public void onError(Throwable e) { if (!isDisposed()) try { onError.apply(e); } catch (Exception e1) { e1.printStackTrace(); } finally { dispose(); } } @Override public void onComplete() { if (!isDisposed()) try { onComplete.apply(); } catch (Exception e) { e.printStackTrace(); } finally { dispose(); } } private volatile boolean isDisposed = false; @Override public void dispose() { isDisposed = true; } @Override public boolean isDisposed() { return isDisposed; } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { // 工厂方法,生产出一个Observable实例 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return new ObservableCreate<>(source); } // 真正进行事件分发处理的方法 abstract void subscribeActual(Observer<? super T> observer); @Override // 交给subscribeActual实现,需要子类实现 public void subscribe(Observer<? super T> observer) { subscribeActual(observer); } // 接受4个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete, Consumer<? super Disposable> onSubscribe) { LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete, onSubscribe); subscribe(lambdaObserver); return lambdaObserver; } // 接受3个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) { return subscribe(next, error, complete, Functions.emptyConsumer()); } // 接受2个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) { return subscribe(next, error, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } // 接受1个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next) { return subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.emptyConsumer()); } }
到此,我们就能够控制事件的中断了.我们来看使用:
private Disposable mDisposable = null; void disposableTest() { Observable.create((ObservableOnSubscribe<Integer>) emitter -> { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }) .subscribe(integer -> { System.out.println(integer); // 3 事件将不再传递和接收 if (integer == 2 && mDisposable != null) mDisposable.dispose(); }, Functions.emptyConsumer(), Functions.EMPTY_ACTION, d -> mDisposable = d); } // 1 // 2 // 这种方式只在,异步的情况下使用,由于示例中目前还不支持异步,因此以下代码起不了作用. void disposableTest2() { Disposable disposable = Observable .create((ObservableOnSubscribe<Integer>) emitter -> { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }) .subscribe(System.out::println); disposable.dispose(); }
像这样,在事件分发前,拿到Disposable
对象,这样你能在任意阶段中断 这个过程.
至此, 我们实现了基本的事件发送和lambda调用,以及中断功能.接下来我们需要开始添加 操作符了, 让它真的能达到函数式调用的模样!
添加 操作符(Map)
操作符的重点是, 你需要处理好上游传递下来的Disposable
对象,以及下游待传递的Observer
.
下面我们来实现map
操作符的功能, 它可以将一个类型转化为另一个类型.
- 添加
Function
接口,该接口接收一个类型的参数,并且返回另一个类型的值; - 添加
BasicObserver
类,该类实现Observer
和Disposable
接口,用于传递上下游的数据; - 添加
ObservableMap
类,该类继承Observable
,并且使用Function
接口,实现类型转换;内部类MapObserver
继承自BasicObserver
实现具体转换逻辑; - 修改
Observable
类,添加map
操作符;
// Function.java // 接收一个类型的参数,返回一个类型 public interface Function<T, R> { R apply(T t) throws Exception; } // BasicObserver.java public abstract class BasicObserver<T, R> implements Observer<T>, Disposable { // 上游传递的Disposable对象 private Disposable upstream; // 下游接收的观察者对象 final Observer<? super R> downstream; // 如果已经中断,则无需下传 boolean done; BasicObserver(Observer<? super R> downstream) { this.downstream = downstream; } @Override public void onSubscribe(Disposable d) { // 接收上游的Disposable this.upstream = d; downstream.onSubscribe(this); } @Override public void onError(Throwable e) { if (done) return; done = true; downstream.onError(e); } @Override public void onComplete() { if (done) return; done = true; downstream.onComplete(); } @Override public void dispose() { upstream.dispose(); } @Override public boolean isDisposed() { return upstream.isDisposed(); } } // ObservableMap.java final class ObservableMap<T, U> extends Observable<U> { private final ObservableSource<T> source; private final Function<? super T, ? extends U> function; ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { this.source = source; this.function = function; } @Override void subscribeActual(Observer<? super U> observer) { source.subscribe(new MapObserver<T, U>(observer, function)); } static class MapObserver<T, U> extends BasicObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) return; try { // 这里实现具体的转化,下游接收到转化后的类型变量 downstream.onNext(mapper.apply(t)); } catch (Exception e) { onError(e); } } } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { ... // map操作符 public <R> Observable<R> map(Function<? super T, ? extends R> mapper) { return new ObservableMap<>(this, mapper); } }
实现好了map
功能,我们来验证一下:
void mapTest() { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }) .map(s -> Integer.parseInt(s) * 10) .subscribe(System.out::println); } // 10 // 20
哇!! 有点不可思议,已经做到这一步了,我们还差什么呢? 没错,就是线程切换!!
PS : Rxjava中有很多的操作符, 我用其中比较典型的
map
来做示范,其他操作符,有兴趣的可以自己手动来实现.
线程切换
rxjava中,自己实现了一套功能强大的线程池.配合操作符 observeOn
,subscribeOn
来进行线程切换.这里就不对其进行展开,我们的重点是自己实现.
由于rxjava的线程池调度相当的复杂, 这里为了方便演示,将只采用jdk自带的线程池来做线程调度.下面我们来实现, rxjava中 observeOn
操作符,以及Schedulers.io()
的线程调度.
- 添加
Scheduler
接口,定义了调度的方法,提交任务,移除任务,停止线程池; - 添加
IOScheduler
类,是Scheduler
的具体实现,采用newCachedThreadPool
提交任务和中断任务; - 添加
ObservableObserveOn
类,继承Observable
,为observeOn
操作符提供支持;内部类ObserveOnObserver
,实现Observer
,Disposable
和Runnable
接口,run
方法将拦截所有事件,将其作为任务提交给线程池运行,达到异步的效果; - 修改
Observable
类,添加observeOn
操作符;
// Scheduler.java public interface Scheduler { void submit(Runnable runnable); void remove(Runnable runnable); void shutdown(); } // IOScheduler public class IOScheduler implements Scheduler { // 线程池 private final ExecutorService executor = Executors.newCachedThreadPool(); // 保存Future对象,为了能够中断指定线程 private final Map<Runnable, Future> futureMap = new ConcurrentHashMap<>(); @Override public void submit(Runnable runnable) { Future future = futureMap.get(runnable); // 如果对应的任务正在执行,则无需再提交 if (future != null && !future.isDone()) return; if (executor.isShutdown()) return; futureMap.put(runnable, executor.submit(runnable)); } @Override public void remove(Runnable runnable) { Future future = futureMap.get(runnable); if (future == null) return; try { future.cancel(true); } catch (Exception ignored) { } finally { futureMap.remove(runnable); } } @Override public void shutdown() { if (!executor.isShutdown()) executor.shutdown(); } } // ObservableObserveOn.java public final class ObservableObserveOn<T> extends Observable<T> { private final ObservableSource<T> source; private final Scheduler scheduler; ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new ObserveOnObserver<>(observer, scheduler)); } static final class ObserveOnObserver<T> implements Observer<T>, Disposable, Runnable { private final Observer<? super T> downstream; private final Scheduler scheduler; private Disposable upstream; private volatile boolean done; private volatile boolean disposed; private Queue<T> queue = new LinkedList<>(); private Throwable error; ObserveOnObserver(Observer<? super T> actual, Scheduler scheduler) { this.downstream = actual; this.scheduler = scheduler; } @Override public void onSubscribe(Disposable d) { upstream = d; downstream.onSubscribe(this); } @Override public void onNext(T t) { if (done) return; queue.offer(t); schedule(); } @Override public void onError(Throwable t) { if (done) return; done = true; error = t; schedule(); } @Override public void onComplete() { if (done) return; done = true; schedule(); } @Override public void dispose() { if (!disposed) { disposed = true; upstream.dispose(); scheduler.remove(this); queue.clear(); } } @Override public boolean isDisposed() { return disposed; } // 提交任务 void schedule() { scheduler.submit(this); } // 检查事件是否已中断,并作出相应的反馈 boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) { if (disposed) { queue.clear(); return true; } if (d) { Throwable e = error; if (e != null) { disposed = true; queue.clear(); a.onError(e); scheduler.remove(this); return true; } else if (empty) { disposed = true; a.onComplete(); scheduler.remove(this); return true; } } return false; } @Override // 拦截事件传递,到run方法,run方法将由线程池运行 public void run() { final Queue<T> q = queue; final Observer<? super T> a = downstream; for (; ; ) { if (checkTerminated(done, q.isEmpty(), a)) return; for (; ; ) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { disposed = true; upstream.dispose(); q.clear(); a.onError(ex); scheduler.remove(this); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) return; if (empty) break; a.onNext(v); } } } } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { ... // 线程调度操作符 public final Observable<T> observeOn(Scheduler scheduler) { return new ObservableObserveOn<>(this, scheduler); } }
接下来我们来看示例:
public void schedulerTest() throws InterruptedException { Observable.create((ObservableOnSubscribe<String>) emmit -> { System.out.println("emmit : " + Thread.currentThread().getName()); emmit.onNext("1"); emmit.onNext("2"); emmit.onComplete(); }) .observeOn(Schedulers.io()) .map(it -> { int r = Integer.parseInt(it) * 10; System.out.println(r + " : map : " + Thread.currentThread().getName()); return r; }).subscribe(it -> System.out.println(it + " : observer : " + Thread.currentThread().getName()), Functions.emptyConsumer(), () -> System.out.println("onCompleted.... " + Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(1); } emmit : main 10 : map : pool-1-thread-1 10 : observer : pool-1-thread-1 20 : map : pool-1-thread-1 20 : observer : pool-1-thread-1 onCompleted.... pool-1-thread-1
可以看到 observeOn
的所有下游事件,都在新的线程中运行了!!至此,线程调度的部分功能,我们也粗略的实现了!
总结
如果,你从开始看到现在, 我们已经自己实现了rxjava的一个基本使用操作了,编写了10来个类,其中大部分都是接口,写了500多行的代码.其中涉及rxjava中事件分发,lambda调用,取消和中断,map
操作符,io
线程切换,这一完整的流程.
rxjava 提供了丰富的 操作符, 和 各种的线程切换模型, 我们在理解其原理的情况下,都可以自己来实现.
rxjava中 RxJavaPlugins
使用代理的思想来插入全局资源管理,以及使用Backpressure
(背压)来控制数据流的思想,我们都可以学习和借鉴!
我们在学习过程中,可以根据源码,分解其中的知识点,逐步消化,甚至自己动手来实现它, 来达到深入理解的目的.
最后,赶紧动手编写吧!!!
引用
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java并发编程之线程创建和启动(Thread、Runnable、Callable和Future)
这一系列的文章暂不涉及Java多线程开发中的底层原理以及JMM、JVM部分的解析(将另文总结),主要关注实际编码中Java并发编程的核心知识点和应知应会部分。 说在前面,Java并发编程的实质,是线程对象调用start方法启动多线程,而线程对象则必须是Thread类或其子类实现。Runnable和Callable的作用类似于Comparable、Serializable,是用于被并发的类实现的接口,从而使得Thread类可以在初始化时传入这个被并发的类。此是大前提。本文从多线程实现和启动出发,对这些类或接口予以说明。 Thread 通过Thread的子类创建多线程的步骤如下: 1. 创建Thread的子类,并重写run()方法,该方法即为线程执行体。 2. 创建Thread子类的对象,即为线程对象。 3. 调用线程对象的start()方法启动线程。 1 public class TestThread extends Thread{ 2 3 public TestThread(String name) { 4 setName(name); 5 } 6 @Override 7 public...
- 下一篇
工作环境换成Ubuntu18.04小记
原文: 工作环境换成Ubuntu18.04小记 Linux汇总:https://www.cnblogs.com/dunitian/p/4822808.html#linux Ubuntu常用软件安装(小集合)http://www.cnblogs.com/dunitian/p/6670560.html Ubuntu 18.04 最小安装后: 更新之后: 系统自带的Python为3.6版本,已经没有Python2.7了 ifconfig也跟CentOS7一样了 ==> ip addr 有些危险操作被屏蔽了 更新系统 sudo apt update sudo apt upgrade sudo apt dist-upgrade 卸载应用 sudo apt autoremove gedit sudo apt autoremove firefox* 命令安装 开启远程连接 sudo apt install openssh-server -y 安装Git版本控制 sudo apt install git -y 安装浏览器 sudo add-apt-repository ppa:a-v-shkop...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Linux系统CentOS6、CentOS7手动修改IP地址
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装