亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Android RxJava異步數(shù)據(jù)處理庫(kù)使用詳解

 更新時(shí)間:2022年11月02日 08:31:29   作者:幸大叔  
RxJava是一種異步數(shù)據(jù)處理庫(kù),也是一種擴(kuò)展的觀察者模式。對(duì)于Android開(kāi)發(fā)者來(lái)說(shuō),使用RxJava時(shí)也會(huì)搭配RxAndroid,它是RxJava針對(duì)Android平臺(tái)的一個(gè)擴(kuò)展,用于Android 開(kāi)發(fā),它提供了響應(yīng)式擴(kuò)展組件,使用RxAndroid的調(diào)度器可以解決Android多線程問(wèn)題

觀察者模式

四大要素:Observable(被觀察者),Observer (觀察者),subscribe (訂閱),事件。

觀察者訂閱被觀察者,一旦被觀察者發(fā)出事件,觀察者就可以接收到。

擴(kuò)展的觀察者模式

當(dāng)事件完成時(shí)會(huì)回調(diào)onComplete(),在完成過(guò)程中發(fā)生了異常會(huì)回調(diào)onError(),onError()和onComplete()只會(huì)回調(diào)一個(gè)。

引入依賴

    implementation 'io.reactivex.rxjava3:rxjava:3.1.3'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

        //創(chuàng)建被觀察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("Hello Uncle Xing");
                emitter.onComplete();
            }
        });
        //創(chuàng)建觀察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        };
        //訂閱事件
        observable.subscribe(observer);

操作符

創(chuàng)建Observable

create:用于創(chuàng)建Observable

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("Hello Uncle Xing");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        });

just:創(chuàng)建一個(gè)Observable并自動(dòng)調(diào)用onNext發(fā)射數(shù)據(jù),just中傳遞的參數(shù)將直接在Observer的onNext方法中接收到

        Observable.just("Uncle Xing").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        });

interval:創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable,可用作定時(shí)器。

        Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull Long aLong) {
                Log.i(tag, "count:" + aLong); //這里是非主線程,會(huì)隔1s打印出0,1,2,3....
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

timer:創(chuàng)建一個(gè)Observable,它在一個(gè)特定延遲后發(fā)射一個(gè)值

        Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Long aLong) {
                Log.i(tag, "count:" + aLong);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

轉(zhuǎn)換Observable

map:對(duì)數(shù)據(jù)進(jìn)行變換后,可以返回任意值,對(duì)數(shù)據(jù)的變換是1對(duì)1進(jìn)行的。

        Observable.just(666).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Throwable {
                return integer.toString();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.i(tag, "map:" + s);
            }
        });

flatMap:對(duì)數(shù)據(jù)變換后,返回ObservableSource對(duì)象,可以對(duì)數(shù)據(jù)進(jìn)行一對(duì)多,多對(duì)多的變換。

        Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Throwable {
                return Observable.just(integer.toString());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.i(tag, "accept:" + s);
            }
        });

buffer:把Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個(gè)值

        Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Throwable {
                Log.i(tag, integers.toString());
            }
        });

Log會(huì)分兩次打印,第一次打印 [1, 2, 3],第二次打印 [4, 5, 6]

過(guò)濾Observable

distinct:去掉重復(fù)數(shù)據(jù)

        Observable.just(1, 2, 3, 4, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.i(tag, "distinct:" + integer);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

elementAt:取出指定位置的數(shù)據(jù)

        Observable.just(1, 2, 3, 4).elementAt(1).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onSuccess(@NonNull Integer integer) {
                Log.i(tag, "onSuccess:" + integer);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

filter:對(duì)數(shù)據(jù)進(jìn)行指定規(guī)則的過(guò)濾

        Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                return integer > 1;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "filter:" + integer);
            }
        });

組合Observable

zip:通過(guò)一個(gè)函數(shù)將多個(gè)Observable的發(fā)射物結(jié)合到一起,基于這個(gè)函數(shù)的結(jié)果為每個(gè)結(jié)合體發(fā)射單個(gè)數(shù)據(jù)項(xiàng)

        Observable<Integer> observable = Observable.just(10, 20, 30, 40);
        Observable<Integer> observable2 = Observable.just(1, 2, 3);
        Observable.zip(observable, observable2, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "zip:" + integer);
            }
        });

