博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rxjava2源码分析之线程切换(subscribeOn、observeOn)
阅读量:7030 次
发布时间:2019-06-28

本文共 9162 字,大约阅读时间需要 30 分钟。

首先看下带有线程操作的基本流程代码

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
emitter) throws Exception { emitter.onNext(1); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer
() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });复制代码

根据上一篇文章分析的结论,我们可以先从下往上分析,subscribe方法上一篇分析过了,首先看observeOn方法。

AndroidSchedulers.mainThread()创建HandlerScheduler实例,传入绑定主线程looper的handler,看下HandlerScheduler代码:

final class HandlerScheduler extends Scheduler {    private final Handler handler;    HandlerScheduler(Handler handler) {        this.handler = handler;    }    @Override    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {        if (run == null) throw new NullPointerException("run == null");        if (unit == null) throw new NullPointerException("unit == null");        run = RxJavaPlugins.onSchedule(run);        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);        handler.postDelayed(scheduled, unit.toMillis(delay));        return scheduled;    }    @Override    public Worker createWorker() {        return new HandlerWorker(handler);    }    private static final class HandlerWorker extends Worker {        private final Handler handler;        private volatile boolean disposed;        HandlerWorker(Handler handler) {            this.handler = handler;        }       //在这里切换线程执行subscribe        @Override        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {            if (run == null) throw new NullPointerException("run == null");            if (unit == null) throw new NullPointerException("unit == null");            if (disposed) {                return Disposables.disposed();            }            run = RxJavaPlugins.onSchedule(run);            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);            Message message = Message.obtain(handler, scheduled);            message.obj = this; // Used as token for batch disposal of this worker's runnables.            handler.sendMessageDelayed(message, unit.toMillis(delay));            // Re-check disposed state for removing in case we were racing a call to dispose().            if (disposed) {                handler.removeCallbacks(scheduled);                return Disposables.disposed();            }            return scheduled;        }        @Override        public void dispose() {            disposed = true;            handler.removeCallbacksAndMessages(this /* token */);        }        @Override        public boolean isDisposed() {            return disposed;        }    }    private static final class ScheduledRunnable implements Runnable, Disposable {        private final Handler handler;        private final Runnable delegate;        private volatile boolean disposed;        ScheduledRunnable(Handler handler, Runnable delegate) {            this.handler = handler;            this.delegate = delegate;        }        @Override        public void run() {            try {                delegate.run();            } catch (Throwable t) {                RxJavaPlugins.onError(t);            }        }        @Override        public void dispose() {            disposed = true;            handler.removeCallbacks(this);        }        @Override        public boolean isDisposed() {            return disposed;        }    }}复制代码

接着再看observeOn,它会返回一个ObservableObserveOn对象,它其实也是一个Observable。根据上篇文章分析,调用订阅方法最终会调用Observable的subscribeActual,在这里就是ObservableObserveOn重写的subscribeActual

public final class ObservableObserveOn
extends AbstractObservableWithUpstream
{ final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource
source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } //调用入口 @Override protected void subscribeActual(Observer
observer) { // 我们传入的是HandlerScheduler,所以走到else里面 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); //可以看到subscribe还是在原线程调用,onNext,onComplete的调用是在切换的线程里面 source.subscribe(new ObserveOnObserver
(observer, w, delayError, bufferSize)); } } static final class ObserveOnObserver
extends BasicIntQueueDisposable
implements Observer
, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer
actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; SimpleQueue
queue; Disposable s; Throwable error; volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer
actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable
qd = (QueueDisposable
) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } } queue = new SpscLinkedArrayQueue
(bufferSize); actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { //放到队列中 queue.offer(t); } //切换线程 schedule(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } error = t; done = true; schedule(); } @Override public void onComplete() { if (done) { return; } done = true; schedule(); } @Override public void dispose() { if (!cancelled) { cancelled = true; s.dispose(); worker.dispose(); if (getAndIncrement() == 0) { queue.clear(); } } } @Override public boolean isDisposed() { return cancelled; } void schedule() { if (getAndIncrement() == 0) { //这个worker是HandlerScheduler里面创建的HandlerWorker,通过绑定了主线程looper的handler执行操作 worker.schedule(this); } } //循环取出队列内的值调用onNext void drainNormal() { int missed = 1; final SimpleQueue
q = queue; final Observer
a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } void drainFused() { int missed = 1; for (;;) { if (cancelled) { return; } boolean d = done; Throwable ex = error; if (!delayError && d && ex != null) { actual.onError(error); worker.dispose(); return; } actual.onNext(null); if (d) { ex = error; if (ex != null) { actual.onError(ex); } else { actual.onComplete(); } worker.dispose(); return; } missed = addAndGet(-missed); if (missed == 0) { break; } } } //handler执行的run方法 @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } boolean checkTerminated(boolean d, boolean empty, Observer
a) { if (cancelled) { queue.clear(); return true; } if (d) { Throwable e = error; if (delayError) { if (empty) { if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { if (e != null) { queue.clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { a.onComplete(); worker.dispose(); return true; } } } return false; } @Override public int requestFusion(int mode) { if ((mode & ASYNC) != 0) { outputFused = true; return ASYNC; } return NONE; } @Nullable @Override public T poll() throws Exception { return queue.poll(); } @Override public void clear() { queue.clear(); } @Override public boolean isEmpty() { return queue.isEmpty(); } }}复制代码

总结:

observeOn流程:
AndroidSchedulers.mainThread()->创建HandlerScheduler,传参绑定了主线程looper的handler observeOn()->创建ObservableObserveOn实例,重写subscribeActual方法
subscribeActual->source.subscribe(ObserveOnObserver),source是上源传递下来的ObservableSource
ObserveOnObserver是把Observer封装了一下,在调onNext时通过HandlerScheduler的HandlerWorker切换线程执行

转载于:https://juejin.im/post/5c47dc48e51d457d1b7f503c

你可能感兴趣的文章
为什么需要版本管理
查看>>
五、Dart 关键字
查看>>
React Native学习笔记(一)附视频教学
查看>>
记Promise得一些API
查看>>
javascript事件之调整大小(resize)事件
查看>>
20145234黄斐《Java程序设计》第六周学习总结
查看>>
【CLRS】《算法导论》读书笔记(四):栈(Stack)、队列(Queue)和链表(Linked List)...
查看>>
hibernate 和 mybatis区别
查看>>
互联网广告综述之点击率特征工程
查看>>
HDU3421 Max Sum II【序列处理】
查看>>
POJ NOI MATH-7653 地球人口承载力估计
查看>>
iOS UI高级之网络编程(HTTP协议)
查看>>
使用cocoaPods import导入时没有提示的解决办法
查看>>
iOS数据持久化存储之归档NSKeyedArchiver
查看>>
JavaScript面向对象
查看>>
Intellij修改模板代码
查看>>
2.页面布局示例笔记
查看>>
一些老游戏CPU 100%占用的解决方法
查看>>
f5基本介绍
查看>>
博前语
查看>>