这篇文章讲解 Rxjava2 中最难理解的线程调度是如何实现的。
我们来看这样一个例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Single.just("item") .map(object : Function<String, String> { override fun apply(t: String): String { println("1:${Thread.currentThread()}") return t } }) .observeOn(Schedulers.io()) .subscribe(object : SingleObserver<String> { override fun onSuccess(t: String) { println("2:${Thread.currentThread()}") } override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } })
输出结果是:
1 2 1:Thread[main,5,main] 2:Thread[RxCachedThreadScheduler-1,5,main]
可以看到,位于 observeOn 上游的生产者( map )没有切换线程,而下游的消费者( SingleObserver )切换了线程。
再来看一个案例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Single.just("item") .map(object : Function<String, String> { override fun apply(t: String): String { println("1:${Thread.currentThread()}") return t } }) .subscribeOn(Schedulers.io()) .subscribe(object : SingleObserver<String> { override fun onSuccess(t: String) { println("2:${Thread.currentThread()}") } override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } })
我们只是把 observeOn 改成了 subscribeOn 方法,结果如下:
1 2 1:Thread[RxCachedThreadScheduler-1,5,main] 2:Thread[RxCachedThreadScheduler-1,5,main]
结论是 subscribeOn 上游下游的生产者和消费者都切换了线程。
下面我们从源码的角度分析产生上面不同情况的原因。
observeOn 1 2 3 4 public final Single<T> observeOn(final Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler)); }
代码中的 this 指的是上游的 SingleMap ,scheduler 就是传入 observeOn 的参数,这里就是 Schedulers.io() 。
现在只需要把 Schedulers.io() 理解成是一个切换线程的工具,之后再详细分析它。
RxJavaPlugins.onAssembly 是钩子方法,可以直接把它的参数理解为返回值。接下来,看下 SingleObserveOn 类的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public final class SingleObserveOn<T> extends Single<T> { final SingleSource<T> source; final Scheduler scheduler; public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler)); } static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { private static final long serialVersionUID = 3528003840217436037L; final SingleObserver<? super T> downstream; final Scheduler scheduler; T value; Throwable error; ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) { this.downstream = actual; this.scheduler = scheduler; } @Override public void onSubscribe(Disposable d) { if (DisposableHelper.setOnce(this, d)) { downstream.onSubscribe(this); } } @Override public void onSuccess(T value) { this.value = value; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void onError(Throwable e) { this.error = e; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void run() { Throwable ex = error; if (ex != null) { downstream.onError(ex); } else { downstream.onSuccess(value); } } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } }
类中的 source 即为上游的 SingleMap 对象,scheduler 就是 Schedulers.io()。由第一个案例可知,调用了 observeOn 后在返回结果的基础上调用了 subscribe 方法,而这个 subscribe 方法在父类 Single 中的实现中调用了子类的 subscribeActual 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public abstract class Single<T> implements SingleSource<T> { ... @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(SingleObserver<? super T> observer) { ObjectHelper.requireNonNull(observer, "subscriber is null"); observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); try { subscribeActual(observer); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); NullPointerException npe = new NullPointerException("subscribeActual failed"); npe.initCause(ex); throw npe; } } ... }
当前的子类就是 SingleObserveOn 类,该类的 subscribeActual 方法中调用了上游 SingleMap 对象的 subscribe 方法,并把 ObserveOnSingleObserver 传给了上游。最终,依次调用了 SingleMap 中 MapSingleObserver 相关方法以及 SingleObserveOn 中 ObserveOnSingleObserver 相关方法。这里以 ObserveOnSingleObserver 中的 onSuccess 方法调用为例,代码实现如下。
1 2 3 4 5 6 @Override public void onSuccess(T value) { this.value = value; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); }
这里首先将 value 赋值给当前实例,之后调用了 scheduler.scheduleDirect 方法切换线程,在切换线程后执行了 ObserveOnSingleObserver 的 run 方法。
1 2 3 4 5 6 7 8 9 @Override public void run() { Throwable ex = error; if (ex != null) { downstream.onError(ex); } else { downstream.onSuccess(value); } }
downstream 就是下游的消费者。
所以这里可以得出结论,位于上游的逻辑( SingleMap 中 MapSingleObserver 相关方法)在原线程中执行,位于下游的消费者( SingleObserver )逻辑执行在切换后的线程中。
subscribeOn 和 observeOn 类似,所以直接分析对应的 SingleSubscribeOn 类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public final class SingleSubscribeOn<T> extends Single<T> { final SingleSource<? extends T> source; final Scheduler scheduler; public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source); observer.onSubscribe(parent); Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { private static final long serialVersionUID = 7000911171163930287L; final SingleObserver<? super T> downstream; final SequentialDisposable task; final SingleSource<? extends T> source; SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) { this.downstream = actual; this.source = source; this.task = new SequentialDisposable(); } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); } @Override public void onSuccess(T value) { downstream.onSuccess(value); } @Override public void onError(Throwable e) { downstream.onError(e); } @Override public void dispose() { DisposableHelper.dispose(this); task.dispose(); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } @Override public void run() { source.subscribe(this); } } }
当调用 scheduler.scheduleDirect 方法后切换线程,之后执行 SubscribeOnObserver 中的 run 方法调用 source.subscribe(this),从此刻开始,上游的调用逻辑将在切换后的线程中执行,而上游逻辑之后又会调用 SubscribeOnObserver 中的方法,所以上游生产者( map )和下游消费者( SingleObserver)都会运行在切换后的线程中。
思考:若 subscribeOn 和 observeOn 同时存在,如何判断各个阶段所运行的线程?
比如下面的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 Single.just("item") .map(object : Function<String, String> { override fun apply(t: String): String { println("1:${Thread.currentThread()}") return t } }) .observeOn(Schedulers.io()) .map(object : Function<String, String> { override fun apply(t: String): String { println("2:${Thread.currentThread()}") return t } }) .subscribeOn(Schedulers.newThread()) .subscribe(object : SingleObserver<String> { override fun onSuccess(t: String) { println("3:${Thread.currentThread()}") } override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } })
结果如下:
1 2 3 1:Thread[RxNewThreadScheduler-1,5,main] 2:Thread[RxCachedThreadScheduler-1,5,main] 3:Thread[RxCachedThreadScheduler-1,5,main]
结果不理解的话,就按照上面的思路去看下源码吧。
线程调度器 上文中频繁出现了 scheduler.scheduleDirect 切线程的语句,下面就来详细说明 Rxjava2 到底是如何切换线程的。
Rxjava2 常用的线程调度器一共有 4 种:
调度器类型
效果
Schedulers.computation()
用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.newThread()
为每个任务创建一个新线程
Schedulers.io()
用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的线程调度器
AndroidSchedulers.mainThread( )
主线程,UI线程,可以用于更新界面
computation、newThread 以及 io 调度器的创建有一个共同特点,就是他们的初始化工作都在 Schedulers 的静态代码块中完成,也就是说他们的实例对象是全局唯一的,Schedulers 中的静态代码块如下。
1 2 3 4 5 6 7 8 9 10 11 static { SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask()); COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); IO = RxJavaPlugins.initIoScheduler(new IOTask()); TRAMPOLINE = TrampolineScheduler.instance(); NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); }
Schedulers.newThread() 调用 Schedulers.newThread() 得到的实例对象是一个 NewThreadScheduler 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public final class NewThreadScheduler extends Scheduler { final ThreadFactory threadFactory; private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler"; private static final RxThreadFactory THREAD_FACTORY; /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority"; static { int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY))); THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); } public NewThreadScheduler() { this(THREAD_FACTORY); } public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @NonNull @Override public Worker createWorker() { return new NewThreadWorker(threadFactory); } }
有文章开头两个案例的源码分析可知,在线程发生切换时会调用 scheduler.scheduleDirect语句,该语句最终会执行 Scheduler类 的 scheduleDirect 方法。
1 2 3 4 5 6 7 8 9 10 11 public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
Scheduler 的 createWorker()方法会调用子类(这里是NewThreadScheduler)的 createWorker 方法创建一个 NewThreadWorker 对象,之后调用到 NewThreadWorker 对象的 schedule 方法,最终调用到 NewThreadWorker对象的 scheduleActual 方法。
这里可以看出每次切换线程都会创建一个线程池,这块逻辑同时也是几种线程调度之间最大的区别。
下面我们看一下 NewThreadWorker 类的关键代码实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; volatile boolean disposed; public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } ... public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } ... }
构造方法中新建了一个核心线程数是 1 的线程池,具体实现如下。
1 2 3 4 5 public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
scheduleActual 方法中通过 executor.submit 或 executor.schedule 实现了线程的切换。
Schedulers.io() Schedulers.io() 最终生成 IoScheduler 对象,IoScheduler 内部维护了一个 CachedWorkerPool ,CachedWorkerPool 内部维护了一个 expiringWorkerQueue 队列用于保存可用的 ThreadWorker。每次需要 ThreadWorker 时,优先从队列中取,取不到则新建 ThreadWorker。接下来就带领大家看下这个逻辑是怎么实现的。
下面是 expiringWorkerQueue 添加 ThreadWorker 的场景。
1 2 3 4 5 6 void release(ThreadWorker threadWorker) { // Refresh expire time before putting worker back in pool threadWorker.setExpirationTime(now() + keepAliveTime); expiringWorkerQueue.offer(threadWorker); }
当调用 IoScheduler 的 dispose 方法时会调用到这个方法,它是 CachedWorkerPool 中的方法,此方法将已使用完成的 threadWorker 加入到 expiringWorkerQueue 队列的尾部以便重用。
接下来我们看下 threadWorker 的获取。
1 2 3 4 public Worker createWorker() { //通过调用 CachedWorkerPool 的 get 方法获取了 threadWorker。 return new EventLoopWorker(pool.get()); }
CachedWorkerPool 的 get 方法定义如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } // No cached worker found, so create a new one. ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; }
get 方法先通过 expiringWorkerQueue.poll 方法获取 threadWorker 对象,获取不到则新建。
下面是 ThreadWorker 的定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } public long getExpirationTime() { return expirationTime; } public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } }
ThreadWorker 是 NewThreadWorker 的子类,之后的操作和 Schedulers.newThread() 中讲述的一致,这里不再赘述。
Schedulers.io() 采用了 pool 的方式复用了已使用完成的线程池。
Schedulers.computation() Schedulers.computation() 会生成一个 ComputationScheduler 对象,ComputationScheduler 类内部维护了一个 FixedSchedulerPool 对象,该对象中维护了一个 PoolWorker 数组(PoolWorker 是 NewThreadWorker 的子类),数组的长度是 CPU 核数,所以 Schedulers.computation() 会生成一个长度固定为 CPU 核数的线程池数组 。
数组的长度计算逻辑如下,得到的 MAX_THREADS 就是 CPU 的核数。
1 MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
再来让我们看看 createWorker 方法的实现。
1 2 3 public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); }
其中 pool.get().getEventLoop() 得到了一个 PoolWorker 对象,下面看下 getEventLoop 方法的实现。
1 2 3 4 5 6 7 8 public PoolWorker getEventLoop() { int c = cores; if (c == 0) { return SHUTDOWN_WORKER; } // simple round robin, improvements to come return eventLoops[(int)(n++ % c)]; }
这其实就是循环取数组中 PoolWorker 元素的逻辑。
AndroidSchedulers.mainThread() AndroidSchedulers.mainThread() 最终生成一个 HandlerScheduler 静态对象,HandlerScheduler 的创建和上面几个调度器的创建逻辑不一样,上面几个调度器都是在 Schedulers 的静态代码块中新建的,而 HandlerScheduler 属于 AndroidSchedulers 静态内部类 MainHolder 的一个静态属性,代码如下。
1 2 3 4 5 6 7 8 9 10 public final class AndroidSchedulers { private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false); } ... }
调度过程也有不一样,HandlerScheduler 中自己定义了 scheduleDirect 方法,所以发生线程调度后,即 scheduler.scheduleDirect 被调用后,HandlerScheduler 的 scheduleDirect 方法会被直接调用,而不需要执行父类 Scheduler 的 scheduleDirect 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); if (async) { message.setAsynchronous(true); } handler.sendMessageDelayed(message, unit.toMillis(delay)); return scheduled; }
由上面代码可知, Runnable 对象中的 run 方法会在主线程中执行。
总结 这篇文章重点讲解了 Rxjava2 的线程调度原理,总的来说,学习 Rxjava2 的源码设计思想可以从以下 4 个方面作为切入点: 1、Rxjava2 如何实现基于事件流的链式调用。 2、线程调度原理。 3、map 操作符是如何工作的。 4、Rxjava2 是如何取消订阅的。
对于以上未介绍的切入点,大家可以自行去阅读源码或参考 Hencoder plus 相关课程。
文章写作背景 笔者在看了扔物线老师的 Hencoder plus 相关课程后觉得线程调度这块的知识讲解得不完整,后面笔者花了好些时间才梳理清晰,所以记录下这篇文章以免忘记。同时,如果对你也有帮助的话,那就更好了。