您现在的位置是:首页 > 文章详情

Rxjava深入理解之自己动手编写Rxjava

日期:2018-11-05点击:384

Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看.

从观察者模式说起

观察者模式,是我们在平时使用的比较多的一种设计模式.
观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己. 它的特点是 一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。

本文的重点不是在介绍 观察者模式,因此这个话题就不展开讨论.

Rxjava 是一种 函数式编程,或者称为链式调用模型,也是使用观察者模式来实现事件的传递与监听.

下面我们来看,Rxjava 和 普通观察者的点区别.

  • 普通的观察者模式通常是 一个主题,多个观察者配合,基本上是属于 一对多的情况.
  • Rxjava的观察者模式通常是 一对一的关系.
  • 普通的观察者模式是 主题数据改变时,通知观察者数据的变动
  • Rxjava的观察者模式是 在被观察者(主题)调用subscribe方法后,触发数据流动和观察者接收事件.

基础知识介绍到这里,接下来我们来自己动手编写Rxjava

编写Rxjava

看一个常见的rxjava的使用示例,(原始数据,数据转换,线程切换,数据接收处理一系列功能):

 public static void main(String[] args) throws InterruptedException { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }) .observeOn(Schedulers.io()) .map(s -> Integer.parseInt(s) * 10) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(1); } // 10 // 20 

接下来,我会一步一步带领大家实现上述所有的功能.

一个简单的观察者模式

// Observer.java // 观察者 public interface Observer<T> { void onUpdate(T t); } // ObservableSource.java // 被观察者(主题)接口 public interface ObservableSource<T> { void subscribe(Observer<? super T> observer); } // Observable.java // 具体的被观察者(主题) public class Observable<T> implements ObservableSource<T> { private T t; public Observable(T t) { this.t = t; } @Override public void subscribe(Observer<? super T> observer) { // 调用订阅时,触发观察者更新 observer.onUpdate(t); } } 

使用:

 public static void main(String[] args) { // 观察者 Observer<String> observer = s -> System.out.println(s); // 被观察者(主题) Observable<String> observable = new Observable<>("hello"); // 调用 observable.subscribe(observer); } // hello 

这样,算是一个简单的观察模式了,但是这种方式很不灵活,数据在构造中直接传入了.

接下来我们来改造一下 Observable.java类. 可以传入一个接口来定义数据的传递规则,并且为Observable写一个适配器和一个事件分发器,为原始事件的产生提供支持.

  • 添加 Emitter接口,它是一个事件分发的接口;
  • 添加 ObservableOnSubscribe接口,它是创建 Observable实例的桥梁,并且有生产事件的功能,支持lambda方式调用;
  • 添加 ObservableCreate类,它是Observable的适配器,能够根据ObservableOnSubscribe接口,快速创建一个 Observable实例;并且内部类CreateEmitter实现了Emitter接口,用于事件的分发;
  • 修改 Observable类,添加工厂方法,能够根据ObservableOnSubscribe接口,快速构建Observable实例;
// Emitter.java // 事件分发器接口 public interface Emitter<T> { void onUpdate(T t); } // ObservableOnSubscribe.java // Observable的事件分发接口 public interface ObservableOnSubscribe<T> { void subscribe(Emitter<T> emitter) throws Exception; } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { // 工厂方法,生产出一个Observable实例 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return new ObservableCreate<>(source); } // 真正进行事件分发处理的方法 abstract void subscribeActual(Observer<? super T> observer) throws Exception; @Override // 交给subscribeActual实现,需要子类实现 public void subscribe(Observer<? super T> observer) throws Exception { subscribeActual(observer); } } // ObservableCreate.java // Observable的一个适配器,用于快速创建一个可以发送事件的Observable class ObservableCreate<T> extends Observable<T> { // 事件分发接口 private final ObservableOnSubscribe<T> source; ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override // 分发逻辑的具体代码 void subscribeActual(Observer<? super T> observer) throws Exception { CreateEmitter<T> emitter = new CreateEmitter<>(observer); source.subscribe(emitter); } // 内部分发器 static class CreateEmitter<T> implements Emitter<T> { private final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override // 这里只是简单的将observer观察者的事件直接分发出去 public void onUpdate(T t) { observer.onUpdate(t); } } } 

ObservableSource.javaObserver.java没有修改,固没有贴出,未贴出代码请查看上一步.使用 :

 public static void main(String[] args) throws Exception { Observable.create(emitter -> { emitter.onUpdate("hello"); emitter.onUpdate("world"); }) .subscribe(System.out::println); } // hello // world 

哇! 你会发现,到此为止,你已经使用观察模式实现了一个简易的函数式编程的代码了.
如果你完全理解了上述代码是怎么产生的,那么恭喜你,你已经理解rxjava最最核心的原理了.

添加事件结束和异常捕获

上诉的代码,还不能够捕获异常和结束事件,这样使用起来很不方便,接下来我们来改造实现它.
仿造rxjava,我们也将事件分为onNext,onError,onComplete三个事件.

需要修改 分发器接口和观察者接口,以及Observable的适配器.

  • 修改 Observer接口和Emitter接口, 改为onNext,onError,onComplete方法;
  • 修改 ObservableCreate类,添加异常处理和结束的逻辑;
// Observer.java public interface Observer<T> { void onNext(T t); void onError(Throwable e); void onComplete(); } // Emitter.java public interface Emitter<T> { void onNext(T t); void onError(Throwable e); void onComplete(); } // ObservableCreate.java class ObservableCreate<T> extends Observable<T> { // 事件分发接口 private final ObservableOnSubscribe<T> source; ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override // 分发逻辑代码 void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> emitter = new CreateEmitter<>(observer); try { source.subscribe(emitter); } catch (Exception e) { // 异常接收和处理 emitter.onError(e); } } // 内部分发器 static class CreateEmitter<T> implements Emitter<T> { private final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { observer.onNext(t); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onComplete() { observer.onComplete(); } } } 

修改过这是三个类后,我们就能接收异常和结束了.使用:

 public static void main(String[] args) { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }) .subscribe(new Observer<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("end..."); } }); } // 1 // 2 // end... 

