Kotlin協(xié)程開發(fā)之Flow的融合與Channel容量及溢出策略介紹
一.協(xié)程間的通信
當(dāng)需要進(jìn)行協(xié)程間的通信時(shí),可以調(diào)用Channel方法,創(chuàng)建一個(gè)Channel接口指向的對(duì)象,通過調(diào)用該對(duì)象的send方法和receive方法實(shí)現(xiàn)消息的發(fā)送與接收。協(xié)程對(duì)Channel接口的實(shí)現(xiàn),本質(zhì)上與阻塞隊(duì)列類似,這里不再贅述。
1.通道容量
事實(shí)上,send方法與receive方法并沒有定義在Channel接口中,而是分別定義在SendChannel接口和ReceiveChannel接口中。Channel接口中只是定義了一些與Channel容量策略相關(guān)的枚舉常量,代碼如下:
// 繼承SendChannel接口和ReceiveChannel接口 public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> { // 枚舉常量 public companion object Factory { // Channel的容量為無限 public const val UNLIMITED: Int = Int.MAX_VALUE // Channel的容量為0,沒有緩存 public const val RENDEZVOUS: Int = 0 // Channel的容量為1,溢出策略為DROP_OLDEST, // 后一個(gè)的數(shù)據(jù)會(huì)覆蓋前一個(gè)數(shù)據(jù) public const val CONFLATED: Int = -1 // Channel的容量為默認(rèn)值CHANNEL_DEFAULT_CAPACITY, // 默認(rèn)溢出策略為SUSPEND,send方法會(huì)發(fā)生掛起 // 當(dāng)容量策略為BUFFERED,而溢出策略不為SUSPEND時(shí),Channel的容量為1 public const val BUFFERED: Int = -2 // 協(xié)程內(nèi)部使用的一個(gè)默認(rèn)枚舉值,不對(duì)外暴露 internal const val OPTIONAL_CHANNEL = -3 // 用于手動(dòng)配置容量策略為BUFFERED時(shí)的默認(rèn)值 public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer" // 容量策略為BUFFERED時(shí)的默認(rèn)值 // 默認(rèn)64,最小1,最大為Int.MAX_VALUE-1 internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME, 64, 1, UNLIMITED - 1 ) } }
從上面的代碼可以看出Channel接口繼承自SendChannel接口和ReceiveChannel接口。因此,一個(gè)Channel接口指向的對(duì)象,既可以用于發(fā)送消息,也可以用于接收消息。
2.溢出策略
Channel除了容量策略外,還有溢出策略,用于決定當(dāng)Channel的容量已滿時(shí),而下一個(gè)消息到來時(shí)的行為。溢出策略定義在枚舉類BufferOverflow中,代碼如下:
public enum class BufferOverflow { // 當(dāng)容量已滿時(shí),掛起調(diào)用send方法的協(xié)程 SUSPEND, // 當(dāng)容量已滿時(shí),刪除舊數(shù)據(jù),將新的數(shù)據(jù)添加進(jìn)去,不掛起調(diào)用send方法的協(xié)程 DROP_OLDEST, // 當(dāng)容量已滿時(shí),忽略當(dāng)前要添加的數(shù)據(jù),不掛起調(diào)用send方法的協(xié)程 DROP_LATEST }
二.FusibleFlow接口
FusibleFlow接口繼承自Flow接口。一個(gè)類實(shí)現(xiàn)了該接口,表示該類創(chuàng)建的流可以與其上游或下游相鄰的流進(jìn)行融合,當(dāng)流發(fā)生融合時(shí),就會(huì)調(diào)用接口中定義的fuse方法,代碼如下:
@InternalCoroutinesApi public interface FusibleFlow<T> : Flow<T> { // 用于流的融合 public fun fuse( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): Flow<T> }
FusibleFlow接口的fuse方法,默認(rèn)容量為OPTIONAL_CHANNEL,默認(rèn)溢出策略為SUSPEND。
流的融合
在Flow中,當(dāng)channelFlow方法、flowOn方法、buffer方法、produceIn方法、broadcastIn方法相鄰調(diào)用時(shí),就會(huì)觸發(fā)流的融合。
具體融合的過程,其實(shí)是將下游流的容量、溢出策略、上下文傳遞給上游的流處理,上游的流根據(jù)自身的容量、溢出策略、上下文以及下游的流的容量、溢出策略、上下文重新計(jì)算,得到新的容量、溢出策略、上下文,并返回一個(gè)融合后的流。
三.ChannelFlow類
ChannelFlow類是一個(gè)抽象類,實(shí)現(xiàn)了FusibleFlow接口。下面分析一下fuse方法對(duì)于上下游流融合的策略,代碼如下:
@InternalCoroutinesApi public abstract class ChannelFlow<T>( // 上游流的上下文 @JvmField public val context: CoroutineContext, // 上下游之間流的緩存容量 @JvmField public val capacity: Int, // 溢出策略 @JvmField public val onBufferOverflow: BufferOverflow ) : FusibleFlow<T> { ... public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> { // CONFLATED是一個(gè)復(fù)合的類型,需要拆解成capacity = 0, onBufferOverflow = DROP_OLDEST assert { capacity != Channel.CONFLATED } // 計(jì)算融合后流的上下文 val newContext = context + this.context // 用于保存融合后流的容量 val newCapacity: Int // 用于保存融合后流的溢出策略 val newOverflow: BufferOverflow // SUSPEND為默認(rèn)溢出策略,如果溢出策略不是默認(rèn)的策略 if (onBufferOverflow != BufferOverflow.SUSPEND) { // 直接保存 newCapacity = capacity newOverflow = onBufferOverflow } else { // 如果是默認(rèn)策略 // 計(jì)算并保存新的容量 newCapacity = when { // 如果之前的容量為默認(rèn)枚舉值,則使用新的 this.capacity == Channel.OPTIONAL_CHANNEL -> capacity // 如果新的容量為默認(rèn)枚舉值,則使用原來的 capacity == Channel.OPTIONAL_CHANNEL -> this.capacity // 如果原來的容量為默認(rèn)值CHANNEL_DEFAULT_CAPACITY,則使用新的 this.capacity == Channel.BUFFERED -> capacity // 如果新的容量為默認(rèn)值CHANNEL_DEFAULT_CAPACITY,則使用原來的 capacity == Channel.BUFFERED -> this.capacity // 如果不為默認(rèn)值或默認(rèn)枚舉值 else -> { // 檢查容量都是大于等于0的 assert { this.capacity >= 0 } assert { capacity >= 0 } // 將原來的容量和新的容量進(jìn)行相加 val sum = this.capacity + capacity // 如果相加后大與等于0,則容量為相加后的結(jié)果,否則為無限 if (sum >= 0) sum else Channel.UNLIMITED } } // 保存溢出策略 newOverflow = this.onBufferOverflow } // 如果融合的兩個(gè)流的上下文相同,容量相同,溢出策略也相同 if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow) // 則直接返回 return this // 有變化則根據(jù)新計(jì)算出得參數(shù),創(chuàng)建融合后的流 return create(newContext, newCapacity, newOverflow) } // 由子類進(jìn)行重寫 protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> ... }
流融合的原則
根據(jù)上面對(duì)fuse方法的分析,可以總結(jié)出fuse方法在計(jì)算容量和溢出策略時(shí)的四個(gè)原則:
1)下游優(yōu)先于上游
2)溢出策略優(yōu)先于容量
3)非默認(rèn)值優(yōu)先于默認(rèn)值
4)上下游容量都不為默認(rèn)值,則相加取和
到此這篇關(guān)于Kotlin協(xié)程開發(fā)之Flow的融合與Channel容量及溢出策略介紹的文章就介紹到這了,更多相關(guān)Kotlin Flow的融合內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Jetpack?Compose實(shí)現(xiàn)對(duì)角線滾動(dòng)效果
這篇文章主要為大家詳細(xì)介紹了如何利用Jetpack?Compose實(shí)現(xiàn)一個(gè)簡(jiǎn)單的對(duì)角線滾動(dòng)效果,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2023-02-02Android DrawerLayout帶有側(cè)滑功能的布局類(1)
這篇文章主要為大家詳細(xì)介紹了Android DrawerLayout帶有側(cè)滑功能的布局類,感興趣的小伙伴們可以參考一下2016-07-07Android:利用SharedPreferences實(shí)現(xiàn)自動(dòng)登錄
本篇文章主要介紹了Android實(shí)現(xiàn)自動(dòng)登錄,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-11-11Android程序開發(fā)之使用Design包實(shí)現(xiàn)QQ動(dòng)畫側(cè)滑效果和滑動(dòng)菜單導(dǎo)航
這篇文章主要介紹了Android程序開發(fā)之使用Design包實(shí)現(xiàn)QQ動(dòng)畫側(cè)滑效果和滑動(dòng)菜單導(dǎo)航的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-07-07關(guān)于Android的 DiskLruCache磁盤緩存機(jī)制原理
DiskLruCache是一種管理數(shù)據(jù)存儲(chǔ)的技術(shù),單從Cache的字面意思也可以理解到,"Cache","高速緩存";今天我們來從源碼上分析下DiskLruCache;關(guān)于Android LruCache的緩存機(jī)制原理,需要的朋友可以參考下面文章的具體內(nèi)容2021-09-09Android通過Path實(shí)現(xiàn)搜索按鈕和時(shí)鐘復(fù)雜效果
這篇文章主要為大家詳細(xì)介紹了Android通過Path實(shí)現(xiàn)搜索按鈕和時(shí)鐘復(fù)雜效果,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-09-09Android快速實(shí)現(xiàn)觸摸移動(dòng)的懸浮窗
這篇文章主要為大家詳細(xì)介紹了Android快速實(shí)現(xiàn)觸摸移動(dòng)的懸浮窗,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07