Kotlin協(xié)程低級(jí)api startCoroutine與ContinuationInterceptor
聊一聊kotlin協(xié)程“低級(jí)”api
Kotlin協(xié)程已經(jīng)出來(lái)很久了,相信大家都有不同程度的用上了,由于最近處理的需求有遇到協(xié)程相關(guān),因此今天來(lái)聊一Kotlin協(xié)程的“低級(jí)”api,首先低級(jí)api并不是它真的很“低級(jí)”,而是kotlin協(xié)程庫(kù)中的基礎(chǔ)api,我們一般開(kāi)發(fā)用的,其實(shí)都是通過(guò)低級(jí)api進(jìn)行封裝的高級(jí)函數(shù),本章會(huì)通過(guò)低級(jí)api的組合,實(shí)現(xiàn)一個(gè)自定義的async await 函數(shù)(下文也會(huì)介紹kotlin 高級(jí)api的async await),涉及的低級(jí)api有startCoroutine ,ContinuationInterceptor 等
startCoroutine
我們知道,一個(gè)suspend關(guān)鍵字修飾的函數(shù),只能在協(xié)程體中執(zhí)行,伴隨著suspend 關(guān)鍵字,kotlin coroutine common庫(kù)(平臺(tái)無(wú)關(guān))也提供出來(lái)一個(gè)api,用于直接通過(guò)suspend 修飾的函數(shù)直接啟動(dòng)一個(gè)協(xié)程,它就是startCoroutine
@SinceKotlin("1.3") @Suppress("UNCHECKED_CAST") public fun <R, T> (suspend R.() -> T).startCoroutine( 作為Receiver receiver: R, 當(dāng)前協(xié)程結(jié)束時(shí)的回調(diào) completion: Continuation<T> ) { createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit) }
可以看到,它的Receiver是(suspend R.() -> T),即是一個(gè)suspend修飾的函數(shù),那么這個(gè)有什么作用呢?我們知道,在普通函數(shù)中無(wú)法調(diào)起suspend函數(shù)(因?yàn)槠胀ê瘮?shù)沒(méi)有隱含的Continuation對(duì)象,這里我們不在這章講,可以參考kotlin協(xié)程的資料)
但是普通函數(shù)是可以調(diào)起一個(gè)以suspend函數(shù)作為Receiver的函數(shù)(本質(zhì)也是一個(gè)普通函數(shù))
其中startCoroutine就是其中一個(gè),本質(zhì)就是我們直接從外部提供了一個(gè)Continuation,同時(shí)調(diào)用了resume方法,去進(jìn)入到了協(xié)程的世界
startCoroutine實(shí)現(xiàn) createCoroutineUnintercepted(completion).intercepted().resume(Unit)
這個(gè)原理我們就不細(xì)講下去原理,之前也有寫過(guò)相關(guān)的文章。通過(guò)這種調(diào)用,我們其實(shí)就可以實(shí)現(xiàn)在普通的函數(shù)環(huán)境,開(kāi)啟一個(gè)協(xié)程環(huán)境(即帶有了Continuation),進(jìn)而調(diào)用其他的suspend函數(shù)。
ContinuationInterceptor
我們都知道攔截器的概念,那么kotlin協(xié)程也有,就是ContinuationInterceptor,它提供以AOP的方式,讓外部在resume(協(xié)程恢復(fù))前后進(jìn)行自定義的攔截操作,比如高級(jí)api中的Diapatcher就是。當(dāng)然什么是resume協(xié)程恢復(fù)呢,可能讀者有點(diǎn)懵,我們還是以上圖中出現(xiàn)的mySuspendFunc舉例子
mySuspendFunc是一個(gè)suspned函數(shù) ::mySuspendFunc.startCoroutine(object : Continuation<Unit> { override val context: CoroutineContext get() = EmptyCoroutineContext override fun resumeWith(result: Result<Unit>) { } })
它其實(shí)等價(jià)于
val continuation = ::mySuspendFunc.createCoroutine(object :Continuation<Unit>{ override val context: CoroutineContext get() = EmptyCoroutineContext override fun resumeWith(result: Result<Unit>) { Log.e("hello","當(dāng)前協(xié)程執(zhí)行完成的回調(diào)") } }) continuation.resume(Unit)
startCoroutine方法就相當(dāng)于創(chuàng)建了一個(gè)Continuation對(duì)象,并調(diào)用了resume。創(chuàng)建Continuation可通過(guò)createCoroutine方法,返回一個(gè)Continuation,如果我們不調(diào)用resume方法,那么它其實(shí)什么也不會(huì)執(zhí)行,只有調(diào)用了resume等執(zhí)行方法之后,才會(huì)執(zhí)行到后續(xù)的協(xié)程體(這個(gè)也是協(xié)程內(nèi)部實(shí)現(xiàn),感興趣可以看看之前文章)
而我們的攔截器,就相當(dāng)于在continuation.resume前后,可以添加自己的邏輯。我們可以通過(guò)繼承ContinuationInterceptor,實(shí)現(xiàn)自己的攔截器邏輯,其中需要復(fù)寫的方法是interceptContinuation方法,用于返回一個(gè)自己定義的Continuation對(duì)象,而我們可以在這個(gè)Continuation的resumeWith方法里面(當(dāng)調(diào)用了resume之后,會(huì)執(zhí)行到resumeWith方法),進(jìn)行前后打印/其他自定義操作(比如切換線程)
class ClassInterceptor() :ContinuationInterceptor { override val key = ContinuationInterceptor override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation) } class MyContinuation<T>(private val continuation: Continuation<T>):Continuation<T> by continuation{ override fun resumeWith(result: Result<T>) { Log.e("hello","MyContinuation start ${result.getOrThrow()}") continuation.resumeWith(result) Log.e("hello","MyContinuation end ") } }
其中的key是ContinuationInterceptor,協(xié)程內(nèi)部會(huì)在每次協(xié)程恢復(fù)的時(shí)候,通過(guò)coroutineContext取出key為ContinuationInterceptor的攔截器,進(jìn)行攔截調(diào)用,當(dāng)然這也是kotlin協(xié)程內(nèi)部實(shí)現(xiàn),這里簡(jiǎn)單提一下。
實(shí)戰(zhàn)
kotlin協(xié)程api中的 async await
我們來(lái)看一下kotlin Coroutine 的高級(jí)api async await用法
CoroutineScope(Dispatchers.Main).launch { val block = async(Dispatchers.IO) { // 阻塞的事項(xiàng) } // 處理其他主線程的事務(wù) // 此時(shí)必須需要async的結(jié)果時(shí),則可通過(guò)await()進(jìn)行獲取 val result = block.await() }
我們可以通過(guò)async方法,在其他線程中處理其他阻塞事務(wù),當(dāng)主線程必須要用async的結(jié)果的時(shí)候,就可以通過(guò)await等待,這里如果結(jié)果返回了,則直接獲取值,否則就等待async執(zhí)行完成。這是Coroutine提供給我們的高級(jí)api,能夠?qū)⑷蝿?wù)簡(jiǎn)單分層而不需要過(guò)多的回調(diào)處理。
通過(guò)startCoroutine與ContinuationInterceptor實(shí)現(xiàn)自定義的 async await
我們可以參考其他語(yǔ)言的async,或者Dart的異步方法調(diào)用,都有類似這種方式進(jìn)行線程調(diào)用
async { val result = await { suspend 函數(shù) } 消費(fèi)result }
await在async作用域里面,同時(shí)獲取到result后再進(jìn)行消費(fèi),async可以直接在普通函數(shù)調(diào)用,而不需要在協(xié)程體內(nèi),下面我們來(lái)實(shí)現(xiàn)一下這個(gè)做法。
首先我們想要限定await函數(shù)只能在async的作用域才能使用,那么首先我們就要定義出來(lái)一個(gè)Receiver,我們可以在Receiver里面定義出自己想要暴露的方法
interface AsyncScope { fun myFunc(){ } } fun async( context: CoroutineContext = EmptyCoroutineContext, block: suspend AsyncScope.() -> Unit ) { // 這個(gè)有兩個(gè)作用 1.充當(dāng)receiver 2.completion,接收回調(diào) val completion = AsyncStub(context) block.startCoroutine(completion, completion) } 注意這個(gè)類,resumeWith 只會(huì)跟startCoroutine的這個(gè)協(xié)程綁定關(guān)系,跟await的協(xié)程沒(méi)有關(guān)系 class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation<Unit>, AsyncScope { override fun resumeWith(result: Result<Unit>) { // 這個(gè)是干嘛的 == > 完成的回調(diào) Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}") } }
上面我們定義出來(lái)一個(gè)async函數(shù),同時(shí)定義出來(lái)了一個(gè)AsyncStub的類,它有兩個(gè)用處,第一個(gè)是為了充當(dāng)Receiver,用于規(guī)范后續(xù)的await函數(shù)只能在這個(gè)Receiver作用域中調(diào)用,第二個(gè)作用是startCoroutine函數(shù)必須要傳入一個(gè)參數(shù)completion,是為了收到當(dāng)前協(xié)程結(jié)束的回調(diào)resumeWith中可以得到當(dāng)前協(xié)程體結(jié)束回調(diào)的信息
await方法里面 suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> { // 自定義的Receiver函數(shù) myFunc() Thread{ 切換線程執(zhí)行await中的方法 it.resumeWith(Result.success(block())) }.start() }
在await中,其實(shí)是一個(gè)擴(kuò)展函數(shù),我們可以調(diào)用任何在AsyncScope中定義的方法,同時(shí)這里我們模擬了一下線程切換的操作(Dispatcher的實(shí)現(xiàn),這里不采用Dispatcher就是想讓大家知道其實(shí)Dispatcher.IO也是這樣實(shí)現(xiàn)的),在子線程中調(diào)用it.resumeWith(Result.success(block())),用于返回所需要的信息
通過(guò)上面定的方法,我們可以實(shí)現(xiàn)
async { val result = await { suspend 函數(shù) } 消費(fèi)result }
public interface ContinuationInterceptor : CoroutineContext.Element //而CoroutineContext.Element又是繼承于CoroutineContext CoroutineContext.Element:CoroutineContext
而我們的攔截器,正是CoroutineContext的子類,我們把上文的ClassInterceptor修改一下
class ClassInterceptor() : ContinuationInterceptor { override val key = ContinuationInterceptor override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = MyContinuation(continuation) } class MyContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation { private val handler = Handler(Looper.getMainLooper()) override fun resumeWith(result: Result<T>) { Log.e("hello", "MyContinuation start ${result.getOrThrow()}") handler.post { continuation.resumeWith(Result.success(自定義內(nèi)容)) } Log.e("hello", "MyContinuation end ") } }
同時(shí)把a(bǔ)sync默認(rèn)參數(shù)CoroutineContext實(shí)現(xiàn)一下即可
fun async( context: CoroutineContext = ClassInterceptor(), block: suspend AsyncScope.() -> Unit ) { // 這個(gè)有兩個(gè)作用 1.充當(dāng)receiver 2.completion,接收回調(diào) val completion = AsyncStub(context) block.startCoroutine(completion, completion) }
此后我們就可以直接通過(guò),完美實(shí)現(xiàn)了一個(gè)類js協(xié)程的調(diào)用,同時(shí)具備了自動(dòng)切換線程的能力
async { val result = await { test() } Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}") }
結(jié)果
E start
E MyContinuation start kotlin.Unit
E MyContinuation end
E end
E 執(zhí)行阻塞函數(shù) test 1923
E MyContinuation start 自定義內(nèi)容數(shù)值
E MyContinuation end
E result is 自定義內(nèi)容的數(shù)值 true
E AsyncStub resumeWith 2 kotlin.Unit
最后,這里需要注意的是,為什么攔截器回調(diào)了兩次,因?yàn)槲覀僡sync的時(shí)候開(kāi)啟了一個(gè)協(xié)程,同時(shí)await的時(shí)候也開(kāi)啟了一個(gè),因此是兩個(gè)。AsyncStub只回調(diào)了一次,是因?yàn)锳syncStub被當(dāng)作complete參數(shù)傳入了async開(kāi)啟的協(xié)程block.startCoroutine,因此只是async中的協(xié)程結(jié)束才會(huì)被回調(diào)。
本章代碼
class ClassInterceptor() : ContinuationInterceptor { override val key = ContinuationInterceptor override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = MyContinuation(continuation) } class MyContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation { private val handler = Handler(Looper.getMainLooper()) override fun resumeWith(result: Result<T>) { Log.e("hello", "MyContinuation start ${result.getOrThrow()}") handler.post { continuation.resumeWith(Result.success(6 as T)) } Log.e("hello", "MyContinuation end ") } }
interface AsyncScope { fun myFunc(){ } } fun async( context: CoroutineContext = ClassInterceptor(), block: suspend AsyncScope.() -> Unit ) { // 這個(gè)有兩個(gè)作用 1.充當(dāng)receiver 2.completion,接收回調(diào) val completion = AsyncStub(context) block.startCoroutine(completion, completion) } class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation<Unit>, AsyncScope { override fun resumeWith(result: Result<Unit>) { // 這個(gè)是干嘛的 == > 完成的回調(diào) Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}") } } suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> { myFunc() Thread{ it.resumeWith(Result.success(block())) }.start() }
模擬阻塞
fun test(): Int { Thread.sleep(5000) Log.e("hello", "執(zhí)行阻塞函數(shù) test ${Thread.currentThread().id}") return 5 }
async { val result = await { test() } Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}") }
最后
我們通過(guò)協(xié)程的低級(jí)api,實(shí)現(xiàn)了一個(gè)與官方庫(kù)不同版本的async await,同時(shí)也希望通過(guò)對(duì)低級(jí)api的設(shè)計(jì),也能對(duì)Coroutine官方庫(kù)的高級(jí)api的實(shí)現(xiàn)有一定的了解。
以上就是Kotlin協(xié)程低級(jí)api startCoroutine與ContinuationInterceptor 的詳細(xì)內(nèi)容,更多關(guān)于Kotlin協(xié)程低級(jí)api的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Android RecyclerView仿新聞?lì)^條的頻道管理功能
這篇文章主要介紹了Android RecyclerView仿新聞?lì)^條的頻道管理功能,需要的朋友可以參考下2017-06-06Android 中使用RadioGroup和Fragment實(shí)現(xiàn)底部導(dǎo)航欄的功能
這篇文章主要介紹了Android 中使用RadioGroup+Fragment實(shí)現(xiàn)底部導(dǎo)航欄的功能,整體文章大概分為兩部分介紹,通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-06-06Android 個(gè)人理財(cái)工具六:顯示賬單明細(xì) 下
本文主要節(jié)誒是Android 個(gè)人理財(cái)工具顯示賬單明細(xì),主要實(shí)現(xiàn)此窗口的查詢和刪除功能,這里提供實(shí)現(xiàn)代碼,有興趣的小伙伴可以參考下2016-08-08詳解Android平臺(tái)上讀寫NFC標(biāo)簽
NFC,即Near Field Communication,近距離無(wú)線通訊技術(shù),是一種短距離的(通常<=4cm或更短)高頻(13.56M Hz)無(wú)線通信技術(shù),可以讓消費(fèi)者簡(jiǎn)單直觀地交換信息、訪問(wèn)內(nèi)容與服務(wù)。2017-01-01Android 擴(kuò)大 View 的點(diǎn)擊區(qū)域的方法
這篇文章主要介紹了Android 擴(kuò)大 View 的點(diǎn)擊區(qū)域的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-04-04