嗯!? 虽然实现了接收异常和结束的功能,但是有时我们只需要onNext事件时,这样的代码写起来不够优雅.

接下来我们编写一个观察者的适配器,让它能够使用 lambda表达式来优雅的编写代码.

  • 添加 Consumer接口,它是接收一个参数,无返回值的接口,用途是进行lambda方式进行参数传递;
  • 添加 Action接口,它是一个不接受参数,无返回值和的接口,用途也是进行lambda方式进行参数传递;
  • 添加 Functions类,它是一个辅助类,能获取空Consumer和空Action实现;
  • 添加 LambdaObserver类,它会将lambda参数形式,转化为Observer实例,进而实现lambda式的调用;
  • 修改 Observable类, 添加 void subscribe(Consumer<? super T> next, Action complete, Consumer<? super Throwable> error)系列的方法,让subscribe方法真正支持 lambda式调用.
// Consumer.java // 接受一个参数,无返回的接口 public interface Consumer<T> { void apply(T t) throws Exception; } // Action.java // 不接受参数,无返回的接口 public interface Action { void apply() throws Exception; } // Functions.java public class Functions { public static final Action EMPTY_ACTION = () -> {}; public static <T> Consumer<T> emptyConsumer() { return t -> {}; } } // LambdaObserver.java public class LambdaObserver<T> implements Observer<T> { private final Consumer<? super T> onNext; private final Consumer<? super Throwable> onError; private final Action onComplete; public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) { this.onNext = onNext; this.onError = onError; this.onComplete = onComplete; } @Override public void onNext(T t) { try { onNext.apply(t); } catch (Exception e) { onError(e); } } @Override public void onError(Throwable e) { try { onError.apply(e); } catch (Exception e1) { e1.printStackTrace(); } } @Override public void onComplete() { try { onComplete.apply(); } catch (Exception e) { e.printStackTrace(); } } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { // 工厂方法,生产出一个Observable实例 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return new ObservableCreate<>(source); } // 真正进行事件分发处理的方法 abstract void subscribeActual(Observer<? super T> observer); @Override // 交给subscribeActual实现,需要子类实现 public void subscribe(Observer<? super T> observer) { subscribeActual(observer); } // 接受3个lambda表达式的方法参数 public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) { LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete); subscribe(lambdaObserver); } // 接受2个lambda表达式的方法参数 public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) { subscribe(next, error, Functions.EMPTY_ACTION); } // 接受1个lambda表达式的方法参数 public void subscribe(Consumer<? super T> next) { subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION); } } 

接下来是调用:

public static void main(String[] args) { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }).subscribe(System.out::println); } // 1 // 2 

啧啧!?有点激动了,已经看起来有模有样了,但是功能远远还不够.

添加 是否消费事件的功能(中断事件功能)

在上诉的代码中,我们无法主动中断整个事件发生的过程.接下来我们就需要编写 Disposable来实现onCompleteonError的自中断,以及主动取消事件.

