作者:Oz
v 信号: Mojitok8275
版权声明:本文图文为博主原创,转载请注明出处。
文章似乎有些标题党的嫌疑,但是我相信根据我的理解画出两幅图可以让大家理解 RxJava2 的核心原理,稍后不要吝啬,请叫我灵魂画手😄!相信 RxJava
是大家业务中用到比较多的一个依赖库,RxJava 的强大之处在于它改变了程序员的编程习惯,相比较其他的开源项目,Rxjava 是最弯弯绕的一个。对于 RxJava 种类繁多的操作符,大多数同学都表示很是头疼,也有不少同学陷入了学习操作符不能停的怪圈。操作符要不要学,当然要,但是如果能理解 RxJava 的核心,操作符的使用就像是学会九阳神功的张无忌学招数,必定是手到擒来。所谓器欲尽其用,必先得其法。
这篇文章我会讲些什么
RxJava2 基本的运行流程
RxJava2 线程切换的原理(涉及到为什么 subscribeOn() 只有第一次调用时有效)
为什么一订阅就回调了 onSubscribe
为什么 subscribeOn() 对上面的代码生效,observerOn() 对下面代码生效
以下内容如果涉及到自己写的代码我会采用 Kotlin 进行示例展示,涉及到 RxJava2 会展示部分源码。
简单的链式调用(无线程切换)
先来看一段示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe (emitter: ObservableEmitter <String >) { Log.d("solart" , "subscribe > ${Thread.currentThread().name} " ) emitter.onNext("test" ) emitter.onComplete() } }).flatMap(object : Function<String, Observable<String>> { override fun apply (t: String ) : Observable<String> { return Observable.just(t) } }).map(object : Function<String, Int > { override fun apply (t: String ) : Int { return 0 } }).subscribe(object : Observer<Int > { override fun onSubscribe (d: Disposable ) { Log.d("solart" , "onSubscribe > ${Thread.currentThread().name} " ) } override fun onNext (t: Int ) { Log.d("solart" , "onNext > ${Thread.currentThread().name} " ) } override fun onComplete () { Log.d("solart" , "onComplete > ${Thread.currentThread().name} " ) } override fun onError (e: Throwable ) { Log.d("solart" , "onError > ${Thread.currentThread().name} " ) } })
这段代码中我们简单用了 create
、flatMap
、map
等操作符,进行了流式的数据转换,最后我们通过 subscribe
订阅了数据流,其实通过查看源码我们不难发现, RxJava 本身是个逆向订阅的过程,话不多说先看图
点击查看大图
数据源的包裹 比照着这张图,我们来看一下,首先蓝色虚线
部分是我们代码中实际调用的顺序,查看 Observable.create
我们不难发现,此处就是产生了一个 ObservableCreate
实例,
1 2 3 4 public static <T> Observable<T> create (ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null" ); return RxJavaPlugins.onAssembly(new ObservableCreate <T>(source)); }
如我们图中所示, ObservableCreate
内部包含一个类型为 ObservableOnSubscribe<T>
的 source
变量,根据我们代码中的调用,这个 source
就是我们 Kotlin 代码中的匿名对象 object : ObservableOnSubscribe<String>
。
1 2 3 4 5 6 7 8 9 10 11 12 13 public final class ObservableCreate <T> extends Observable <T> { final ObservableOnSubscribe<T> source; public ObservableCreate (ObservableOnSubscribe<T> source) { this .source = source; } @Override protected void subscribeActual (Observer<? super T> observer) { ... } ... }
我们顺着代码的调用顺序,继续看一下 flatMap
的方法中又做了什么:
1 2 3 4 5 public final <R> Observable<R> flatMap (Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... return RxJavaPlugins.onAssembly(new ObservableFlatMap <T, R>(this , mapper, delayErrors, maxConcurrency, bufferSize)); }
类似的产生了一个 ObservableFlatMap
实例,而其内部持有一个类型为 ObservableSource<T>
的 source
变量,而该 source 则是上一步中的 ObservableCreate
实例,依次我们看 map
依然是类似的代码,这里不在赘述,所以到此我们得到了图中蓝色虚线部分的内容,这个过程可以看作是一个将数据源层层打包的过程。
逆向订阅数据源 我们知道以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe
时(图中上半部分红色实线部分)才真正触发了 RxJava 的数据流,我们来看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public final void subscribe (Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null" ); try { observer = RxJavaPlugins.onSubscribe(this , observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer" ); subscribeActual(observer); } catch (NullPointerException e) { throw e; } catch (Throwable e) { ... throw npe; } }
根据我们上面的分析,执行 subscribeActual
的对象其实是 ObservableMap
,我们来看它的 subscribeActual
的实现
1 2 3 4 5 6 7 8 9 public final class ObservableMap <T, U> extends AbstractObservableWithUpstream <T, U> { ... @Override public void subscribeActual (Observer<? super U> t) { source.subscribe(new MapObserver <T, U>(t, function)); } ... }
注意,此时产生了一个 MapObserver
对象,MapObserver
中通过 actual
持有了我们自己的匿名对象 object : Observer<Int>
,同样的,ObservableMap
执行 subscribeActual 又调用了上层的 source.subscribe
,依次逆向调用,就得到了我们图中上半部分的红线内容,这个过程我们可以称之为数据源的逆向订阅,这个过程同样也是一个层层打包的过程,只不过它打包的对象换成了观察者 Observer。
触发数据源产生原始数据,数据流转 当订阅发生在最顶层时,也就是 ObservableCreate
中的 subscribeActual
,此时触发了数据源的产生,通过 emitter
发射数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public final class ObservableCreate <T> extends Observable <T> { ... @Override protected void subscribeActual (Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter <T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ... }
而我们代码中此时产生了真正的数据
1 2 3 4 5 override fun subscribe (emitter: ObservableEmitter <String >) { Log.d("solart" , "subscribe > ${Thread.currentThread().name} " ) emitter.onNext("test" ) emitter.onComplete() }
此时我们再来看 CreateEmitter
的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static final class CreateEmitter <T> extends AtomicReference <Disposable> implements ObservableEmitter <T>, Disposable { final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this .observer = observer; } @Override public void onNext (T t) { ... if (!isDisposed()) { observer.onNext(t); } } ... }
根据我们上面的分析 CreateEmitter
中持有的 observer
即是 FlatMapObserver
的实例,而 FlatMapObserver
调用 onNext 时,又会调用 MapObserver
的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。
观察我们这个图,你会发现,操作符
对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名 Observable + 操作符
,例如 ObservableMap
= Observable + map,观察者命名大多遵循 操作符 + Observer
,例如 FlatMapObserver
= flatMap + Observer。除了命名规则外,我们观察整个流程,你也会发现有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分),另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)。
异步事件流编程(线程切换) 相信有了上面的分析,大家对 RxJava 的逆向订阅以及数据流转有了一定的认识,但是 RxJava 的强大之处在于它的异步事件流编程方式,随心所欲的切换工作线程,下面我们来分析它是如何做到的。
同样的我们还是先给出一个简单的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe (emitter: ObservableEmitter <String >) { Log.d("solart" , "subscribe > ${Thread.currentThread().name} " ) emitter.onNext("test" ) emitter.onComplete() } }).subscribeOn(Schedulers.io()) .map(object : Function<String, Int > { override fun apply (t: String ) : Int { return 0 } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer<Int > { override fun onSubscribe (d: Disposable ) { Log.d("solart" , "onSubscribe > ${Thread.currentThread().name} " ) } override fun onNext (t: Int ) { Log.d("solart" , "onNext > ${Thread.currentThread().name} " ) } override fun onComplete () { Log.d("solart" , "onComplete > ${Thread.currentThread().name} " ) } override fun onError (e: Throwable ) { Log.d("solart" , "onError > ${Thread.currentThread().name} " ) } })
这里简化了操作符的调用,以切换线程为示例,根据这段代码,我画出了这个过程的流程图(灵魂画手有没有?)如下:
点击查看大图
图中不同颜色(红、绿、紫)的实线表示流程所属不同线程,体现在不同线程中的过程,且标上了对应的序号,方便大家观看,这个图已经能够揭示 RxJava 运转的核心原理。
逆向订阅时触发 subscribeOn 的线程切换 根据我们第一部分的分析,我们知道 RxJava 有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹,另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹,虽然我们在代码调用过程中使用了线程切换(subscribeOn 和 observerOn)这两个特殊的操作符,在整个流程中依然遵循了这两个包裹封装的过程,只不过它的特殊之处在于处理时完成了流程上的线程切换。
我们来看订阅时(图中⑦⑧的流程)切换线程的 ObservableSubscribeOn
的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public final class ObservableSubscribeOn <T> extends AbstractObservableWithUpstream <T, T> { final Scheduler scheduler; public ObservableSubscribeOn (ObservableSource<T> source, Scheduler scheduler) { super (source); this .scheduler = scheduler; } @Override public void subscribeActual (final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver <T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask (parent))); } ... }
在逆向订阅的流程中,通过指定 Scheduler
将 SubscribeTask
任务交给线程池处理,我们先来看一下 SubscribeTask
的代码,就是执行了订阅:
1 2 3 4 5 6 7 8 9 10 11 12 final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this .parent = parent; } @Override public void run () { source.subscribe(parent); } }
我们再来看 scheduler.scheduleDirect()
中是如何做到线程切换的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public abstract class Scheduler { ... @NonNull public abstract Worker createWorker () ; ... @NonNull public Disposable scheduleDirect (@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask (decoratedRun, w); w.schedule(task, delay, unit); return task; } ... }
我们示例中是切换到了 io
线程,所以我们对应的看一下 IoScheduler
的部分代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public final class IoScheduler extends Scheduler { ... @NonNull @Override public Worker createWorker () { return new EventLoopWorker (pool.get()); } ... static final class EventLoopWorker extends Scheduler .Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean (); EventLoopWorker(CachedWorkerPool pool) { this .pool = pool; this .tasks = new CompositeDisposable (); this .threadWorker = pool.get(); } ... @NonNull @Override public Disposable schedule (@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } static final class ThreadWorker extends NewThreadWorker { ... @NonNull public ScheduledRunnable scheduleActual (final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable (decoratedRun, parent); ... Future<?> f; try { if (delayTime <= 0 ) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { ... } return sr; } } }
综合上面的代码,我们来总结一下,其实 ObservableSubscribeOn
本身就是在 subscribeActual
中将上层数据源在异步线程中执行订阅,这样就完成了线程的切换,后续的流程都会在这个切换后的线程中执行,直到再次切换线程。因为 RxJava 本身是逆向订阅的流程,所以这里就解释了两个问题:1、为什么 subscribeOn() 对上面的代码生效?2、为什么 subscribeOn() 只有第一次调用时有效?归根结底都是因为逆向订阅的流程决定了 subscribeOn 是在订阅流程中起作用,此时数据还未产生,而在代码上第一个 subscribeOn 其实是逆向订阅流程的最后一个线程切换的地方,这个将会对生产原始数据所在线程产生直接影响 。这里还有一点要提一下,ObservableSubscribeOn
在执行 subscribeActual
时,回调了下层产生的 Observer
的 onSubscribe
,如图中的④⑤⑥流程,所以这也是为什么,在观察者一订阅后就会在当前订阅的线程收到 onSubscribe
的回调的原因。
正向数据流触发 observerOn 的线程切换 同第一部分一样的,订阅到最上层时,触发数据源产生原始数据,从而又正向的流转数据,此过程我们不在详细分析,参照1.3,我们着重看一下 ObserveOnObserver
的 onNext 处理的逻辑,也就是图中步骤⑮⑯:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public final class ObservableObserveOn <T> extends AbstractObservableWithUpstream <T, T> { final Scheduler scheduler; ... @Override protected void subscribeActual (Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver <T>(observer, w, delayError, bufferSize)); } } ... static final class ObserveOnObserver <T> extends BasicIntQueueDisposable <T> implements Observer <T>, Runnable { ... final Observer<? super T> actual; final Scheduler.Worker worker; ... @Override public void onNext (T t) { if (done) { return ; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } ... void schedule () { if (getAndIncrement() == 0 ) { worker.schedule(this ); } } ... @Override public void run () { if (outputFused) { drainFused(); } else { drainNormal(); } } ... } }
示例中我们此时切换到了 Main
线程中执行,我们来看对应的 HandlerScheduler
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 final class HandlerScheduler extends Scheduler { private final Handler handler; private final boolean async; ... @Override public Disposable scheduleDirect (Runnable run, long delay, TimeUnit unit) { if (run == null ) throw new NullPointerException ("run == null" ); if (unit == null ) throw new NullPointerException ("unit == null" ); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable (handler, run); handler.postDelayed(scheduled, unit.toMillis(delay)); return scheduled; } @Override public Worker createWorker () { return new HandlerWorker (handler, async); } private static final class HandlerWorker extends Worker { ... @Override @SuppressLint("NewApi") public Disposable schedule (Runnable run, long delay, TimeUnit unit) { ... run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable (handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this ; if (async) { message.setAsynchronous(true ); } handler.sendMessageDelayed(message, unit.toMillis(delay)); ... return scheduled; } }
从代码中我们可以看到,此时将 Runnable
通过 Handler 发到了住线程去执行,所以经过此步骤后,后续的 onNext 的处理已经切换为主线程。同样的,这里也解释了一开始我们提到的另一个问题:为什么 observerOn() 对下面代码生效?正是因为,数据的流向决定了 observerOn() 对后续的 onNext 产生影响。
总结 至此 RxJava 运转机制我们已经分析完毕,大家可以比照图中流程,跟踪代码调用关系,相信会有很大收获。 RxJava 本身是一个变种的观察者模式,正是因为框架本身要实现异步事件流编程
,所以产生了逆向订阅的过程,同时数据又是正向流转的,这个过程中大家还需要理解两个包裹封装(被观察者、观察者)的过程,不管操作符怎么变换,都不会脱离这样的运作核心。
另外根据不同的操作符的实现,我们依照同样的模式,可实现自己的自定义操作符,只要能在订阅时和数据回流时做好上下层的衔接就好,这个大家可以自己实践。