Kotlin協(xié)程flowOn與線程切換超詳細示例介紹
示例代碼
本文分析示例代碼如下:
launch(Dispatchers.Main) { flow { emit(1) emit(2) }.flowOn(Dispatchers.IO).collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d("liduo", "$it") } }
一.flowOn方法
flowOn方法用于將上游的流切換到指定協(xié)程上下文的調度器中執(zhí)行,同時不會把協(xié)程上下文暴露給下游的流,即flowOn方法中協(xié)程上下文的調度器不會對下游的流生效。如下面這段代碼所示:
launch(Dispatchers.Main) { flow { emit(2) // 執(zhí)行在IO線程池 }.flowOn(Dispatchers.IO).map { it + 1 // 執(zhí)行在Default線程池 }.flowOn(Dispatchers.Default).collect { Log.d("liduo", "$it") //執(zhí)行在主線程 } }
接下來,分析一下flowOn方法,代碼如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { // 檢查當前協(xié)程沒有執(zhí)行結束 checkFlowContext(context) return when { // 為空,則返回自身 context == EmptyCoroutineContext -> this // 如果是可融合的Flow,則嘗試融合操作,獲取新的流 this is FusibleFlow -> fuse(context = context) // 其他情況,包裝成可融合的Flow else -> ChannelFlowOperatorImpl(this, context = context) } } // 確保Job不為空 private fun checkFlowContext(context: CoroutineContext) { require(context[Job] == null) { "Flow context cannot contain job in it. Had $context" } }
在flowOn方法中,首先會檢查方法所在的協(xié)程是否執(zhí)行結束。如果沒有結束,則會執(zhí)行判斷語句,這里flowOn方法傳入的上下文不是空上下文,且通過flow方法構建出的Flow對象也不是FusibleFlow類型的對象,因此這里會走到else分支,將上游flow方法創(chuàng)建的Flow對象和上下文包裝成ChannelFlowOperatorImpl類型的對象。
1.ChannelFlowOperatorImpl類
ChannelFlowOperatorImpl類繼承自ChannelFlowOperator類,用于將上游的流包裝成一個ChannelFlow對象,它的繼承關系如下圖所示:
通過上圖可以知道,ChannelFlowOperatorImpl類最終繼承了ChannelFlow類,代碼如下:
internal class ChannelFlowOperatorImpl<T>( flow: Flow<T>, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) { // 用于流融合時創(chuàng)建新的流 override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) // 若當前的流不需要通過Channel即可實現(xiàn)正常工作時,會調用此方法 override fun dropChannelOperators(): Flow<T>? = flow // 觸發(fā)對下一級流進行收集 override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector) }
二.collect方法
在Kotlin協(xié)程:Flow基礎原理中講到,當執(zhí)行collect方法時,內部會調用最后產生的Flow對象的collect方法,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> override suspend fun emit(value: T) = action(value) })public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
這個最后產生的Flow對象就是ChannelFlowOperatorImpl類對象。
1.ChannelFlowOperator類的collect方法
ChannelFlowOperatorImpl類沒有重寫collect方法,因此調用的是它的父類ChannelFlowOperator類的collect方法,代碼如下:
override suspend fun collect(collector: FlowCollector<T>) { // OPTIONAL_CHANNEL為默認值,這里滿足條件,之后會詳細講解 if (capacity == Channel.OPTIONAL_CHANNEL) { // 獲取當前協(xié)程的上下文 val collectContext = coroutineContext // 計算新的上下文 val newContext = collectContext + context // 如果前后上下文沒有發(fā)生變化 if (newContext == collectContext) // 直接觸發(fā)對下一級流的收集 return flowCollect(collector) // 如果上下文發(fā)生變化,但不需要切換線程 if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor]) // 切換協(xié)程上下文,調用flowCollect方法觸發(fā)下一級流的收集 return collectWithContextUndispatched(collector, newContext) } // 調用父類的collect方法 super.collect(collector) } // 獲取當前協(xié)程的上下文,該方法會被編譯器處理 @SinceKotlin("1.3") @Suppress("WRONG_MODIFIER_TARGET") @InlineOnly public suspend inline val coroutineContext: CoroutineContext get() { throw NotImplementedError("Implemented as intrinsic") }
ChannelFlowOperator類的collect方法在設計上與協(xié)程的withContext方法設計思路是一致的:在方法內根據上下文的不同情況進行判斷,在必要時才會切換線程去執(zhí)行任務。
通過flowOn方法創(chuàng)建的ChannelFlowOperatorImpl類對象,參數(shù)capacity為默認值OPTIONAL_CHANNEL。因此代碼在執(zhí)行時會進入到判斷中,但因為我們指定了上下文為Dispatchers.IO,因此上下文發(fā)生了變化,同時攔截器也發(fā)生了變化,所以最后會調用ChannelFlowOperator類的父類的collect方法,也就是ChannelFlow類的collect方法。
2.ChannelFlow類的collect方法
ChannelFlow類的代碼如下:
override suspend fun collect(collector: FlowCollector<T>): Unit = coroutineScope { collector.emitAll(produceImpl(this)) }
在ChannelFlow類的collect方法中,首先通過coroutineScope方法創(chuàng)建了一個作用域協(xié)程,接著調用了produceImpl方法,代碼如下:
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
produceImpl方法內部調用了produce方法,并且傳入了待執(zhí)行的任務collectToFun。
produce方法在Kotlin協(xié)程:協(xié)程的基礎與使用中曾提到過,它是官方提供的啟動協(xié)程的四個方法之一,另外三個方法為launch方法、async方法、actor方法。代碼如下:
internal fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E> { // 根據容量與溢出策略創(chuàng)建Channel對象 val channel = Channel<E>(capacity, onBufferOverflow) // 計算新的上下文 val newContext = newCoroutineContext(context) // 創(chuàng)建協(xié)程 val coroutine = ProducerCoroutine(newContext, channel) // 監(jiān)聽完成事件 if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion) // 啟動協(xié)程 coroutine.start(start, coroutine, block) return coroutine }
在produce方法內部,首先創(chuàng)建了一個Channel類型的對象,接著創(chuàng)建了類型為ProducerCoroutine的協(xié)程,并且傳入Channel對象作為參數(shù)。最后,produce方法返回了一個ReceiveChannel接口指向的對象,當協(xié)程執(zhí)行完畢后,會通過Channel對象將結果通過send方法發(fā)送出來。
至此,可以知道flowOn方法的實現(xiàn)實際上是利用了協(xié)程攔截器的攔截功能。
在這里之后,代碼邏輯分成了兩部分,一部分是block在ProducerCoroutine協(xié)程中的執(zhí)行,另一部分是通過ReceiveChannel對象獲取執(zhí)行的結果。
3.flow方法中代碼的執(zhí)行
在produceImpl方法中,調用了produce方法,并且傳入了collectToFun對象,這個對象將會在produce方法創(chuàng)建的協(xié)程中執(zhí)行,代碼如下:
internal val collectToFun: suspend (ProducerScope<T>) -> Unit get() = { collectTo(it) }
當調用collectToFun對象的invoke方法時,會觸發(fā)collectTo方法的執(zhí)行,該方法在ChannelFlowOperator類中被重寫,代碼如下:
protected override suspend fun collectTo(scope: ProducerScope<T>) = flowCollect(SendingCollector(scope))
在collectTo方法中,首先將參數(shù)scope封裝成SendingCollector類型的對象,接著調用了flowCollect方法,該方法在ChannelFlowOperatorImpl類中被重寫,代碼如下:
override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector)
ChannelFlowOperatorImpl類的flowCollect方法內部調用了flow對象的collect方法,這個flow對象就是最初通過flow方法構建的對象。根據Kotlin協(xié)程:Flow基礎原理的分析,這個flow對象類型為SafeFlow,最后會通過collectSafely方法,觸發(fā)flow方法中的block執(zhí)行。代碼如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { // 觸發(fā)執(zhí)行 collector.block() } }
當flow方法在執(zhí)行過程中需要向下游發(fā)出值時,會調用emit方法。根據上面flowCollect方法和collectTo方法可以知道,collectSafely方法的collector對象就是collectTo方法中創(chuàng)建的SendingCollector類型的對象,代碼如下:
@InternalCoroutinesApi public class SendingCollector<T>( private val channel: SendChannel<T> ) : FlowCollector<T> { // 通過Channel類對象發(fā)送值 override suspend fun emit(value: T): Unit = channel.send(value) }
當調用SendingCollector類型的對象的emit方法時,會通過調用類型為Channel的對象的send方法,將值發(fā)送出去。
接下來,將分析下游如何接收上游發(fā)出的值。
4.接收flow方法發(fā)出的值
回到ChannelFlow類的collect方法,之前提到collect方法中調用produceImpl方法,開啟了一個新的協(xié)程去執(zhí)行任務,并且返回了一個ReceiveChannel接口指向的對象。代碼如下:
override suspend fun collect(collector: FlowCollector<T>): Unit = coroutineScope { collector.emitAll(produceImpl(this)) }
在調用完produceImpl方法后,接著調用了emitAll方法,將ReceiveChannel接口指向的對象作為emitAll方法的參數(shù),代碼如下:
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit = emitAllImpl(channel, consume = true)
emitAll方法是FlowCollector接口的擴展方法,內部調用了emitAllImpl方法對參數(shù)channel進行封裝,代碼如下:
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) { // 用于保存異常 var cause: Throwable? = null try { // 死循環(huán) while (true) { // 掛起,等待接收Channel結果或Channel關閉 val result = run { channel.receiveOrClosed() } // 如果Channel關閉了 if (result.isClosed) { // 如果有異常,則拋出 result.closeCause?.let { throw it } // 沒有異常,則跳出循環(huán) break } // 獲取并發(fā)送值 emit(result.value) } } catch (e: Throwable) { // 捕獲到異常時拋出 cause = e throw e } finally { // 執(zhí)行結束關閉Channel if (consume) channel.cancelConsumed(cause) } }
emitAllImpl方法是FlowCollector接口的擴展方法,而這里的FlowCollector接口指向的對象,就是collect方法中創(chuàng)建的匿名對象,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
在emitAllImpl方法中,當通過receiveOrClosed方法獲取到上游發(fā)出的值時,會調用emit方法通知下游,這時就會觸發(fā)collect方法中block的執(zhí)行,最終實現(xiàn)值從流的上游傳遞到了下游。
三.flowOn方法與流的融合
假設對一個流連續(xù)調用兩次flowOn方法,那么流最終會在哪個flowOn方法指定的調度器中執(zhí)行呢?代碼如下:
launch(Dispatchers.Main) { flow { emit(2) // emit方法是在IO線程執(zhí)行還是在主線程執(zhí)行呢? }.flowOn(Dispatchers.IO).flowOn(Dispatchers.Main).collect { Log.d("liduo", "$it") } }
答案是在IO線程執(zhí)行,為什么呢?
根據本篇上面的分析,當?shù)谝淮握{用flowOn方法時,上游的流會被包裹成ChannelFlowOperatorImpl對象,代碼如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { // 檢查當前協(xié)程沒有執(zhí)行結束 checkFlowContext(context) return when { // 為空,則返回自身 context == EmptyCoroutineContext -> this // 如果是可融合的Flow,則嘗試融合操作,獲取新的流 this is FusibleFlow -> fuse(context = context) // 其他情況,包裝成可融合的Flow else -> ChannelFlowOperatorImpl(this, context = context) } }
而當?shù)诙握{用flowOn方法時,由于此時上游的流——ChannelFlowOperatorImpl類型的對象,實現(xiàn)了FusibleFlow接口,因此,這里會觸發(fā)流的融合,直接調用上游的流的fuse方法,并傳入新的上下文。這里容量和溢出策略均為默認值。
根據Kotlin協(xié)程:Flow的融合、Channel容量、溢出策略的分析,這里會調用ChannelFlow類的fuse方法。相關代碼如下:
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> { ... // 計算融合后流的上下文 // context為下游的上下文,this.context為上游的上下文 val newContext = context + this.context ... }
再根據之前在Kotlin協(xié)程:協(xié)程上下文與上下文元素中的分析,當兩個上下文進行相加時,后一個上下文中的攔截器會覆蓋前一個上下文中的攔截器。在上面的代碼中,后一個上下文為上游的流的上下文,因此會優(yōu)先使用上游的攔截器。代碼如下:
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
四.總結
粉線為使用時代碼編寫順序,綠線為下游觸發(fā)上游的調用順序,紅線為上游向下游發(fā)送值的調用順序,藍線為線程切換的位置。
到此這篇關于Kotlin協(xié)程flowOn與線程切換超詳細示例介紹的文章就介紹到這了,更多相關Kotlin flowOn與線程切換內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
讓Android中RadioGroup不顯示在輸入法上面的辦法
在Android開發(fā)中,發(fā)現(xiàn)一個問題,打開輸入法導致下面的radioGroup的位置發(fā)生了變化,被頂?shù)搅溯斎敕ǖ纳厦?,那么該如何解決呢?下面來看看。2016-08-08Android使用Intent.ACTION_SEND分享圖片和文字內容的示例代碼
這篇文章主要介紹了Android使用Intent.ACTION_SEND分享圖片和文字內容的示例代碼的實例代碼,具有很好的參考價值,希望對大家有所幫助,一起跟隨小編過來看看吧2018-05-05