注意:當(dāng)其中一個(gè)Observable發(fā)送數(shù)據(jù)結(jié)束或異常,另外一個(gè)也停止發(fā)送,所以這里只會(huì)打印出11,22,33

merge:合并多個(gè)Observable的發(fā)射物

        Observable<Integer> observable = Observable.just(10, 20, 30, 40);
        Observable<Integer> observable2 = Observable.just(1, 2, 3);
        Observable.merge(observable, observable2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "merge:" + integer);//會(huì)打印出10,20,30,1,2,3
            }
        });

錯(cuò)誤處理

  • onErrorReturn:讓Observable遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特殊的項(xiàng)并且正常終止
  • onErrorResumeNext:讓Observable在遇到錯(cuò)誤時(shí)開(kāi)始發(fā)射第二個(gè)Observable的數(shù)據(jù)序列

Schedulers調(diào)度器-解決多線程問(wèn)題

  1. io():用于I/O操作;
  2. computation():計(jì)算工作默認(rèn)的調(diào)度器;
  3. immediate():立即執(zhí)行,允許立即在當(dāng)前線程執(zhí)行你指定的工作;
  4. newThread():創(chuàng)建新線程;
  5. trampoline():順序處理,按需處理隊(duì)列,并運(yùn)行隊(duì)列的每一個(gè)任務(wù)。

AndroidSchedulers:RxAndroid提供在Android平臺(tái)的調(diào)度器,指定觀察者在主線程。

SubscribeOn用于每個(gè)Observable對(duì)象,ObserveOn用于每個(gè)Observer對(duì)象

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(100);
                emitter.onComplete();
                Log.i(tag, "subscribe thread:" + Thread.currentThread().getName());//打印subscribe thread:RxNewThreadScheduler-1
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    }
                    @Override
                    public void onNext(@NonNull Integer integer) {
                        Log.i(tag, "onNext thread:" + Thread.currentThread().getName());//打印onNext thread:main
                    }
                    @Override
                    public void onError(@NonNull Throwable e) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });

管理RxJava的生命周期

在使用RxJava的時(shí)候,如果沒(méi)有及時(shí)解除訂閱,在退出Activity的時(shí)候,異步線程還在執(zhí)行,對(duì)Activity的引用還在,此時(shí)就會(huì)產(chǎn)生內(nèi)存泄露問(wèn)題。

可使用RxLifecycle,傳送門

引入依賴

    implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
    implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'

讓你的Activity繼承RxAppCompatActivity,F(xiàn)ragment繼承RxFragment,其余類似,然后使用bindUntilEvent或者bindToLifecycle

        Observable.interval(1000, TimeUnit.MILLISECONDS)
                .compose(bindUntilEvent(ActivityEvent.DESTROY)) //當(dāng)前Activity執(zhí)行到onDestroy時(shí),Observable取消訂閱
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Throwable {
                        Log.i(tag, "accept:" + aLong);
                    }
                });
        Observable.interval(1000, TimeUnit.MILLISECONDS)
                .compose(bindToLifecycle())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Throwable {
                        Log.i(tag, "accept:" + aLong);
                    }
                });

使用bindToLifecycle:

如果Observable在onCreate執(zhí)行,那么當(dāng)執(zhí)行到onDestroy時(shí)取消訂閱。

如果Observable在onStart執(zhí)行,那么當(dāng)執(zhí)行到onStop時(shí)取消訂閱。

如果Observable在onResume執(zhí)行,那么當(dāng)執(zhí)行到onPause時(shí)取消訂閱。

RxJava與Retrofit完成網(wǎng)絡(luò)請(qǐng)求

public interface MyService {
    @GET("gallery/{imageType}/response")
    Observable<List<String>> getImages(@Path("imageType") String imageType);
}
        Retrofit retrofit = new Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(BASE_URL)
                .build();
        MyService service = retrofit.create(MyService.class);
        service.getImages("banner")
                .compose(bindToLifecycle())
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> strings) throws Throwable {
                        //todo
                    }
                });

到此這篇關(guān)于Android RxJava異步數(shù)據(jù)處理庫(kù)使用詳解的文章就介紹到這了,更多相關(guān)Android RxJava異步數(shù)據(jù)處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論