RxJava Observer Observable(Not With Thread Dispatch) Source Code

众所周知 RxJava 是基于观察者模式的响应式编程框架。其中主要有2个主要对象:

  • Observable 被观察者
  • Observer 观察者

不带线程切换的基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onComplete();
}
});

Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer o) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};
observable.subscribe(observer);

好了,一般创建被观察者都是使用静态方法 Observable.create 传递一个 ObservableOnSubscribe 对象。

来,看 Observable 的 create 方法:

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

查看参数 ObservableOnSubscribe 类:

1
2
3
4
5
6
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

它就是一个接口,有一个 subscribe 方法需要具体实现。
再看 RxJavaPlugins.onAssembly (onAssembly 上面栗子的情况下,没有特殊作用,传什么对象,返回什么对象):

1
2
3
4
5
6
7
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

看 onAssembly 的参数, ObservableCreate 类:

1
2
3
4
5
6
7
8
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
....
}

它继承自 Observable ,并通过构造函数将 ObservableOnSubscribe 保存为自己的属性。
所以在创建 Observable 对象的过程中,进行了1次包装,最终返回了一个 ObservableCreate 被观察者对象:

RxJava 被观察者创建时序

看 Observable 的 subscribe 方法

主要代码如下:

1
2
3
4
5
6
7
8
9
10
public final void subscribe(Observer<? super T> observer) {    
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
}
}

subscribe 方法需要接收一个 Observer 对象,看下 Observer

1
2
3
4
5
6
7
8
9
10
public interface Observer<T> {

void onSubscribe(@NonNull Disposable d);

void onNext(@NonNull T t);

void onError(@NonNull Throwable e);

void onComplete();
}

是个接口,观察者实例需要实现这几个方法。

然后继续 subscribe 方法:

RxJavaPlugins.onSubscribe(this, observer):

1
2
3
4
5
6
7
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}

没有做特别的,直接返回传进去的 Observer。

接着subscribeActual:

1
protected abstract void subscribeActual(Observer<? super T> observer);

唉,需要子类实现。回顾上面 Observable.create 的过程,我们创建的 Observable 实际是 ObservableCreate 对象,所以到 ObservableCreate 里看 subscribeActual方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

里面调用了 observer.onSubscribe(parent) 方法。 parent 是一个 CreateEmitter 对象。

看下 CreateEmitter 的源码,发现它实现了 ObservableEmitter 接口和 Disposable 接口,所以它可以多态转为 ObservableEmitterDisposable 实例对象。

继续, observer.onSubscribe(parent) 就是回调观察者的 onSubscribe,并将 CreateEmitter 转为 Disposable 对象传递。

然后 source.subscribe(parent) 是调用被观察者的 subscribe 方法,并将 CreateEmitter 转为 ObservableEmitter 对象传递。

从上面知道 source 就是 ObservableCreate 类保存的 ObservableOnSubscribe 对象,是最上面栗子里创建的被观察者,所以source.subscribe(parent) 就走到了栗子里的代码:

1
2
3
4
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onComplete();
}

这里我调用了 emitter 的 onNextonComplete 方法。看下 CreateEmitter 具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

private static final long serialVersionUID = -3434801548987643227L;

final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
if (t == null) {
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError ...");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}

@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}

@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}

@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}

原来 emitter 的 onNext 内会直接调用 观察者的 onNext, emitter 的 onComplete 内直接调用观察者的 onComplete。这里没有涉及线程的切换,所以所有操作都是在同一个线程进行。CreateEmitter 还有其他几个方法:

  • onError(Throwable t) 在 emitter 发出 error 事件时调用
  • dispose() 取消这次订阅,在 onNext onError onComplete 中都会先判断是否已经 dispose,如果 dispose 了,就不会回调观察者的对应方法。

RxJava 订阅时序

总结

在不使用 RxJava 的线程切换功能下分析了被观察者的创建和订阅观察者的流程。 主要涉及了 ObservableObservableOnSubscribeObservableCreateObserver(接口)、CreateEmitter。当被观察者的 subscribe 调用之后,就会走到 ObservableOnSubscribe 的 subscribe,从而触发 emitter 发送事件,emitter 发送事件之后,又由 emitter 调用观察者的对应事件回调方法。

赏杯咖啡 🍵 Donate