RxJava's Thread Dispatch

简单的使用 RxJava,并结合线程切换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) {
Log.d("TAG", "[subscribe]" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidScheduler.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG", "[onSubscribe]" + Thread.currentThread().getName());
}
@Override
public void onNext(Object o) {
Log.d("TAG", "[onNext]" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
Log.d("TAG", "[onComplete]" + Thread.currentThread().getName());
}
});

上次已经分析过不带线程的订阅和观察过程,现在直接从 subscribeOnobserverOn 开始分析。由上次得知 Observable.create 创建最后获得的是 ObservableCreate 被观察者对象。

ObservableCreate

先分析 Observable.subscribeOn

1
2
3
4
5
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 这里的 this 是上层的 ObservableCreate 对象,又添加一层封装,返回 ObservableSubscribeOn 对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

看下 ObservableSubscribeOn 构造函数:

1
2
3
4
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

就是保存下 source 和被观察者的 scheduler

到这里,最初的 ObservableOnSubscribe 已经被包装了3层:

ObservableSubscribeOn

然后到执行订阅的地方,事件的发出都是在订阅之后,所以来到 Observable.subscribe 方法,上篇也分析了最终会走到当前被观察者的 subscribeActual 方法,现在的被观察者被包装为了 ObservableSubscribeOn
所以直接到 ObservableSubscribeOn 的 subscribeActual 中:

1
2
3
4
5
6
7
8
public void subscribeActual(final Observer<? super T> s) {
// 将观察者包装一层
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 调用观察者的 onSubscribe,这里还没有进行线程切换,所以是发生在当前 Observable 被创建的线程
s.onSubscribe(parent);
// 创建一个 SubscribeTask,然后由线程调度器调度执行。这里进行了线程的切换工作
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

这里先将我们自己的观察者 Observer 包装为了 SubscribeOnObserver 对象。

SubscribeOnObserver

然后重要的是创建了一个 SubscribeTask,接着调用 scheduler.scheduDirect 执行。

先看下 SubscribeTask 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
// SubscribeTask 是 ObservableSubscribeOn 的内部类, 所以能直接访问 source 对象
source.subscribe(parent);
}
}

SubscribeTask 实现了 Runnable,并在 run() 中调用了被观察者的 Observeable.subscribe(),从而执行我们自己的事件发送代码。

再看 scheduler.scheduDirect() 方法:

1
2
3
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

往下:

1
2
3
4
5
6
7
8
9
10
11
12
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 创建 worker
final Worker w = createWorker();
// 将传入的 Runnable 封装一层,实际还是传入的 Runnable
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将 worker 和 runnable 包装为 DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
// 调用 worker 执行
w.schedule(task, delay, unit);

return task;
}

Scheduler 是个抽象类,createWorker 需要子类实现,就选常用的 IoSchedulercreateWorker查看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final AtomicReference<CachedWorkerPool> pool;

public Worker createWorker() {
// 创建一个 EventLoopWorker,并传入一个 Worker 缓存池
return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;

final AtomicBoolean once = new AtomicBoolean();

EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 从缓存中获取一个 Woker
this.threadWorker = pool.get();
}

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//...
// Runnable 交给 threadWorker 去执行,这里的 Runnable 的 run() 方法中执行的就是我们在被观察者发送事件的代码
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

看下 CachedWorkerPool Worker 缓存池的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class CachedWorkerPool implements Runnable {

//...

ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
// 如果缓存池不空,就取一个 threadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// 如果空的,就创建一个新的
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}

}

继续看 threadWorker.schedulerActual 实现,ThreadWorker没有实现这个方法,看下它的父类 NewThreadWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
public NewThreadWorker(ThreadFactory threadFactory) {
// 构造时获取一个 ScheduledExecutorService
executor = SchedulerPoolFactory.create(threadFactory);
}

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// 还是传入的 runnable 对象
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将 decoratedRun 包装为一个新的 runnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

Future<?> f;
try {
if (delayTime <= 0) {
// 线程池中立即执行
f = executor.submit((Callable<Object>)sr);
} else {
// 延迟执行
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
} catch (RejectedExecutionException ex) {
//...
}

return sr;
}
}

到这里,SubscribeTaskrun() 方法最终会在线程池中被执行,也就是我们在 subscribe 方法中写的发送事件的代码在这里执行。

observerOn 操作

上面的栗子中,是这样的:

1
2
// 指定观察者在 Android Main Thread 接受事件结果
.observeOn(AndroidScheduler.mainThread())

Observable类的 observeOn()方法:

1
2
3
4
5
6
7
8
9
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//...
// 将自身包装为新的被观察者对象,因为进行 subscribeOn 时也包装了一层,所以现在一共4层了
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn

ObservableObserveOn 类:

1
2
3
4
5
6
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

就是存一些属性。

然后就是一样的操作,直接来到订阅的地方,ObservableObserveOnsubscribeActual

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
// 如果是当前线程,直接执行上一层的 subsribe
// 执行最里面的 ObservableSubscribeOn 的 subscribe() 方法
source.subscribe(observer);
} else {
// 创建 worker,栗子中的 scheduler 是 AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
// 执行subscribe(),这里是在上面提到的 SubscribeTask 的 run() 中执行
// 将观察者包装为 ObserveOnObserver
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

加上上面的 subscribeOn 操作,观察者已经被包装了2层:

ObserveOnObserver

source.subscribe() 中将会把事件通过 onNext onError onComplete 方法发送出去,所以看下 ObserveOnObserveronNext

1
2
3
4
5
6
7
8
public void onNext(T t) {
if (sourceMode != QueueDisposable.ASYNC) {
// 将结果存入队列
queue.offer(t);
}
// 调用 schedule()
schedule();
}

ObserveOnObserver 的 schdule 方法:

1
2
3
4
5
6
7
8
void schedule() {
if (getAndIncrement() == 0) {
// ObserveOnObserver 实现了 Runnable 接口,所以把自己交给 worker 去执行
// 这里的 worker 由 Android MainThread Schduler 提供,它实际是通过向 Android MainThread 的 Looper 发送 Message 实现的线程切换。
// 构造 callback 为 this 的 Message 发送到主线程,主线程消费这条消息时,就执行 callback 的 run() 方法,即这里 this 对象 ObserveOnObserver 的 run() 方法 。
worker.schedule(this);
}
}

ObserveOnObserver 实现了 Runnable 接口,所以它的 run() 方法将会在主线程调用。

ObserveOnObserver 的 run() 方法:

1
2
3
4
5
6
7
8
public void run() {
// outputFused 默认为 false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

ObserveOnObserver 的 drainNormal() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void drainNormal() {

// 存消息的队列
final SimpleQueue<T> q = queue;
// 上层观察者,SubscribeOnObserver
final Observer<? super T> a = actual;

for (;;) {
for (;;) {

T v;
try {
// 从队列取消息
v = q.poll();
} catch (Throwable ex) {
//...
return;
}
//...
// 调用 SubscribeOnObserver 的 onNext()
a.onNext(v);
}
//...
}
}

SubscribeOnObserver 的 onNext 没做特别的事情,就是调用原始观察者的 onNext:

1
2
3
4
5
6
7
8
9
10
11
12
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}

@Override
public void onNext(T t) {
actual.onNext(t);
}
}

因此 Observer 的 onNext() 就会在 Android 的主线程执行了。其他的 onError onCompleteonNext 类似。

赏杯咖啡 🍵 Donate