Kotlin下Rxjava的基礎用法及流式調用示例詳解
前言
萬事開頭難,寫文章也是,現(xiàn)在越來越不知道開頭怎么寫了,所以在前言中,簡單介紹下RxJava吧,第一次聽說還是以前做Android開發(fā)的時候,那時候好多庫中都使用了Rxjava,而在網(wǎng)絡請求中,也有很多都是使用Rxjava去寫,但自己卻沒怎么在項目中寫過,而在搜索資料中發(fā)現(xiàn),微信中搜rxjava時,最多介紹他的還是Android開發(fā)者,所以今天來記錄下。
而所謂的響應式編程,就是一種用于應用程序異步編程的技術,他是一個通用的思想,類似與AOP,不只是在java中才有。他專注于對數(shù)據(jù)的變化做出反應,例如,有一個數(shù)據(jù)源(這里被稱為生產(chǎn)者),一個數(shù)據(jù)目標(這里被成為消費者),然后在將消費者連接到訂閱者之后,響應式編程框架負責將生產(chǎn)者生產(chǎn)的數(shù)據(jù)推送給消費者,一個可觀察對象可以有任意數(shù)量的訂閱者。
而對于一些思想上的框架,類似于Spring,源碼上大體還是比較難的,畢竟就算是人,在思想上跨越也是有難度的,但對于RxJava來說,源碼也不是很多,所以在以后會嘗試介紹他的源碼實現(xiàn),而使用Rxjava的好處不是在于實現(xiàn)了什么具體的技術功能,比如使用CGLIB可以實現(xiàn)動態(tài)代理的技術,使用JDBC可以進行數(shù)據(jù)查詢,而沒有rxjava,我們的代碼還可以借助Java8的Stream、CompletableFuture來實現(xiàn)。
而rxjava的好處在于讓代碼更簡潔、優(yōu)雅,通過他的鏈式調用,消除嵌套等。
在下面的例子中,我們會使用Kotlin來做示范。
基礎用法
在這里,Observable 字面意思是可觀察者,他表示數(shù)據(jù)源,通常,一旦訂閱者開始收聽,他們就會開始提供數(shù)據(jù),而just表示僅僅,僅僅生產(chǎn)的數(shù)據(jù)是一個"T",即泛型類型,在這里是String。
而subscribe表示訂閱,當訂閱后,他會收到Observable生產(chǎn)的數(shù)據(jù),來消費。
fun main() { Observable.just("hello rxjava").subscribe { println(it) } } 輸出: hello rxjava
fromXXX
而上面說到,just表示僅僅,在rxjava中,不僅僅是具體的數(shù)據(jù),還可以是Callable、Array、Future對象等,詳細可以看fromXXX等方法,最終的結果由rxjava調用后如Callable的結果后,傳遞給訂閱者。
fun main() { Observable.fromCallable { println("callable") "hello rxjava" }.subscribe { println(it) } }
create
這個方法給我了我們手動執(zhí)行的能力,即傳遞數(shù)據(jù)到訂閱者是我們手動執(zhí)行的。
fun main() { Observable.create<String> { it.onNext("hello") it.onError(IllegalArgumentException("錯誤")) it.onComplete() }.subscribe ({ println(it) },{ println(it.message) },{ println("完成") }) }
interval & timer
還可以通過interval實現(xiàn)固定間隔定時。
fun main() { val observable = Observable.interval(1, TimeUnit.SECONDS) observable.subscribe { println(it) } observable.subscribe { println(it) Thread.sleep(2000) } Thread.sleep(100000); }
而timer方法則是延遲N時間后,發(fā)送數(shù)據(jù)到訂閱者.
fun main() { val observable = Observable.timer(2, TimeUnit.SECONDS) observable.subscribe { println(it) } observable.subscribe { println(it) Thread.sleep(2000) } Thread.sleep(100000); }
指定線程
而使用上面方法有一個好處,即生產(chǎn)者可以在子線程中完成,而實際消費的時候在主線程,這在Android可謂是一種福利,如下。
fun main() { val threadPool = Executors.newCachedThreadPool() val anyFuture = threadPool.submit(Callable { Thread.sleep(2000) "hello" }) Observable.fromFuture(anyFuture).subscribe { println(it) } }
而如果擔心等待時間問題,可是使用第二個重載方法,指定一個超時時間,而subscribe還有兩個主要參數(shù)我們沒說,一個是error發(fā)生錯誤時回調,一個是complete完成時回調,但在發(fā)生錯誤后,complete是不會回調的。
fun main() { val threadPool = Executors.newCachedThreadPool() val anyFuture = threadPool.submit(Callable { Thread.sleep(2000) "hello" }) Observable.fromFuture(anyFuture,1,TimeUnit.SECONDS).subscribe({ println(it) },{ println("錯誤") },{ println("完成") }) }
observeOn & subscribeOn
但你以為這就結束了嗎,不,rxjava提供了豐富的線程切換,observeOn & subscribeOn這兩個方法就是用來指定在哪里運行,Schedulers.newThread()
表示在新線程,但rxjava實現(xiàn)的線程中,是守護線程,也就是當主線程退出后,他們也會自動退出,而在下面的例子中,如果在最后不加sleep,會導致主線程退出后,rxjava的所有線程在可能沒執(zhí)行完成后也將退出。
fun main() { Observable.create<String> { println(Thread.currentThread().isDaemon) it.onNext("hello") } .observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.newThread()) .subscribe { println(Thread.currentThread().name) println(it) } Thread.sleep(10000) }
而如果想自定義線程,也是支持的。
fun createSchedulers(): Scheduler { return Schedulers.from { thread { it.run() } } } fun main() { Observable.create<String> { it.onNext("hello") } .observeOn(createSchedulers()) .subscribeOn(Schedulers.newThread()) .subscribe { println(Thread.currentThread().name) println(it) } }
Flowable
Flowable可以看成Observable新的實現(xiàn),他支持背壓,而他的API和Observable相似,在最后會介紹背壓。
流式調用
我們已經(jīng)熟悉了Java Stream的好處,所以在這里簡單看下rxjava的實現(xiàn),用法都一樣,如下,創(chuàng)建集合"a","b","c","d"
。
- map將所有item前添加字符"1"。
- filter將b結尾的數(shù)據(jù)過濾掉。
- skip忽略前n個數(shù)據(jù)。
fun main() { Flowable.fromIterable(mutableListOf("a","b","c","d")) .map { "1${it}" } .filter { !it.endsWith("b") } .skip(1) .subscribe { println(it) } }
所以最后收到的消息將是 1c、1d
。
當然他提供的這類API非常之多,就不介紹了。
背壓
背壓指的是遇到被觀察者發(fā)送的消息太快,至于它的訂閱者不能及時處理數(shù)據(jù),而我們可以提供一種告訴被觀察者遇到這種情況的策略。
這種場景有個前提條件,被觀察者和訂閱者在不同線程。
背壓策略被定義在BackpressureStrategy,有五種。
MISSING
通過create方法創(chuàng)建的Flowable沒有指定背壓策略,不會對通過OnNext發(fā)送的數(shù)據(jù)做緩存或丟棄,需要下游通過背壓操作符制定策略。
ERROR
如果緩存池數(shù)據(jù)超限,則拋出異常。
BUFFER
可以無限制添加數(shù)據(jù)。
DROP
如果緩存池滿了,則丟棄。
LATEST
僅保留最新的onNext值,如果下游無法跟上,則覆蓋之前的值。
如下,我們使用BUFFER策略,默認的緩存池大小是128,可以通過System.setProperty("rx3.buffer-size","5")
指定,而這個策略會導致只有緩存池不滿的情況下,才會生產(chǎn)數(shù)據(jù)并發(fā)送給訂閱者。
fun main() { System.setProperty("rx3.buffer-size","5") Observable.interval(1,TimeUnit.MILLISECONDS) .toFlowable(BackpressureStrategy.BUFFER) .map { User(1) } .observeOn(Schedulers.newThread()) .subscribe { Thread.sleep(1000) println("hander $it") } Thread.sleep(100000) }
而如果我們改成DROP,那么最終只有5條數(shù)據(jù)被消費,其他全部丟棄。
fun main() { System.setProperty("rx3.buffer-size","5") Observable.range(1,999) .toFlowable(BackpressureStrategy.DROP) .map { User(1) } .observeOn(Schedulers.newThread()) .subscribe { Thread.sleep(1000) println("hander $it") } Thread.sleep(100000) }
其他就不做demo了。
以上就是Kotlin下Rxjava的基礎用法及流式調用示例詳解的詳細內容,更多關于Kotlin Rxjava的資料請關注腳本之家其它相關文章!
相關文章
Android 高德地圖之poi搜索功能的實現(xiàn)代碼
這篇文章主要介紹了android 高德地圖之poi搜索功能的實現(xiàn)代碼,在實現(xiàn)此功能時遇到很多問題,在文章都給大家提到,需要的朋友可以參考下2017-08-08EditText限制輸入數(shù)字,精確到小數(shù)點后1位的設置方法
下面小編就為大家?guī)硪黄狤ditText限制輸入數(shù)字,精確到小數(shù)點后1位的設置方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-04-04