使用RxJava2怎么實現線程調度?很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
創新互聯專注于民豐企業網站建設,成都響應式網站建設公司,商城網站定制開發。民豐網站建設公司,為民豐等地區提供建站服務。全流程按需設計網站,專業設計,全程項目跟蹤,創新互聯專業和態度為您提供的服務
subscribeOn
Observable.subscribeOn()在方法內部生成了一個ObservableSubscribeOn對象.
主要看一下ObservableSubscribeOn的subscribeActual方法.
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); //調用下游的Observer的onSubscribe方法 observer.onSubscribe(parent); //通過SubscribeTask執行了上游Observable的subscribeActual方法 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
scheduler.scheduleDirect(Runnable)用于執行SubscribeTask這個任務.SubscribeTask本身是Runnable的實現類.看一下其run方法.
@Override public void run() { //上游的Observable.subscribe方法被切換到了新的線程 source.subscribe(parent); }
首先可以得出結論:subscribeOn將上游的Observable的subscribe方法切換到了新的線程.
如果多次調用subscribeOn切換線程,會有什么效果?
由下往上,每次調用subscribeOn,都會導致上游的Observable的subscribeActual切換到指定的線程.那么最后一次調用的切換最上游的創建型操作符的subscribeActual的執行線程.如果操作符有默認執行線程怎么辦?
操作符默認線程
如果是創建型操作符,處于最上游,那么subscribeOn的線程切換對它不起作用.天高皇帝遠,縣官不如現管.就是這個道理.
如果是其它操作符,會是怎樣的?
以操作符timeout為例:它對應ObservableTimeoutTimed和TimeoutObserver
@Override public void onNext(T t) { downstream.onNext(t); //超時計時 startTimeout(idx + 1); } void startTimeout(long nextIndex) { //交給操作符默認的線程執行 task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { downstream.onError(t); } @Override public void onComplete() { downstream.onComplete(); } } @Override public void onTimeout(long idx) { downstream.onError(new TimeoutException(timeoutMessage(timeout, unit))); }
//TimeoutTask.java static final class TimeoutTask implements Runnable { @Override public void run() { parent.onTimeout(idx); } }
可以看到操作符默認的執行線程只用來做超時計時任務,如果超時了,會在操作符的默認線程執行onError方法..操作符默認線程對下游的observer造成什么影響要做具體對待.
observeOn
observeOn對應ObservableObserveOn
和ObserveOnObserver
.
//ObservableObserveOn.java @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
//ObserveOnObserver.java @Override public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { if (d instanceof QueueDisposable) { if (m == QueueDisposable.SYNC) { //執行下游Observer的onSubscribe方法 downstream.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { //執行下游Observer的onSubscribe方法 downstream.onSubscribe(this); return; } } //執行下游Observer的onSubscribe方法 downstream.onSubscribe(this); } } @Override public void onNext(T t) { //省略 schedule(); } @Override public void onError(Throwable t) { //省略 schedule(); } void schedule() { if (getAndIncrement() == 0) { /* ObserveOnObserver是Runnable的實現類.交給線程池執行 */ worker.schedule(this); } } void drainNormal() { final Observer<? super T> a = downstream; for (;;) { for (;;) { T v; try { v = q.poll(); } catch (Throwable ex) { a.onError(ex); return; } //執行下游Observer的onNext方法 a.onNext(v); } } } void drainFused() { for (;;) { if (!delayError && d && ex != null) { //執行下游Observer的onError方法 downstream.onError(error); return; } downstream.onNext(null); if (d) { ex = error; if (ex != null) { //執行下游Observer的onError方法 downstream.onError(ex); } else { //執行下游Observer的onComplete方法 downstream.onComplete(); } return; } } } //執行線程任務 @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
從上面可以看出ObservableObserveOn在其subscribeActual方法中并沒有切換上游Observable的subscribe方法的執行線程.但是ObserveOnObserver在其onNext,onError和onComplete中通過schedule()方法將下游Observer的各個方法切換到了新的線程.
得出結論: observeOn負責切換的是下游Observer的各個方法的執行線程
如果下游多次通過observeOn切換線程,會有什么效果?
每次切換都會對其下游造成影響,直到遇到下一個observeOn為止.
Observer(onSubscribe,onNext,onError,onComplete)
onNext,onError,onComplete與上游最近的observeOn所切換的線程保持一致.onSubscribe則不同.
遇到線程切換的時候,會首先在對應的Observable的subscribeActual方法內,先調用observer.onSubscribe方法.而observer.onSubscribe會逐級向上傳遞直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法內調用,這是在主線程執行的.所以onSubscribe方法無論如何都是在主線程執行.
doOnSubscribe
.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { } })
我們要看的是方法accept的執行線程.
通過源碼找到對應的DisposableLambdaObserver.
@Override public void onSubscribe(Disposable d) { //在這里調用了accept方法. onSubscribe.accept(d); }
這就要看上游在哪個線程執行了Observer.onSubscribe(disposable)方法.
在創建型操作符的subscribeActual方法和subscribeOn對應的Observable的subscribeActual方法內調用了Observer.onSubscribe(disposable)方法.那么這兩處的執行線程就決定了onSubscribe.accept(d);的執行線程.
doFinally
對應ObservableDoFinally和DoFinallyObserver
//DoFinallyObserver.java @Override public void onError(Throwable t) { runFinally(); } @Override public void onComplete() { runFinally(); } @Override public void dispose() { runFinally(); } void runFinally() { onFinally.run(); }
可以看到與它所對應的DoFinallyObserver的onError,onComplete,dispose方法的執行線程有關,這三個方法的執行線程又受到上游的observeOn的影響.如果沒有observeOn,則會受到最上游的observable.subscribeActual方法影響.
doOnError
對應ObservableDoOnEach和DoOnEachObserver
//DoOnEachObserver.java @Override public void onError(Throwable t) { onError.accept(t); }
和自身對應的observer.onError所在線程保持一致.
doOnNext
對應ObservableDoOnEach和DoOnEachObserver
//DoOnEachObserver.java @Override public void onNext(T t) { onNext.accept(t); }
和自身對應的observer.onNext所在線程保持一致.
操作符對應方法參數的執行線程
包io.reactivex.functions下的接口類一般用于處理上游數據然后往下傳遞.這些接口類的方法一般在對應的observer.onNext中調用.所以他們的線程保持一致.
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注創新互聯行業資訊頻道,感謝您對創新互聯的支持。
網站題目:使用RxJava2怎么實現線程調度
標題鏈接:http://vcdvsql.cn/article42/gjoeec.html
成都網站建設公司_創新互聯,為您提供外貿建站、建站公司、品牌網站設計、品牌網站制作、手機網站建設、ChatGPT
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