Android RxJava異步數(shù)據(jù)處理庫(kù)使用詳解
觀察者模式
四大要素:Observable(被觀察者),Observer (觀察者),subscribe (訂閱),事件。
觀察者訂閱被觀察者,一旦被觀察者發(fā)出事件,觀察者就可以接收到。
擴(kuò)展的觀察者模式
當(dāng)事件完成時(shí)會(huì)回調(diào)onComplete(),在完成過(guò)程中發(fā)生了異常會(huì)回調(diào)onError(),onError()和onComplete()只會(huì)回調(diào)一個(gè)。
引入依賴
implementation 'io.reactivex.rxjava3:rxjava:3.1.3'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
//創(chuàng)建被觀察者 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello Uncle Xing"); emitter.onComplete(); } }); //創(chuàng)建觀察者 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.i(tag, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.i(tag, "onNext:" + s); } @Override public void onError(@NonNull Throwable e) { Log.i(tag, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.i(tag, "onComplete"); } }; //訂閱事件 observable.subscribe(observer);
操作符
創(chuàng)建Observable
create:用于創(chuàng)建Observable
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello Uncle Xing"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.i(tag, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.i(tag, "onNext:" + s); } @Override public void onError(@NonNull Throwable e) { Log.i(tag, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.i(tag, "onComplete"); } });
just:創(chuàng)建一個(gè)Observable并自動(dòng)調(diào)用onNext發(fā)射數(shù)據(jù),just中傳遞的參數(shù)將直接在Observer的onNext方法中接收到
Observable.just("Uncle Xing").subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.i(tag, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.i(tag, "onNext:" + s); } @Override public void onError(@NonNull Throwable e) { Log.i(tag, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.i(tag, "onComplete"); } });
interval:創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable,可用作定時(shí)器。
Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Long aLong) { Log.i(tag, "count:" + aLong); //這里是非主線程,會(huì)隔1s打印出0,1,2,3.... } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
timer:創(chuàng)建一個(gè)Observable,它在一個(gè)特定延遲后發(fā)射一個(gè)值
Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.i(tag, "count:" + aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
轉(zhuǎn)換Observable
map:對(duì)數(shù)據(jù)進(jìn)行變換后,可以返回任意值,對(duì)數(shù)據(jù)的變換是1對(duì)1進(jìn)行的。
Observable.just(666).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Throwable { return integer.toString(); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { Log.i(tag, "map:" + s); } });
flatMap:對(duì)數(shù)據(jù)變換后,返回ObservableSource對(duì)象,可以對(duì)數(shù)據(jù)進(jìn)行一對(duì)多,多對(duì)多的變換。
Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Throwable { return Observable.just(integer.toString()); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { Log.i(tag, "accept:" + s); } });
buffer:把Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個(gè)值
Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Throwable { Log.i(tag, integers.toString()); } });
Log會(huì)分兩次打印,第一次打印 [1, 2, 3],第二次打印 [4, 5, 6]
過(guò)濾Observable
distinct:去掉重復(fù)數(shù)據(jù)
Observable.just(1, 2, 3, 4, 2, 3).distinct().subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log.i(tag, "distinct:" + integer); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
elementAt:取出指定位置的數(shù)據(jù)
Observable.just(1, 2, 3, 4).elementAt(1).subscribe(new MaybeObserver<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onSuccess(@NonNull Integer integer) { Log.i(tag, "onSuccess:" + integer); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
filter:對(duì)數(shù)據(jù)進(jìn)行指定規(guī)則的過(guò)濾
Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Throwable { return integer > 1; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.i(tag, "filter:" + integer); } });
組合Observable
zip:通過(guò)一個(gè)函數(shù)將多個(gè)Observable的發(fā)射物結(jié)合到一起,基于這個(gè)函數(shù)的結(jié)果為每個(gè)結(jié)合體發(fā)射單個(gè)數(shù)據(jù)項(xiàng)
Observable<Integer> observable = Observable.just(10, 20, 30, 40); Observable<Integer> observable2 = Observable.just(1, 2, 3); Observable.zip(observable, observable2, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Throwable { return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.i(tag, "zip:" + integer); } });
注意:當(dāng)其中一個(gè)Observable發(fā)送數(shù)據(jù)結(jié)束或異常,另外一個(gè)也停止發(fā)送,所以這里只會(huì)打印出11,22,33
merge:合并多個(gè)Observable的發(fā)射物
Observable<Integer> observable = Observable.just(10, 20, 30, 40); Observable<Integer> observable2 = Observable.just(1, 2, 3); Observable.merge(observable, observable2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.i(tag, "merge:" + integer);//會(huì)打印出10,20,30,1,2,3 } });
錯(cuò)誤處理
- onErrorReturn:讓Observable遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特殊的項(xiàng)并且正常終止
- onErrorResumeNext:讓Observable在遇到錯(cuò)誤時(shí)開(kāi)始發(fā)射第二個(gè)Observable的數(shù)據(jù)序列
Schedulers調(diào)度器-解決多線程問(wèn)題
- io():用于I/O操作;
- computation():計(jì)算工作默認(rèn)的調(diào)度器;
- immediate():立即執(zhí)行,允許立即在當(dāng)前線程執(zhí)行你指定的工作;
- newThread():創(chuàng)建新線程;
- trampoline():順序處理,按需處理隊(duì)列,并運(yùn)行隊(duì)列的每一個(gè)任務(wù)。
AndroidSchedulers:RxAndroid提供在Android平臺(tái)的調(diào)度器,指定觀察者在主線程。
SubscribeOn用于每個(gè)Observable對(duì)象,ObserveOn用于每個(gè)Observer對(duì)象
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { emitter.onNext(100); emitter.onComplete(); Log.i(tag, "subscribe thread:" + Thread.currentThread().getName());//打印subscribe thread:RxNewThreadScheduler-1 } }).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log.i(tag, "onNext thread:" + Thread.currentThread().getName());//打印onNext thread:main } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
管理RxJava的生命周期
在使用RxJava的時(shí)候,如果沒(méi)有及時(shí)解除訂閱,在退出Activity的時(shí)候,異步線程還在執(zhí)行,對(duì)Activity的引用還在,此時(shí)就會(huì)產(chǎn)生內(nèi)存泄露問(wèn)題。
可使用RxLifecycle,傳送門
引入依賴
implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
讓你的Activity繼承RxAppCompatActivity,F(xiàn)ragment繼承RxFragment,其余類似,然后使用bindUntilEvent或者bindToLifecycle
Observable.interval(1000, TimeUnit.MILLISECONDS) .compose(bindUntilEvent(ActivityEvent.DESTROY)) //當(dāng)前Activity執(zhí)行到onDestroy時(shí),Observable取消訂閱 .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Throwable { Log.i(tag, "accept:" + aLong); } });
Observable.interval(1000, TimeUnit.MILLISECONDS) .compose(bindToLifecycle()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Throwable { Log.i(tag, "accept:" + aLong); } });
使用bindToLifecycle:
如果Observable在onCreate執(zhí)行,那么當(dāng)執(zhí)行到onDestroy時(shí)取消訂閱。
如果Observable在onStart執(zhí)行,那么當(dāng)執(zhí)行到onStop時(shí)取消訂閱。
如果Observable在onResume執(zhí)行,那么當(dāng)執(zhí)行到onPause時(shí)取消訂閱。
RxJava與Retrofit完成網(wǎng)絡(luò)請(qǐng)求
public interface MyService { @GET("gallery/{imageType}/response") Observable<List<String>> getImages(@Path("imageType") String imageType); }
Retrofit retrofit = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .baseUrl(BASE_URL) .build(); MyService service = retrofit.create(MyService.class); service.getImages("banner") .compose(bindToLifecycle()) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) throws Throwable { //todo } });
到此這篇關(guān)于Android RxJava異步數(shù)據(jù)處理庫(kù)使用詳解的文章就介紹到這了,更多相關(guān)Android RxJava異步數(shù)據(jù)處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Kotlin + Retrofit + RxJava簡(jiǎn)單封裝使用詳解
- Android使用Kotlin和RxJava 2.×實(shí)現(xiàn)短信驗(yàn)證碼倒計(jì)時(shí)效果
- Kotlin結(jié)合Rxjava+Retrofit實(shí)現(xiàn)極簡(jiǎn)網(wǎng)絡(luò)請(qǐng)求的方法
- RxJava中map和flatMap的用法區(qū)別源碼解析
- Rxjava+Retrofit+Okhttp進(jìn)行網(wǎng)絡(luò)訪問(wèn)及數(shù)據(jù)解析
- Kotlin下Rxjava的基礎(chǔ)用法及流式調(diào)用示例詳解
相關(guān)文章
Android實(shí)現(xiàn)抽獎(jiǎng)轉(zhuǎn)盤實(shí)例代碼
這篇文章主要介紹了Android實(shí)現(xiàn)抽獎(jiǎng)轉(zhuǎn)盤實(shí)例代碼,可以應(yīng)用于Android游戲開(kāi)發(fā)中的一個(gè)應(yīng)用,需要的朋友可以參考下2014-07-07ViewPager滑動(dòng)靈敏度調(diào)整的方法實(shí)力
這篇文章主要介紹了ViewPager滑動(dòng)靈敏度調(diào)整的方法實(shí)力,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-10-10Android開(kāi)發(fā)之利用Activity實(shí)現(xiàn)Dialog對(duì)話框
這篇文章主要給大家介紹了Android開(kāi)發(fā)之如何利用Activity實(shí)現(xiàn)Dialog對(duì)話框效果,文中給出了詳細(xì)的示例代碼,相信對(duì)大家的理解及學(xué)習(xí)具有一定的參考借鑒價(jià)值,有需要的朋友們下面來(lái)一起看看吧。2016-12-12Android自定義processor實(shí)現(xiàn)bindView功能的實(shí)例
下面小編就為大家分享一篇Android自定義processor實(shí)現(xiàn)bindView功能的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2017-12-12Android開(kāi)發(fā)ListView中下拉刷新上拉加載及帶列的橫向滾動(dòng)實(shí)現(xiàn)方法
這篇文章主要介紹了Android開(kāi)發(fā)ListView中下拉刷新上拉加載及帶列的橫向滾動(dòng)實(shí)現(xiàn)方法的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-07-07Android 嵌套Fragment的使用實(shí)例代碼
本文主要介紹Android Fragment,在這里提供了實(shí)例代碼跟效果圖,希望能幫助有需要的小伙伴2016-07-07Android實(shí)現(xiàn)圖片選擇上傳功能實(shí)例
這篇文章主要介紹了Android實(shí)現(xiàn)圖片選擇以及圖片上傳的功能,有需要的朋友跟著學(xué)習(xí)下吧。2017-12-12kotlin 官方學(xué)習(xí)教程之基礎(chǔ)語(yǔ)法詳解
這篇文章主要介紹了kotlin 官方學(xué)習(xí)教程之基礎(chǔ)語(yǔ)法詳解的相關(guān)資料,需要的朋友可以參考下2017-05-05老生常談Listview中onItemClick中的各個(gè)參數(shù)(推薦)
下面小編就為大家?guī)?lái)一篇老生常談Listview中onItemClick中的各個(gè)參數(shù)(推薦)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-04-04