RxJava中多種場景的實現總結
一、推遲執(zhí)行動作
可以使用timer+map方法實現.代碼如下:
Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
return doSomething();
}).subscribe(System.out::println);
}
二、推遲發(fā)送執(zhí)行的結果
這種場景要求產生數據的動作是馬上執(zhí)行,但是結果推遲發(fā)送.這和上面場景的是不一樣的.
這種場景可以使用Observable.zip來實現.
zip操作符將多個Observable發(fā)射的數據按順序組合起來,每個數據只能組合一次,而且都是有序的。最終組合的數據的數量由發(fā)射數據最少的Observable來決定。
對于各個observable相同位置的數據,需要相互等待,也就說,第一個observable第一個位置的數據產生后,要等待第二個observable第一個位置的數據產生,等各個Observable相同位置的數據都產生后,才能按指定規(guī)則進行組合.這真是我們要利用的.
zip有很多種聲明,但大致上是一樣的,就是傳入幾個observable,然后指定一個規(guī)則,對每個observable對應位置的數據進行處理,產生一個新的數據, 下面是其中一個最簡單的:
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
用zip實現推送發(fā)送執(zhí)行結果如下:
Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
,Observable.just(doSomething()), (x,y)->y)
.subscribe(System.out::println));
三、使用defer在指定線程里執(zhí)行某種動作
如下面的代碼,雖然我們指定了線程的運行方式,但是doSomething()這個函數還是在當前代碼調用的線程中執(zhí)行的.
Observable.just(doSomething())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->Utils.printlnWithThread(v.toString()););
通常我們采用下面的方法達到目的:
Observable.create(s->{s.onNext(doSomething());})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{
Utils.printlnWithThread(v.toString());
});
但其實我們采用defer也能達到相同的目的.
關于defer
defer 操作符與create、just、from等操作符一樣,是創(chuàng)建類操作符,不過所有與該操作符相關的數據都是在訂閱是才生效的。
聲明:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
defer的Func0里的Observable是在訂閱(subscribe)的時候才創(chuàng)建的.
作用:
Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.
也就說observable是在訂閱的時候才創(chuàng)建的.
上面的問題用defer實現:
Observable.defer(()->Observable.just(doSomething()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{Utils.printlnWithThread(v.toString());
});
四、使用compose不要打斷鏈式結構
我們經常看到下面的代碼:
Observable.just(doSomething())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{Utils.printlnWithThread(v.toString());
上面的代碼中,subscribeOn(xxx).observeOn(xxx)可能在很多地方都是一樣的, 如果我們打算把它統一在某一個地方實現, 我們可以這么寫:
private static <T> Observable<T> applySchedulers(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
}
但是這樣每次我們需要調用上面的方法, 大致會像下面這樣,最外面是一個函數,等于打破了鏈接結構:
applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
@Override public Data call(Data data) {
return manipulate(data);
}
})
).subscribe(new Action1<Data>() {
@Override public void call(Data data) {
doSomething(data);
}
});
可以使用compose操作符達到不打破鏈接結構的目的.
compose的申明如下:
public Observable compose(Transformer<? super T, ? extends R> transformer);
它的入參是一個Transformer接口,輸出是一個Observable. 而Transformer實際上就是一個Func1<Observable<T>, Observable<R>> ,換言之就是:可以通過它將一種類型的Observable轉換成另一種類型的Observable.
簡單的說,compose可以通過指定的轉化方式(輸入參數transformer),將原來的observable轉化為另外一種Observable.
通過compose, 采用下面方式指定線程方式:
private static <T> Transformer<T, T> applySchedulers() {
return new Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
}
};
}
Observable.just(doSomething()).compose(applySchedulers())
.subscribe(v->{Utils.printlnWithThread(v.toString());
});
函數applySchedulers可以使用lambda表達式進一步簡化為下面為:
private static <T> Transformer<T, T> applySchedulers() {
return observable->observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
}
五、按優(yōu)先級使用不同的執(zhí)行結果
上面這個標題估計沒表達清楚我想表達的場景. 其實我想表達的場景類似于平常的獲取網絡數據場景:如果緩存有,從緩存獲取,如果沒有,再從網絡獲取.
這里要求,如果緩存有,不會做從網絡獲取數據的動作.
這個可以采用concat+first實現.
concat將幾個Observable合并成一個Observable,返回最終的一個Observable. 而那些數據就像從一個Observable發(fā)出來一樣. 參數可以是多個Observable,也可以是包含Observalbe的Iterator.
新的observable內的數據排列按原來concat里的observable順序排列,即新結果內的數據是按原來的順序排序的.
下面是上述需求的實現:
Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
.subscribe(v->System.out.println("result:"+v));
//從緩存獲取數據
private static Observable<String> getDataFromCache(){
return Observable.create(s -> {
//dosomething to get data
int value = new Random().nextInt();
value = value%2;
if (value!=0){
s.onNext("data from cache:"+value); //產生數據
}
//s.onError(new Throwable("none"));
s.onCompleted();
}
);
}
//從網絡獲取數據
private static Observable<String> getDataFromNetwork(){
return Observable.create(s -> {
for (int i = 0; i < 10; i++) {
Utils.println("obs2 generate "+i);
s.onNext("data from network:" + i); //產生數據
}
s.onCompleted();
}
);
}
上面的實現,如果getDataFromCache有數據, getDataFromNetwork這里的代碼是不會執(zhí)行的, 這正是我們想要的.
上面實現有幾個需要注意:
1、有可能從兩個地方都獲取不到數據, 這種場景下使用first會拋出異常NoSuchElementException,如果是這樣的場景,需要用firstOrDefault替換上面的first.
2、上面getDataFromCache()里,如果沒有數據,我們直接調用onCompleted,如果不調用onCompleted,而是調用onError,則上述采用concat是得不到任何結果的.因為concat在收到任何一個error,合并就會停止.所以,如果要用onError, 則需要用concatDelayError替代concat.concatDelayError會先忽略error,將error推遲到最后在處理.
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流。
相關文章
SpringMVC中@RequestMapping注解的實現
RequestMapping是一個用來處理請求地址映射的注解,本文主要介紹了SpringMVC中@RequestMapping注解的實現,具有一定的參考價值,感興趣的可以了解一下2024-01-01
springboot2.x解決運行順序及Bean對象注入順序的問題
這篇文章主要介紹了springboot2.x解決運行順序及Bean對象注入順序的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01

