Kotlin線程的橋接與切換使用介紹
一.線程的橋接
1.runBlocking方法
runBlocking方法用于在線程中去執(zhí)行suspend方法,代碼如下:
@Throws(InterruptedException::class) public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T { // 通知編譯器,block只執(zhí)行一次 contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } // 獲取當(dāng)前線程 val currentThread = Thread.currentThread() // 獲取上下文中的攔截器 val contextInterceptor = context[ContinuationInterceptor] val eventLoop: EventLoop? val newContext: CoroutineContext // 如果攔截器為空,代表無法進(jìn)行調(diào)度 if (contextInterceptor == null) { // 從線程中獲取EventLoop,獲取失敗則創(chuàng)建一個(gè)新的 eventLoop = ThreadLocalEventLoop.eventLoop // 添加到上下文中 // newContext = EmptyCoroutineContext + context + eventLoop newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else {// 如果有攔截器 // 嘗試將當(dāng)前攔截器轉(zhuǎn)換成EventLoop, // 如果轉(zhuǎn)換成功,則判斷是否允許可以在上下文中使用 // 如果轉(zhuǎn)換失敗或不允許,則創(chuàng)建一個(gè)新的 eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } ?: ThreadLocalEventLoop.currentOrNull() // 計(jì)算新的上下文 // 這里沒有把EventLoop加到上下文,因?yàn)榧尤牒髸?huì)覆蓋攔截器 newContext = GlobalScope.newCoroutineContext(context) } // 創(chuàng)建一個(gè)協(xié)程 val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) // 啟動(dòng)協(xié)程 coroutine.start(CoroutineStart.DEFAULT, coroutine, block) // 分發(fā)任務(wù) return coroutine.joinBlocking() }
2.BlockingCoroutine類
在runBlocking方法中,最終創(chuàng)建了一個(gè)類型為BlockingCoroutine的對(duì)象。BlockingCoroutine類繼承自AbstractCoroutine類,代碼如下:
// 繼承了AbstractCoroutine private class BlockingCoroutine<T>( parentContext: CoroutineContext, private val blockedThread: Thread, private val eventLoop: EventLoop? ) : AbstractCoroutine<T>(parentContext, true) { // 該協(xié)程是一個(gè)作用域協(xié)程 override val isScopedCoroutine: Boolean get() = true override fun afterCompletion(state: Any?) { // 如果當(dāng)前線程不是阻塞線程 if (Thread.currentThread() != blockedThread) // 喚醒阻塞線程 LockSupport.unpark(blockedThread) } @Suppress("UNCHECKED_CAST") fun joinBlocking(): T { registerTimeLoopThread() try { // 注冊(cè)使用EventLoop eventLoop?.incrementUseCount() try { // 死循環(huán) while (true) { @Suppress("DEPRECATION") // 如果線程當(dāng)前中斷,則拋出異常,同時(shí)取消當(dāng)前協(xié)程 if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } // 分發(fā)執(zhí)行任務(wù),同時(shí)獲取等待時(shí)間 val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // 如果任務(wù)執(zhí)行結(jié)束,則退出循環(huán) if (isCompleted) break // 休眠指定的等待時(shí)間 parkNanos(this, parkNanos) } } finally { // paranoia // 注冊(cè)不使用EventLoop eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() } // 獲取執(zhí)行的結(jié)果 val state = this.state.unboxState() // 如果執(zhí)行過程中取消,則拋出異常 (state as? CompletedExceptionally)?.let { throw it.cause } // 返回結(jié)果 return state as T } }
BlockingCoroutine類重寫了變量isScopedCoroutine為true。
isScopedCoroutine表示當(dāng)前協(xié)程是否為作用域協(xié)程,該變量用在cancelParent方法中。對(duì)于一個(gè)作用域協(xié)程,當(dāng)它的子協(xié)程在運(yùn)行過程中拋出異常時(shí),子協(xié)程調(diào)用cancelParent方法不會(huì)導(dǎo)致作用域協(xié)程取消,而是直接返回true。當(dāng)子協(xié)程執(zhí)行完畢,作用域協(xié)程獲取結(jié)果時(shí),如果發(fā)現(xiàn)子協(xié)程返回的結(jié)果為異常,則會(huì)再次拋出。
相比于一般協(xié)程,作用域協(xié)程不相信子協(xié)程在執(zhí)行過程中取消通知,而是在執(zhí)行完畢后親自檢查結(jié)果是否為異常,達(dá)到一種“耳聽為虛,眼見為實(shí)”的效果。
joinBlocking方法通過循環(huán)在當(dāng)前線程上對(duì)EventLoop進(jìn)行任務(wù)分發(fā)來實(shí)現(xiàn)線程的阻塞。當(dāng)任務(wù)發(fā)生異?;驁?zhí)行完畢后,會(huì)回調(diào)重寫的afterCompletion方法,喚起線程繼續(xù)循環(huán),當(dāng)在循環(huán)中檢測到isCompleted標(biāo)志位為true時(shí),會(huì)跳出循環(huán),恢復(fù)線程執(zhí)行。
二.線程的切換
1.withContext方法
withContext方法用于在協(xié)程中切換線程去執(zhí)行其他任務(wù),該方法被suspend關(guān)鍵字修飾,因此會(huì)引起協(xié)程的掛起,代碼如下:
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T { // 通知編譯器,block只執(zhí)行一次 contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } // 直接掛起,獲取續(xù)體 return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> // 從續(xù)體中獲取上下文 val oldContext = uCont.context // 計(jì)算新的上下文 val newContext = oldContext + context // 檢查任務(wù)是否執(zhí)行完畢或取消 newContext.checkCompletion() // 如果前后兩次的上下文完全相同,說明不需要切換,只需要執(zhí)行即可 if (newContext === oldContext) { // 創(chuàng)建續(xù)體的協(xié)程 val coroutine = ScopeCoroutine(newContext, uCont) // 執(zhí)行block return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } // 攔截器相同,但是上下文中增加了其他的元素 // 這里也是在同一個(gè)線程上執(zhí)行,但是其中增加的元素只在執(zhí)行當(dāng)前的block中使用 if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { // 創(chuàng)建續(xù)體的協(xié)程 val coroutine = UndispatchedCoroutine(newContext, uCont) // 將當(dāng)前線程ThreadLocal中的對(duì)象更新成newContext上下文對(duì)應(yīng)的對(duì)象 withCoroutineContext(newContext, null) { // 執(zhí)行block return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } } // 走到這里,說明要切換線程執(zhí)行block任務(wù) val coroutine = DispatchedCoroutine(newContext, uCont) // 啟動(dòng)父協(xié)程 coroutine.initParentJob() // 啟動(dòng)協(xié)程 block.startCoroutineCancellable(coroutine, coroutine) // 獲取結(jié)果 coroutine.getResult() } }
通過對(duì)上面代碼的分析,可以發(fā)現(xiàn)withContext根據(jù)上下文的不同進(jìn)行了三種分類,創(chuàng)建不同的協(xié)程并通過不同的方式去執(zhí)行block。如下表所示:
協(xié)程上下文變化 | 協(xié)程類型 | 啟動(dòng)方式 |
---|---|---|
完全相同 | ScopeCoroutine | startUndispatchedOrReturn |
攔截器相同 | UndispatchedCoroutine | startUndispatchedOrReturn |
攔截器不同 | DispatchedCoroutine | startCoroutineCancellable |
接下來,將對(duì)不同情況下協(xié)程的啟動(dòng)與執(zhí)行進(jìn)行分析。
2.startUndispatchedOrReturn方法
startUndispatchedOrReturn方法用于在相同的上下文環(huán)境中啟動(dòng)協(xié)程,代碼如下:
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? { // 初始化并綁定父協(xié)程 initParentJob() // 獲取并處理執(zhí)行結(jié)果 return undispatchedResult({ true }) { // 啟動(dòng)協(xié)程 block.startCoroutineUninterceptedOrReturn(receiver, this) } } private inline fun <T> ScopeCoroutine<T>.undispatchedResult( shouldThrow: (Throwable) -> Boolean, startBlock: () -> Any? ): Any? { // 啟動(dòng)協(xié)程,獲取結(jié)果, val result = try { startBlock() } catch (e: Throwable) { // 產(chǎn)生異常,則按照取消處理 CompletedExceptionally(e) } // 如果結(jié)果為掛起,則通知外部掛起 if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // 結(jié)束任務(wù)執(zhí)行,獲取最終狀態(tài) val state = makeCompletingOnce(result) // 如果需要等待子協(xié)程的結(jié)束,則通知外部掛起 if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // 如果執(zhí)最終為異常狀態(tài) return if (state is CompletedExceptionally) { when { // 通過參數(shù)判斷是否拋出 shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont) // 執(zhí)行結(jié)果為異常 result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont) // 結(jié)果不為異常,則返回 else -> result } } else { // 對(duì)最終狀態(tài)進(jìn)行拆箱,返回最終結(jié)果 state.unboxState() } } // JobSupport中提供了下面的類和方法,當(dāng)協(xié)程進(jìn)入完成狀態(tài)時(shí),會(huì)對(duì)狀態(tài)進(jìn)行裝箱。 // 包裝類 private class IncompleteStateBox(@JvmField val state: Incomplete) // 裝箱 internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this // 拆箱 internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
在startUndispatchedOrReturn方法中,通過調(diào)用block的startCoroutineUninterceptedOrReturn方法啟動(dòng)協(xié)程,獲取最終結(jié)果,并對(duì)結(jié)果進(jìn)行異常處理。
接下來,將分析startCoroutineUninterceptedOrReturn方法如何啟動(dòng)協(xié)程,代碼如下:
@SinceKotlin("1.3") @InlineOnly public actual inline fun <R, T> (suspend R.() -> T).startCoroutineUninterceptedOrReturn( receiver: R, completion: Continuation<T> ): Any? = (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)
這里,直接找到最終的actual方法,可以發(fā)現(xiàn)該方法沒有創(chuàng)建狀態(tài)機(jī),而是直接執(zhí)行了block。這個(gè)方法被設(shè)計(jì)用在suspendCoroutineUninterceptedOrReturn方法中,來恢復(fù)掛起協(xié)程的執(zhí)行。
至此,可以知道startUndispatchedOrReturn方法實(shí)際上就是在同一個(gè)協(xié)程中執(zhí)行了block。
3.ScopeCoroutine類
在withContext方法中,當(dāng)上下文相同時(shí),會(huì)創(chuàng)建一個(gè)類型為ScopeCoroutine的對(duì)象。ScopeCoroutine類代表一個(gè)標(biāo)準(zhǔn)的作用域協(xié)程,代碼如下:
internal open class ScopeCoroutine<in T>( context: CoroutineContext, @JvmField val uCont: Continuation<T> ) : AbstractCoroutine<T>(context, true), CoroutineStackFrame { final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame? final override fun getStackTraceElement(): StackTraceElement? = null // 作用域協(xié)程 final override val isScopedCoroutine: Boolean get() = true internal val parent: Job? get() = parentContext[Job] // 該方法會(huì)在協(xié)程異常或取消時(shí)調(diào)用 override fun afterCompletion(state: Any?) { // 進(jìn)行攔截,切換線程,恢復(fù)執(zhí)行 uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } // 該方法會(huì)在將其掛起的方法執(zhí)行完畢后回調(diào) override fun afterResume(state: Any?) { // 直接恢復(fù)續(xù)體的執(zhí)行 uCont.resumeWith(recoverResult(state, uCont)) } }
ScopeCoroutine類重寫了afterCompletion和afterResume兩個(gè)方法,afterCompletion方法用于在協(xié)程取消時(shí)被回調(diào)。afterResume方法用于在掛起恢復(fù)時(shí)被回調(diào)。
根據(jù)上面的分析,當(dāng)發(fā)生異常時(shí),afterCompletion方法可能在其他的協(xié)程上下文中被調(diào)用,因此會(huì)調(diào)用攔截器切換回原本的線程中。而afterResume方法由于已經(jīng)在正確的上下文環(huán)境中,因此可以直接恢復(fù)執(zhí)行。
4.UndispatchedCoroutine類
在withContext方法中,當(dāng)上下文不同,但調(diào)度器相同時(shí),會(huì)創(chuàng)建一個(gè)類型為UndispatchedCoroutine的對(duì)象。UndispatchedCoroutine類繼承自ScopeCoroutine類,重寫了afterResume方法,代碼如下:
private class UndispatchedCoroutine<in T>( context: CoroutineContext, uCont: Continuation<T> ) : ScopeCoroutine<T>(context, uCont) { override fun afterResume(state: Any?) { val result = recoverResult(state, uCont) // 將當(dāng)前線程ThreadLocal中的對(duì)象更新成uCont.context上下文對(duì)應(yīng)的對(duì)象 withCoroutineContext(uCont.context, null) { // 恢復(fù)執(zhí)行 uCont.resumeWith(result) } } }
與父類ScopeCoroutine的afterResume方法相比,UndispatchedCoroutine類在afterResume方法中對(duì)協(xié)程上下文進(jìn)行了更新,然后再恢復(fù)執(zhí)行。
- withCoroutineContext
withCoroutineContext方法用于當(dāng)一個(gè)線程中執(zhí)行多個(gè)協(xié)程時(shí),保存和恢復(fù)ThreadLocal類中的對(duì)象。
通過withContext方法的代碼可以知道,當(dāng)上下文不同但調(diào)度器相同時(shí),在執(zhí)行之前會(huì)通過withCoroutineContext方法將ThreadLocal中的對(duì)象更新成newContext對(duì)應(yīng)的對(duì)象。在執(zhí)行結(jié)束后,又將ThradLocal中的對(duì)象更新成原本續(xù)體的上下文context對(duì)應(yīng)的對(duì)象。代碼如下:
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T { // 將線程上下文更新新的上下文,并返回老的上下文 val oldValue = updateThreadContext(context, countOrElement) try { // 在新的上下文環(huán)境中執(zhí)行 return block() } finally { // 執(zhí)行結(jié)束恢復(fù)老的上下文 restoreThreadContext(context, oldValue) } }
協(xié)程中有一類上下文元素是ThreadContextElement,ThreadContextElement是一個(gè)接口,具體的實(shí)現(xiàn)類有CoroutineId類和ThreadLocalElement類。其中,CoroutineId類用來修改線程的名字。ThreadLocalElement類用來保存和恢復(fù)ThreadLocal類中的對(duì)象,withCoroutineContext方法內(nèi)部的updateThreadContext方法與restoreThreadContext方法正是通過ThreadLocalElement類實(shí)現(xiàn)的。ThreadContextElement接口的代碼如下:
public interface ThreadContextElement<S> : CoroutineContext.Element { // 用于更新新的上下文,并且返回老的上下文 public fun updateThreadContext(context: CoroutineContext): S // 重新恢復(fù)當(dāng)前線程的上下文, // 其中oldStart來自u(píng)pdateThreadContext方法的返回值 public fun restoreThreadContext(context: CoroutineContext, oldState: S) }
當(dāng)調(diào)用updateThreadContext方法時(shí),會(huì)返回一個(gè)代表當(dāng)前狀態(tài)的對(duì)象。當(dāng)調(diào)用restoreThreadContext方法時(shí),又需要傳入一個(gè)代表狀態(tài)的對(duì)象作為參數(shù),來恢復(fù)之前的狀態(tài)。因此,這就需要對(duì)updateThreadContext方法的返回值進(jìn)行保存。
當(dāng)協(xié)程上下文中只有一個(gè)ThreadContextElement接口指向的對(duì)象時(shí),保存在變量中即可。而如果協(xié)程上下文中有多個(gè)ThreadContextElement接口指向的對(duì)象,這時(shí)就需要一個(gè)專門的類來對(duì)這些對(duì)象進(jìn)行管理,這個(gè)類就是ThreadState類,他們之間的對(duì)應(yīng)關(guān)系如下圖所示:
withCoroutineContext方法執(zhí)行圖:
5.DispatchedCoroutine類
在withContext方法中,當(dāng)需要切換線程時(shí),會(huì)創(chuàng)建一個(gè)類型為DispatchedCoroutine的對(duì)象。DispatchedCoroutine類繼承自ScopeCoroutine類,代碼如下:
// 狀態(tài)機(jī)狀態(tài) private const val UNDECIDED = 0 private const val SUSPENDED = 1 private const val RESUMED = 2 private class DispatchedCoroutine<in T>( context: CoroutineContext, uCont: Continuation<T> ) : ScopeCoroutine<T>(context, uCont) { // 初始狀態(tài) private val _decision = atomic(UNDECIDED) // 嘗試掛起 private fun trySuspend(): Boolean { _decision.loop { decision -> when (decision) { UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true RESUMED -> return false else -> error("Already suspended") } } } // 嘗試恢復(fù) private fun tryResume(): Boolean { _decision.loop { decision -> when (decision) { UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true SUSPENDED -> return false else -> error("Already resumed") } } } override fun afterCompletion(state: Any?) { // 通過afterResume方法實(shí)現(xiàn) afterResume(state) } override fun afterResume(state: Any?) { // 如果沒有掛起,則返回 if (tryResume()) return // 進(jìn)行攔截,切換線程,恢復(fù)執(zhí)行 uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } // 獲取最終結(jié)果 fun getResult(): Any? { if (trySuspend()) return COROUTINE_SUSPENDED val state = this.state.unboxState() if (state is CompletedExceptionally) throw state.cause @Suppress("UNCHECKED_CAST") return state as T } }
DispatchedCoroutine類中使用了一個(gè)狀態(tài)機(jī)模型,這個(gè)狀態(tài)機(jī)與在Kotlin協(xié)程:生命周期原理中分析CancellableContinuationImpl類中的狀態(tài)機(jī)相同,獲取結(jié)果的邏輯也與CancellableContinuationImpl類相同。
這里最重要的是DispatchedCoroutine類重寫了afterCompletion和afterResume方法,并且回調(diào)這兩個(gè)方法都會(huì)進(jìn)行線程的切換。
6.總結(jié)
ScopeCoroutine類 | UndispatchedCoroutine類 | DispatchedCoroutine類 | |
---|---|---|---|
afterCompletion方法 | 切線程 | 切線程 | 切線程 |
afterResume方法 | 不切線程 | 不切線程。更新ThreadLocal | 切線程 |
以上就是Kotlin線程的橋接與切換使用介紹的詳細(xì)內(nèi)容,更多關(guān)于Kotlin線程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Android源碼系列之深入理解ImageView的ScaleType屬性
Android源碼系列第一篇,這篇文章主要從源碼的角度深入理解ImageView的ScaleType屬性,感興趣的小伙伴們可以參考一下2016-06-06Android開發(fā)獲取傳感器數(shù)據(jù)的方法示例【加速度傳感器,磁場傳感器,光線傳感器,方向傳感器】
這篇文章主要介紹了Android開發(fā)獲取傳感器數(shù)據(jù)的方法,結(jié)合實(shí)例形式分析了Android獲取加速度傳感器、磁場傳感器、光線傳感器及方向傳感器數(shù)據(jù)的相關(guān)操作技巧,需要的朋友可以參考下2017-11-11android實(shí)現(xiàn)上滑屏幕隱藏底部菜單欄的示例
這篇文章主要介紹了android實(shí)現(xiàn)上滑屏幕隱藏底部菜單欄的示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-02-02Android實(shí)現(xiàn)畫板、寫字板功能(附源碼下載)
這篇文章主要介紹了Android實(shí)現(xiàn)畫板、寫字板功能的方法,文中給出了簡單的介紹和示例代碼,想要了解更多的朋友可以下載源碼進(jìn)行學(xué)習(xí),感興趣的朋友們下面來一起看看吧。2017-01-01Android UI設(shè)計(jì)與開發(fā)之ViewPager仿微信引導(dǎo)界面以及動(dòng)畫效果
這篇文章主要為大家詳細(xì)介紹了Android UI設(shè)計(jì)與開發(fā)之ViewPager仿微信引導(dǎo)界面以及動(dòng)畫效果,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08Android 使用AsyncTask實(shí)現(xiàn)多線程斷點(diǎn)續(xù)傳
本文將詳細(xì)講解如何使用AsyncTask來實(shí)現(xiàn)多線程的斷點(diǎn)續(xù)傳下載功能,感興趣的朋友跟隨腳本之家小編一起學(xué)習(xí)吧2018-05-05