Rxjava中,Disposable是使用 枚举类型加上原子引用(AtomicReference)类来实现线程安全(具体可查看DisposableHelper类).这种方式比较繁琐,这里就不用这种方式来演示,而使用 volatile声明的状态变量来做同步安全.

  • 添加 Disposable接口,提供中断和是否中断的方法;
  • 修改Observer接口, 添加onSubscribe方法,让观察者可以在事件传递前,获取Disposable,进而可以在事件传递的任意阶段中断事件;
  • 修改ObservableCreate类,添加观察者回调onSubscribe,此回调需在事件分发前才能起到作用;内部类CreateEmitter实现Disposable接口,在事件分发前先判断是否被中断了;使用volatile变量实现中断判断;
  • 修改LambdaObserver类,让它实现Disposable接口,添加是否中断的判断;
  • 修改Observable类,添加onSubscribelambda调用;以及返回Disposable实例;
// Disposable.java public interface Disposable { void dispose(); boolean isDisposed(); } // Observer.java public interface Observer<T> { void onSubscribe(Disposable d); void onNext(T t); void onError(Throwable e); void onComplete(); } // ObservableCreate.java // Observable的一个适配器,用于快速创建一个可以发送事件的Observable final class ObservableCreate<T> extends Observable<T> { // 事件分发接口 private final ObservableOnSubscribe<T> source; ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override // 分发逻辑代码 void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> emitter = new CreateEmitter<>(observer); // 将中断器回调给observer observer.onSubscribe(emitter); try { source.subscribe(emitter); } catch (Exception e) { // 异常接收和处理 emitter.onError(e); } } // 内部分发器 static class CreateEmitter<T> implements Emitter<T>, Disposable { private final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { // 如果事件没被消费,则进行操作 if (!isDisposed()) observer.onNext(t); } @Override public void onError(Throwable e) { if (!isDisposed()) { try { observer.onError(e); } finally { // 触发消费,后续不再处理事件 dispose(); } } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { // 触发消费,后续不再处理事件 dispose(); } } } private volatile boolean isDisposed = false; @Override public void dispose() { isDisposed = true; } @Override public boolean isDisposed() { return isDisposed; } } } // LambdaObserver.java public class LambdaObserver<T> implements Observer<T>, Disposable { private final Consumer<? super T> onNext; private final Consumer<? super Throwable> onError; private final Action onComplete; private final Consumer<? super Disposable> onSubscribe; public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { this.onNext = onNext; this.onError = onError; this.onComplete = onComplete; this.onSubscribe = onSubscribe; } @Override public void onSubscribe(Disposable d) { try { onSubscribe.apply(d); } catch (Exception e) { onError(e); } } @Override public void onNext(T t) { if (!isDisposed()) try { onNext.apply(t); } catch (Exception e) { onError(e); } } @Override public void onError(Throwable e) { if (!isDisposed()) try { onError.apply(e); } catch (Exception e1) { e1.printStackTrace(); } finally { dispose(); } } @Override public void onComplete() { if (!isDisposed()) try { onComplete.apply(); } catch (Exception e) { e.printStackTrace(); } finally { dispose(); } } private volatile boolean isDisposed = false; @Override public void dispose() { isDisposed = true; } @Override public boolean isDisposed() { return isDisposed; } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { // 工厂方法,生产出一个Observable实例 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return new ObservableCreate<>(source); } // 真正进行事件分发处理的方法 abstract void subscribeActual(Observer<? super T> observer); @Override // 交给subscribeActual实现,需要子类实现 public void subscribe(Observer<? super T> observer) { subscribeActual(observer); } // 接受4个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete, Consumer<? super Disposable> onSubscribe) { LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete, onSubscribe); subscribe(lambdaObserver); return lambdaObserver; } // 接受3个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) { return subscribe(next, error, complete, Functions.emptyConsumer()); } // 接受2个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) { return subscribe(next, error, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } // 接受1个lambda表达式的方法参数 public Disposable subscribe(Consumer<? super T> next) { return subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.emptyConsumer()); } } 

