首先看下带有线程操作的基本流程代码
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer () { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });复制代码
根据上一篇文章分析的结论,我们可以先从下往上分析,subscribe方法上一篇分析过了,首先看observeOn方法。
AndroidSchedulers.mainThread()创建HandlerScheduler实例,传入绑定主线程looper的handler,看下HandlerScheduler代码:final class HandlerScheduler extends Scheduler { private final Handler handler; HandlerScheduler(Handler handler) { this.handler = handler; } @Override public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); handler.postDelayed(scheduled, unit.toMillis(delay)); return scheduled; } @Override public Worker createWorker() { return new HandlerWorker(handler); } private static final class HandlerWorker extends Worker { private final Handler handler; private volatile boolean disposed; HandlerWorker(Handler handler) { this.handler = handler; } //在这里切换线程执行subscribe @Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } @Override public void dispose() { disposed = true; handler.removeCallbacksAndMessages(this /* token */); } @Override public boolean isDisposed() { return disposed; } } private static final class ScheduledRunnable implements Runnable, Disposable { private final Handler handler; private final Runnable delegate; private volatile boolean disposed; ScheduledRunnable(Handler handler, Runnable delegate) { this.handler = handler; this.delegate = delegate; } @Override public void run() { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } } @Override public void dispose() { disposed = true; handler.removeCallbacks(this); } @Override public boolean isDisposed() { return disposed; } }}复制代码
接着再看observeOn,它会返回一个ObservableObserveOn对象,它其实也是一个Observable。根据上篇文章分析,调用订阅方法最终会调用Observable的subscribeActual,在这里就是ObservableObserveOn重写的subscribeActual
public final class ObservableObserveOnextends AbstractObservableWithUpstream { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } //调用入口 @Override protected void subscribeActual(Observer observer) { // 我们传入的是HandlerScheduler,所以走到else里面 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); //可以看到subscribe还是在原线程调用,onNext,onComplete的调用是在切换的线程里面 source.subscribe(new ObserveOnObserver (observer, w, delayError, bufferSize)); } } static final class ObserveOnObserver extends BasicIntQueueDisposable implements Observer , Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; SimpleQueue queue; Disposable s; Throwable error; volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable qd = (QueueDisposable ) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } } queue = new SpscLinkedArrayQueue (bufferSize); actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { //放到队列中 queue.offer(t); } //切换线程 schedule(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } error = t; done = true; schedule(); } @Override public void onComplete() { if (done) { return; } done = true; schedule(); } @Override public void dispose() { if (!cancelled) { cancelled = true; s.dispose(); worker.dispose(); if (getAndIncrement() == 0) { queue.clear(); } } } @Override public boolean isDisposed() { return cancelled; } void schedule() { if (getAndIncrement() == 0) { //这个worker是HandlerScheduler里面创建的HandlerWorker,通过绑定了主线程looper的handler执行操作 worker.schedule(this); } } //循环取出队列内的值调用onNext void drainNormal() { int missed = 1; final SimpleQueue q = queue; final Observer a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } void drainFused() { int missed = 1; for (;;) { if (cancelled) { return; } boolean d = done; Throwable ex = error; if (!delayError && d && ex != null) { actual.onError(error); worker.dispose(); return; } actual.onNext(null); if (d) { ex = error; if (ex != null) { actual.onError(ex); } else { actual.onComplete(); } worker.dispose(); return; } missed = addAndGet(-missed); if (missed == 0) { break; } } } //handler执行的run方法 @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } boolean checkTerminated(boolean d, boolean empty, Observer a) { if (cancelled) { queue.clear(); return true; } if (d) { Throwable e = error; if (delayError) { if (empty) { if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { if (e != null) { queue.clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { a.onComplete(); worker.dispose(); return true; } } } return false; } @Override public int requestFusion(int mode) { if ((mode & ASYNC) != 0) { outputFused = true; return ASYNC; } return NONE; } @Nullable @Override public T poll() throws Exception { return queue.poll(); } @Override public void clear() { queue.clear(); } @Override public boolean isEmpty() { return queue.isEmpty(); } }}复制代码
总结:
observeOn流程: AndroidSchedulers.mainThread()->创建HandlerScheduler,传参绑定了主线程looper的handler observeOn()->创建ObservableObserveOn实例,重写subscribeActual方法 subscribeActual->source.subscribe(ObserveOnObserver),source是上源传递下来的ObservableSource ObserveOnObserver是把Observer封装了一下,在调onNext时通过HandlerScheduler的HandlerWorker切换线程执行