Demo 分析
最简单的 demo1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24Observable.OnSubscribe sourceOnSubscribe = new Observable.OnSubscribe<String>() {
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onCompleted();
}
};
Observable sourceObservable = Observable.create(sourceOnSubscribe);
Subscriber<String> targetSubscriber = new Subscriber<String>() {
public void onNext(String s) {
Log.d(tag, s);
}
public void onCompleted() {
Log.d(tag, "Completed!");
}
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
sourceObservable.subscribe(targetSubscriber);
没有进行简写,每个中间变量都赋予了名字,方便后面说明.
demo的输出结果就是 Hello Hi Completed!
基本 demo 的函数调用链
分析上述最简单的 demo, 分析中不看 部分与性能、兼容性、扩展性有关的代码和函数调用,仅关注核心代码和调用,
在相关源码里,关键部分有相关注释说明.
先上图.
- 实例化 Observable.OnSubscribe , 记为 sourceOnSubscribe.
- 调用 Observable.create 静态方法
- create()内部会实例化 Observable 对象,需要传入sourceOnSubscribe,将其返回值记为 sourceObservable.
- 实例化 Subscriber , 记为 targetSubscriber.
- 调用 sourceObservable.subscribe(targetSubscriber) 方法,这是将观察者和观察源建立联系的地方,订阅.
- sourceObservable 会首先调用 targetSubscriber.onStart() 方法.
- sourceObservable 调用 sourceOnSubscribe 的 call(targetSubscriber)方法, 就是上面我们自定义的地方,执行到我们写的代码附近了.
- 执行相关逻辑,上面 demo 中什么都没做,这里需要我们自己实现具体逻辑.
- 调用
targetSubscriber.onNext(T)
方法,这里也是上面 demo 里自己实现的地方. targetSubscriber.onCompleted()
ortargetSubscriber.onError(e)
,整个流程跑完了.
整个调用流程其实并不复杂,跟踪下来还是很容易的,RxJava 在内部也没做太多的事.
深入使用的源码分析
RxJava 最强大的就是操作符 和 线程操作,接下来看看这部分.
操作符
map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15Func1 mapFun=new Func1<Integer, String>() {
public String call(Integer number) { // 参数类型 int
return "number " + number; // 返回类型 String
}
};
Action1 action1=new Action1<String>() {
public void call(String str) { // 参数类型 String
Log.i(str);
}
};
Observable.just(1,2,3,4,5)
.map(mapFun)
.subscribe(action1);
- 调用 map(mapFun)
- map 方法内部实例化 OnSubscribeMap ,传入 this (Observable) 和 mapFun.
- 调用 Observable.create 方法,生成新的 MapObservable
- 我们调用 subscribe 时,实际上调用的是 MapObservable.subscribe().
- 回调 onStart()
- 调用 onSubscribeMap 的 call()
- 生成一个新的 mapSubscriber ,之后会订阅原来的 Observable.
- 关联两个 subscriber 的 unsubscribe()
- 用新的 mapSubscriber 订阅原来的 Observable.
- 原来的 Observable 回调 mapSubscriber的 onStart()
- 调用原来的 OnSubscribe.call()
- OnSubscribe 内部的执行逻辑
- 调用 mapSubscriber 的 onNext(T)
- mapSubscriber 会调用 mapFun.call(T) 返回 R
- mapSubscriber 调用真正的 targetSubscriber.onNext(R),R 是转换后的数据
看流程有点复杂,其实也很简单,就是 map 在观察源和观察者之前做了一层转换,当发生订阅时,观察者订阅的不是真正的观察源,
而是 map 内部的’转换观察源’,’转换观察源’内部会再去订阅真正的观察源,然后将观察源返回的数据通过转换函数mapFun
转换,
再返回给我们定义的观察者.
lift 变换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33new Func1<Integer, String>() {
public String call(Integer number) { // 参数类型 int
return "number " + number; // 返回类型 String
}
};
Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
observable.lift(new Observable.Operator<String, Integer>() {
//这个参数 subscriber,就是最终的,也就是我们使用时传入的
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
return new Subscriber<Integer>() {
public void onNext(Integer integer) {
subscriber.onNext("number " + integer);
}
public void onCompleted() {
}
public void onError(Throwable e) {
}
};
}
}).subscribe(new Action1<String>() {
public void call(String s) {
Log.d(s);
}
});这个例子功能和上面 map 的例子相同,都是将数字变为字符串.
整体流程和 map 类似,都是生成一个中间 Subscriber ,去订阅原来的 Observable,
然后在 onNext 等方法里将数据处理转换之后,回调真正的 Subscriber.
线程控制
- subscribeOn()
`OperatorSubscribeOn` 新建了一个 OnSubscribe,执行 call() 即产生事件.即 OperatorSubscribeOn,内部调用了线程相关.
- observeOn()
使用了 lift 操作符, operator 是 `OperatorObserveOn`.内部也是线程相关代码.