到此,我们就能够控制事件的中断了.我们来看使用:

 private Disposable mDisposable = null; void disposableTest() { Observable.create((ObservableOnSubscribe<Integer>) emitter -> { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }) .subscribe(integer -> { System.out.println(integer); // 3 事件将不再传递和接收 if (integer == 2 && mDisposable != null) mDisposable.dispose(); }, Functions.emptyConsumer(), Functions.EMPTY_ACTION, d -> mDisposable = d); } // 1 // 2 // 这种方式只在,异步的情况下使用,由于示例中目前还不支持异步,因此以下代码起不了作用. void disposableTest2() { Disposable disposable = Observable .create((ObservableOnSubscribe<Integer>) emitter -> { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }) .subscribe(System.out::println); disposable.dispose(); } 

像这样,在事件分发前,拿到Disposable对象,这样你能在任意阶段中断 这个过程.

至此, 我们实现了基本的事件发送和lambda调用,以及中断功能.接下来我们需要开始添加 操作符了, 让它真的能达到函数式调用的模样!

添加 操作符(Map)

操作符的重点是, 你需要处理好上游传递下来的Disposable对象,以及下游待传递的Observer.

下面我们来实现map操作符的功能, 它可以将一个类型转化为另一个类型.

  • 添加 Function接口,该接口接收一个类型的参数,并且返回另一个类型的值;
  • 添加 BasicObserver类,该类实现ObserverDisposable接口,用于传递上下游的数据;
  • 添加 ObservableMap类,该类继承Observable,并且使用Function接口,实现类型转换;内部类MapObserver 继承自BasicObserver实现具体转换逻辑;
  • 修改 Observable类,添加map操作符;
// Function.java // 接收一个类型的参数,返回一个类型 public interface Function<T, R> { R apply(T t) throws Exception; } // BasicObserver.java public abstract class BasicObserver<T, R> implements Observer<T>, Disposable { // 上游传递的Disposable对象 private Disposable upstream; // 下游接收的观察者对象 final Observer<? super R> downstream; // 如果已经中断,则无需下传 boolean done; BasicObserver(Observer<? super R> downstream) { this.downstream = downstream; } @Override public void onSubscribe(Disposable d) { // 接收上游的Disposable this.upstream = d; downstream.onSubscribe(this); } @Override public void onError(Throwable e) { if (done) return; done = true; downstream.onError(e); } @Override public void onComplete() { if (done) return; done = true; downstream.onComplete(); } @Override public void dispose() { upstream.dispose(); } @Override public boolean isDisposed() { return upstream.isDisposed(); } } // ObservableMap.java final class ObservableMap<T, U> extends Observable<U> { private final ObservableSource<T> source; private final Function<? super T, ? extends U> function; ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { this.source = source; this.function = function; } @Override void subscribeActual(Observer<? super U> observer) { source.subscribe(new MapObserver<T, U>(observer, function)); } static class MapObserver<T, U> extends BasicObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) return; try { // 这里实现具体的转化,下游接收到转化后的类型变量 downstream.onNext(mapper.apply(t)); } catch (Exception e) { onError(e); } } } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { ... // map操作符 public <R> Observable<R> map(Function<? super T, ? extends R> mapper) { return new ObservableMap<>(this, mapper); } } 

实现好了map功能,我们来验证一下:

 void mapTest() { Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("1"); emitter.onNext("2"); emitter.onComplete(); }) .map(s -> Integer.parseInt(s) * 10) .subscribe(System.out::println); } // 10 // 20 

哇!! 有点不可思议,已经做到这一步了,我们还差什么呢? 没错,就是线程切换!!

PS : Rxjava中有很多的操作符, 我用其中比较典型的map来做示范,其他操作符,有兴趣的可以自己手动来实现.

线程切换

rxjava中,自己实现了一套功能强大的线程池.配合操作符 observeOn,subscribeOn来进行线程切换.这里就不对其进行展开,我们的重点是自己实现.

由于rxjava的线程池调度相当的复杂, 这里为了方便演示,将只采用jdk自带的线程池来做线程调度.下面我们来实现, rxjava中 observeOn操作符,以及Schedulers.io()的线程调度.

  • 添加Scheduler接口,定义了调度的方法,提交任务,移除任务,停止线程池;
  • 添加IOScheduler类,是Scheduler的具体实现,采用newCachedThreadPool提交任务和中断任务;
  • 添加ObservableObserveOn类,继承Observable,为observeOn操作符提供支持;内部类ObserveOnObserver,实现Observer,DisposableRunnable接口,run方法将拦截所有事件,将其作为任务提交给线程池运行,达到异步的效果;
  • 修改Observable类,添加observeOn操作符;
// Scheduler.java public interface Scheduler { void submit(Runnable runnable); void remove(Runnable runnable); void shutdown(); } // IOScheduler public class IOScheduler implements Scheduler { // 线程池 private final ExecutorService executor = Executors.newCachedThreadPool(); // 保存Future对象,为了能够中断指定线程 private final Map<Runnable, Future> futureMap = new ConcurrentHashMap<>(); @Override public void submit(Runnable runnable) { Future future = futureMap.get(runnable); // 如果对应的任务正在执行,则无需再提交 if (future != null && !future.isDone()) return; if (executor.isShutdown()) return; futureMap.put(runnable, executor.submit(runnable)); } @Override public void remove(Runnable runnable) { Future future = futureMap.get(runnable); if (future == null) return; try { future.cancel(true); } catch (Exception ignored) { } finally { futureMap.remove(runnable); } } @Override public void shutdown() { if (!executor.isShutdown()) executor.shutdown(); } } // ObservableObserveOn.java public final class ObservableObserveOn<T> extends Observable<T> { private final ObservableSource<T> source; private final Scheduler scheduler; ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new ObserveOnObserver<>(observer, scheduler)); } static final class ObserveOnObserver<T> implements Observer<T>, Disposable, Runnable { private final Observer<? super T> downstream; private final Scheduler scheduler; private Disposable upstream; private volatile boolean done; private volatile boolean disposed; private Queue<T> queue = new LinkedList<>(); private Throwable error; ObserveOnObserver(Observer<? super T> actual, Scheduler scheduler) { this.downstream = actual; this.scheduler = scheduler; } @Override public void onSubscribe(Disposable d) { upstream = d; downstream.onSubscribe(this); } @Override public void onNext(T t) { if (done) return; queue.offer(t); schedule(); } @Override public void onError(Throwable t) { if (done) return; done = true; error = t; schedule(); } @Override public void onComplete() { if (done) return; done = true; schedule(); } @Override public void dispose() { if (!disposed) { disposed = true; upstream.dispose(); scheduler.remove(this); queue.clear(); } } @Override public boolean isDisposed() { return disposed; } // 提交任务 void schedule() { scheduler.submit(this); } // 检查事件是否已中断,并作出相应的反馈 boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) { if (disposed) { queue.clear(); return true; } if (d) { Throwable e = error; if (e != null) { disposed = true; queue.clear(); a.onError(e); scheduler.remove(this); return true; } else if (empty) { disposed = true; a.onComplete(); scheduler.remove(this); return true; } } return false; } @Override // 拦截事件传递,到run方法,run方法将由线程池运行 public void run() { final Queue<T> q = queue; final Observer<? super T> a = downstream; for (; ; ) { if (checkTerminated(done, q.isEmpty(), a)) return; for (; ; ) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { disposed = true; upstream.dispose(); q.clear(); a.onError(ex); scheduler.remove(this); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) return; if (empty) break; a.onNext(v); } } } } } // Observable.java public abstract class Observable<T> implements ObservableSource<T> { ... // 线程调度操作符 public final Observable<T> observeOn(Scheduler scheduler) { return new ObservableObserveOn<>(this, scheduler); } } 

接下来我们来看示例:

 public void schedulerTest() throws InterruptedException { Observable.create((ObservableOnSubscribe<String>) emmit -> { System.out.println("emmit : " + Thread.currentThread().getName()); emmit.onNext("1"); emmit.onNext("2"); emmit.onComplete(); }) .observeOn(Schedulers.io()) .map(it -> { int r = Integer.parseInt(it) * 10; System.out.println(r + " : map : " + Thread.currentThread().getName()); return r; }).subscribe(it -> System.out.println(it + " : observer : " + Thread.currentThread().getName()), Functions.emptyConsumer(), () -> System.out.println("onCompleted.... " + Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(1); } emmit : main 10 : map : pool-1-thread-1 10 : observer : pool-1-thread-1 20 : map : pool-1-thread-1 20 : observer : pool-1-thread-1 onCompleted.... pool-1-thread-1 

可以看到 observeOn的所有下游事件,都在新的线程中运行了!!至此,线程调度的部分功能,我们也粗略的实现了!

总结

如果,你从开始看到现在, 我们已经自己实现了rxjava的一个基本使用操作了,编写了10来个类,其中大部分都是接口,写了500多行的代码.其中涉及rxjava中事件分发,lambda调用,取消和中断,map操作符,io线程切换,这一完整的流程.

rxjava 提供了丰富的 操作符, 和 各种的线程切换模型, 我们在理解其原理的情况下,都可以自己来实现.

rxjava中 RxJavaPlugins使用代理的思想来插入全局资源管理,以及使用Backpressure(背压)来控制数据流的思想,我们都可以学习和借鉴!

我们在学习过程中,可以根据源码,分解其中的知识点,逐步消化,甚至自己动手来实现它, 来达到深入理解的目的.

最后,赶紧动手编写吧!!!

引用

  1. RxJava: Reactive Extensions for the JVM
原文链接:https://yq.aliyun.com/articles/666841
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章