概述
传统的AsyncTesk在业务逻辑不是很复制的时候会使用起来很方便,随着业务的变更,业务变得臃肿,代码也会出现冗余。这时候RxJava可以更好的解决这一问题。RxJava是一个实现异步操作的库,当初我们使是用AsyncTesk来进行异步交互的,现在RxJava是完完全全可以替代AsyncTesk的一种框架,当项目或逻辑越来越复杂时,它依旧能保持代码的可读性性,整洁性等。关于RxJava一个很重要的点就是响应式编程,响应式编程就是编程处理异步数据流。就是我们接收连续流动的数据–数据流–提供处理数据流的方法并将该方法应用到数据流。想象一下高速公路上汽车过收费站,公路就是流,汽车是事件(不断的行走),而收费站时接受事件的(不断的观察车辆)。此版本主要针对于RxJava2.X,假如对1.0版本不太熟悉也没关系,不影响2.0的使用。
成员介绍:
Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;
Observer:接收源,英文释义“观察者”,就是观察者模式中的“观察者”,可接收Observable发射的数据;
Consumer:也是接收源,它跟Observer的区别是,Consumer只关心数据的结果,不关心它是否出错Error,和完成情况Complete;
Subscriber:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;
ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
基本用法:
学习RxJava其实就是学习该框架操作符的使用。里面有很多操作符十分的酷炫。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32//创建一个被观察者 Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//创建一个观察者 Observer
Observer<Integer> observer = new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
public void onError(Throwable e) {
Log.d(TAG, "error");
}
public void onComplete() {
Log.d(TAG, "complete");
}
};
//调用subscribe()观察者才与被观察者建立连接
observable.subscribe(observer);
//注意:建立连接后才会开始发送数据
运行结果:1
2
3
4
5D/TAG: subscribe
D/TAG: 1
D/TAG: 2
D/TAG: 3
D/TAG: complete
这里也可以连起来写1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
public void onError(Throwable e) {
Log.d(TAG, "error");
}
public void onComplete() {
Log.d(TAG, "complete");
}
});
以上两种写法是一样的。
complete的作用:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "emit 4");
emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
private int i;
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
mDisposable = d;
}
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
i++;
if (i == 2) {//当等于2时调用dispose()
Log.d(TAG, "dispose");
mDisposable.dispose();//调用dispose()被观察者仍然会继续发送剩余的事件.
Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
}
}
public void onError(Throwable e) {
Log.d(TAG, "error");
}
public void onComplete() {
Log.d(TAG, "complete");
}
});
调用dispose()被观察者仍然会继续发送剩余的事件.
我们让被观察者依次发送1,2,3,complete,4,被观察者因为在2的时候dispose,所以接受不到3,4。但不影响被观察者继续发送数据。
运行结果:1
2
3
4
5
6
7
8
9
10D/TAG: subscribe
D/TAG: emit 1
D/TAG: onNext: 1
D/TAG: emit 2
D/TAG: onNext: 2
D/TAG: dispose
D/TAG: isDisposed : true
D/TAG: emit 3
D/TAG: emit complete
D/TAG: emit 4
以上我们简单的了解到了Rxjava的基本用法,我们知道RxJava的精髓在于它的异步操作。上面的事例全部都是在主线程中运行的。接下来看看如何实现线程的调度,实现异步操作。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())//子线程中进行耗时请求
.observeOn(AndroidSchedulers.mainThread())//结果在主线程中进行处理
.subscribe(consumer);
}
可以看出线程调度十分的方便,当逻辑越来越复杂的时候,更能体现其简洁的优点。
操作符
初次之外,还有很多的操作符,我们就最常用的操作符来分析一下:
Map:是RxJava中最简单的一个变换操作符了,可以类似理解为当从接口获取Json字符串然后通过Gson转成对象bean。Map就是转换的作用
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
FlatMap:将一个发送事件的上游Observable变换为多个发送事件的Observables
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
输出结果:1
2
3
4
5
6
7
8
9D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2
可以看出flatMap是无序的。
假如需要结果为有序的话,则使用concatMap。使用方法相同。这里就不加演示了
delay
延迟1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) // 延迟3s再发送,由于使用类似,所以此处不作全部展示
.subscribe(new Observer<Integer>() {
public void onSubscribe(Disposable d) {
}
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
retry
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
*** repeatWhen
重复发送,轮询1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
// 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据
public ObservableSource<?> apply( Observable<Object> objectObservable) throws Exception {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable
// 此处有2种情况:
// 1. 若新被观察者(Observable)返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable
// 2. 若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
public ObservableSource<?> apply( Object throwable) throws Exception {
// 情况1:若新被观察者(Observable)返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable
return Observable.empty();
// Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()
// return Observable.error(new Throwable("不再重新订阅事件"));
// 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
// 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
// return Observable.just(1);
// 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件
}
});
}
}).subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应:" + e.toString());
}
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
总结
以上就是Rxjava的基本用法,都是简单的示范,Rxjava还有更复杂的一些用法,将来有机会再说。后续会尝试进行Rxjava+OkHtto+Retrofi的网络框架封装。