RxJava2 線程調(diào)度的方法
subscribeOn和observeOn負責(zé)線程切換,同時某些操作符也默認指定了線程.
我們這里不分析在線程中怎么執(zhí)行的.只看如何切換到某個指定線程.
subscribeOn
Observable.subscribeOn()在方法內(nèi)部生成了一個ObservableSubscribeOn對象.
主要看一下ObservableSubscribeOn的subscribeActual方法.
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//調(diào)用下游的Observer的onSubscribe方法
observer.onSubscribe(parent);
//通過SubscribeTask執(zhí)行了上游Observable的subscribeActual方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
scheduler.scheduleDirect(Runnable)用于執(zhí)行SubscribeTask這個任務(wù).SubscribeTask本身是Runnable的實現(xiàn)類.看一下其run方法.
@Override
public void run() {
//上游的Observable.subscribe方法被切換到了新的線程
source.subscribe(parent);
}
首先可以得出結(jié)論:subscribeOn將上游的Observable的subscribe方法切換到了新的線程.
如果多次調(diào)用subscribeOn切換線程,會有什么效果?
由下往上,每次調(diào)用subscribeOn,都會導(dǎo)致上游的Observable的subscribeActual切換到指定的線程.那么最后一次調(diào)用的切換最上游的創(chuàng)建型操作符的subscribeActual的執(zhí)行線程.如果操作符有默認執(zhí)行線程怎么辦?
操作符默認線程
如果是創(chuàng)建型操作符,處于最上游,那么subscribeOn的線程切換對它不起作用.天高皇帝遠,縣官不如現(xiàn)管.就是這個道理.
如果是其它操作符,會是怎樣的?
以操作符timeout為例:它對應(yīng)ObservableTimeoutTimed和TimeoutObserver
@Override
public void onNext(T t) {
downstream.onNext(t);
//超時計時
startTimeout(idx + 1);
}
void startTimeout(long nextIndex) {
//交給操作符默認的線程執(zhí)行
task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
}
@Override
public void onTimeout(long idx) {
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
}
//TimeoutTask.java
static final class TimeoutTask implements Runnable {
@Override
public void run() {
parent.onTimeout(idx);
}
}
可以看到操作符默認的執(zhí)行線程只用來做超時計時任務(wù),如果超時了,會在操作符的默認線程執(zhí)行onError方法..操作符默認線程對下游的observer造成什么影響要做具體對待.
observeOn
observeOn對應(yīng)ObservableObserveOn和ObserveOnObserver.
//ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//ObserveOnObserver.java
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
if (d instanceof QueueDisposable) {
if (m == QueueDisposable.SYNC) {
//執(zhí)行下游Observer的onSubscribe方法
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
//執(zhí)行下游Observer的onSubscribe方法
downstream.onSubscribe(this);
return;
}
}
//執(zhí)行下游Observer的onSubscribe方法
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
//省略
schedule();
}
@Override
public void onError(Throwable t) {
//省略
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
/*
ObserveOnObserver是Runnable的實現(xiàn)類.交給線程池執(zhí)行
*/
worker.schedule(this);
}
}
void drainNormal() {
final Observer<? super T> a = downstream;
for (;;) {
for (;;) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
a.onError(ex);
return;
}
//執(zhí)行下游Observer的onNext方法
a.onNext(v);
}
}
}
void drainFused() {
for (;;) {
if (!delayError && d && ex != null) {
//執(zhí)行下游Observer的onError方法
downstream.onError(error);
return;
}
downstream.onNext(null);
if (d) {
ex = error;
if (ex != null) {
//執(zhí)行下游Observer的onError方法
downstream.onError(ex);
} else {
//執(zhí)行下游Observer的onComplete方法
downstream.onComplete();
}
return;
}
}
}
//執(zhí)行線程任務(wù)
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
從上面可以看出ObservableObserveOn在其subscribeActual方法中并沒有切換上游Observable的subscribe方法的執(zhí)行線程.但是ObserveOnObserver在其onNext,onError和onComplete中通過schedule()方法將下游Observer的各個方法切換到了新的線程.
得出結(jié)論: observeOn負責(zé)切換的是下游Observer的各個方法的執(zhí)行線程
如果下游多次通過observeOn切換線程,會有什么效果?
每次切換都會對其下游造成影響,直到遇到下一個observeOn為止.
Observer(onSubscribe,onNext,onError,onComplete)
onNext,onError,onComplete與上游最近的observeOn所切換的線程保持一致.onSubscribe則不同.
遇到線程切換的時候,會首先在對應(yīng)的Observable的subscribeActual方法內(nèi),先調(diào)用observer.onSubscribe方法.而observer.onSubscribe會逐級向上傳遞直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法內(nèi)調(diào)用,這是在主線程執(zhí)行的.所以onSubscribe方法無論如何都是在主線程執(zhí)行.
doOnSubscribe
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
}
})
我們要看的是方法accept的執(zhí)行線程.
通過源碼找到對應(yīng)的DisposableLambdaObserver.
@Override
public void onSubscribe(Disposable d) {
//在這里調(diào)用了accept方法.
onSubscribe.accept(d);
}
這就要看上游在哪個線程執(zhí)行了Observer.onSubscribe(disposable)方法.
在創(chuàng)建型操作符的subscribeActual方法和subscribeOn對應(yīng)的Observable的subscribeActual方法內(nèi)調(diào)用了Observer.onSubscribe(disposable)方法.那么這兩處的執(zhí)行線程就決定了onSubscribe.accept(d);的執(zhí)行線程.
doFinally
對應(yīng)ObservableDoFinally和DoFinallyObserver
//DoFinallyObserver.java
@Override
public void onError(Throwable t) {
runFinally();
}
@Override
public void onComplete() {
runFinally();
}
@Override
public void dispose() {
runFinally();
}
void runFinally() {
onFinally.run();
}
可以看到與它所對應(yīng)的DoFinallyObserver的onError,onComplete,dispose方法的執(zhí)行線程有關(guān),這三個方法的執(zhí)行線程又受到上游的observeOn的影響.如果沒有observeOn,則會受到最上游的observable.subscribeActual方法影響.
doOnError
對應(yīng)ObservableDoOnEach和DoOnEachObserver
//DoOnEachObserver.java
@Override
public void onError(Throwable t) {
onError.accept(t);
}
和自身對應(yīng)的observer.onError所在線程保持一致.
doOnNext
對應(yīng)ObservableDoOnEach和DoOnEachObserver
//DoOnEachObserver.java
@Override
public void onNext(T t) {
onNext.accept(t);
}
和自身對應(yīng)的observer.onNext所在線程保持一致.
操作符對應(yīng)方法參數(shù)的執(zhí)行線程
包io.reactivex.functions下的接口類一般用于處理上游數(shù)據(jù)然后往下傳遞.這些接口類的方法一般在對應(yīng)的observer.onNext中調(diào)用.所以他們的線程保持一致.
總結(jié):
subscribeOn由下往上逐級切換Observable.subscribe的執(zhí)行線程,不受observeOn影響,也不受具有默認指定線程的非創(chuàng)建型操作符影響,但是會被更上游的subscribeOn奪取線程切換的權(quán)利,直到最上游.如果最上游的創(chuàng)建型操作符也有默認執(zhí)行線程,那么任何一個subscribeOn的線程切換不起作用.subscribeOn由下向上到達最上游后,然后由上往下影響下游的observer的執(zhí)行線程.遇到observeOn會被奪取線程切換的權(quán)利.observeOn影響的是下游的observer的執(zhí)行線程,由上往下,遇到另一個observeOn會移交線程控制權(quán)力,遇到指定默認線程非創(chuàng)建型的操作符,要視具體情況對待.
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Android開發(fā)之獲取SD卡及手機ROM容量的方法
這篇文章主要介紹了Android開發(fā)之獲取SD卡及手機ROM容量的方法,結(jié)合實例形式分析了Android針對SD卡的讀取及屬性操作相關(guān)技巧,需要的朋友可以參考下2016-04-04
分享Android中ExpandableListView控件使用教程
這篇文章主要介紹了Android中ExpandableListView控件使用教程,可以實現(xiàn)二級列表展示效果,需要的朋友可以參考下2015-12-12
Android實現(xiàn)可輸入數(shù)據(jù)的彈出框
這篇文章主要為大家詳細介紹了Android實現(xiàn)可輸入數(shù)據(jù)的彈出框,文章提供了兩種方式,示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-01-01
android指定DatePickerDialog樣式并不顯示年的實現(xiàn)代碼
下面小編就為大家?guī)硪黄猘ndroid指定DatePickerDialog樣式并不顯示年的實現(xiàn)代碼。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,祝大家游戲愉快哦2016-08-08

