亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Kotlin協(xié)程概念原理與使用萬(wàn)字梳理

 更新時(shí)間:2022年08月01日 14:41:47   作者:LeeDuo.  
協(xié)程的作用是什么?協(xié)程是一種輕量級(jí)的線程,解決異步編程的復(fù)雜性,異步的代碼使用協(xié)程可以用順序進(jìn)行表達(dá),文中通過(guò)示例代碼介紹詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧

一.協(xié)程概述

1.概念

協(xié)程是Coroutine的中文簡(jiǎn)稱,co表示協(xié)同、協(xié)作,routine表示程序。協(xié)程可以理解為多個(gè)互相協(xié)作的程序。協(xié)程是輕量級(jí)的線程,它的輕量體現(xiàn)在啟動(dòng)和切換,協(xié)程的啟動(dòng)不需要申請(qǐng)額外的堆??臻g;協(xié)程的切換發(fā)生在用戶態(tài),而非內(nèi)核態(tài),避免了復(fù)雜的系統(tǒng)調(diào)用。

2.特點(diǎn)

1)更加輕量級(jí),占用資源更少。

2)避免“回調(diào)地獄”,增加代碼可讀性。

3)協(xié)程的掛起不阻塞線程。

3.原理

Kotlin協(xié)程原理核心體現(xiàn)在“續(xù)體傳遞”與“狀態(tài)機(jī)”兩部分。

1)續(xù)體傳遞

續(xù)體傳遞是一種代碼編寫風(fēng)格——續(xù)體傳遞風(fēng)格(Continuation-Passing-Style),簡(jiǎn)稱為CPS。續(xù)體傳遞本質(zhì)上是代碼的回調(diào)與結(jié)果的傳遞。假設(shè)將順序執(zhí)行代碼分成兩部分,第一部分執(zhí)行完成,返回一個(gè)結(jié)果(可能為空、一個(gè)對(duì)象引用、一個(gè)具體的值)。接著通過(guò)回調(diào)執(zhí)行第二部分代碼,并傳入第一部分代碼返回的結(jié)果,這種形式的代碼編寫風(fēng)格就是續(xù)體傳遞風(fēng)格。

具體地,假設(shè)要計(jì)算一個(gè)復(fù)雜的計(jì)算,正常情況會(huì)這樣編寫,代碼如下:

fun calculate(a: Int, b: Int): Int = a + b
fun main() {
    val result = calculate(1, 2)
    Log.d("liduo",result)
}

把上面的代碼改造成續(xù)體傳遞風(fēng)格。首先,定義一個(gè)續(xù)體傳遞接口,代碼如下:

interface Continuation {
    fun next(result: Int)
}

對(duì)calculate方法進(jìn)行改造,代碼如下:

fun calculate(a: Int, b: Int, continuation: Continuation) = 
    continuation.next(a + b)
fun main() {
    calculate(1, 2) { result ->
        Log.d("liduo", result)
    }
}

經(jīng)過(guò)續(xù)體傳遞改造后,打印日志的操作被封裝到了Continuation中,并且依賴計(jì)算操作的回調(diào)。如果continuation方法不回調(diào)執(zhí)行參數(shù)continuation,打印日志的操作將永遠(yuǎn)不會(huì)被執(zhí)行。

原本順序執(zhí)行一段代碼(邏輯),在經(jīng)過(guò)一次續(xù)體改造后變成了兩段代碼(邏輯)。

2)狀態(tài)機(jī)

協(xié)程的代碼在經(jīng)過(guò)Kotlin編譯器處理時(shí),會(huì)被優(yōu)化成狀態(tài)機(jī)模型。每段代碼有三個(gè)狀態(tài):未執(zhí)行、掛起、已恢復(fù)(完成)。處于未執(zhí)行狀態(tài)的代碼可以被執(zhí)行,執(zhí)行過(guò)程中發(fā)生掛起,會(huì)進(jìn)入掛起狀態(tài),從掛起中恢復(fù)或執(zhí)行完畢會(huì)進(jìn)入已恢復(fù)(完成)狀態(tài)。當(dāng)多個(gè)像這樣的代碼進(jìn)行協(xié)作時(shí),可以組合出更復(fù)雜的狀態(tài)機(jī)。

二.協(xié)程基礎(chǔ)

1.協(xié)程的上下文

協(xié)程上下文是一組可以附加到協(xié)程中的持久化用戶定義對(duì)象,代碼如下:

interface CoroutineContext {
    // 重載"[]"操作
    operator fun <E : Element> get(key: Key<E>): E?
    // 單值歸一化操作
    fun <R> fold(initial: R, operation: (R, Element) -> R): R
    // 重載 "+"操作
    operator fun plus(context: CoroutineContext): CoroutineContext
    // 獲取當(dāng)前指定key外的其他上下文
    fun minusKey(key: Key<*>): CoroutineContext
    interface Element : CoroutineContext {
        val key: Key<*>
    }
    interface Key<E : Element>
}

Element接口繼承自CoroutineContext接口,協(xié)程中的攔截器、調(diào)度器、異常處理器以及代表協(xié)程自身生命周期等重要的類,都實(shí)現(xiàn)了Element接口。

Element接口規(guī)定每個(gè)實(shí)現(xiàn)該接口的對(duì)象都要有一個(gè)獨(dú)一無(wú)二的Key,以便在需要的時(shí)候可以在協(xié)程上下文中快速的找到。因此,協(xié)程上下文可以理解為是一個(gè)Element的索引集,一個(gè)結(jié)構(gòu)介于Set和Map之間的索引集。

2.協(xié)程的作用域

協(xié)程作用域用于管理作用域內(nèi)協(xié)程的生命周期,代碼如下:

interface CoroutineScope {
    // 作用域內(nèi)啟動(dòng)協(xié)程的默認(rèn)上下文
    val coroutineContext: CoroutineContext
}

協(xié)程中提供了兩個(gè)常用的方法來(lái)創(chuàng)建新的協(xié)程作用域,一個(gè)是coroutineScope方法,一個(gè)是supervisorScope方法,這兩種方法創(chuàng)建的作用域中的上下文會(huì)自動(dòng)繼承父協(xié)程的上下文。除此之外,使用GlobalScope啟動(dòng)協(xié)程,也會(huì)為協(xié)程創(chuàng)建一個(gè)新的協(xié)程作用域,但協(xié)程作用域的上下文為空上下文。

當(dāng)父協(xié)程被取消或發(fā)生異常時(shí),會(huì)自動(dòng)取消父協(xié)程所有的子協(xié)程。當(dāng)子協(xié)程取消或發(fā)生異常時(shí),在coroutineScope作用域下,會(huì)導(dǎo)致父協(xié)程取消;而在supervisorScope作用域下,則不會(huì)影響父協(xié)程。

協(xié)程的作用域只對(duì)父子協(xié)程有效,對(duì)子孫協(xié)程無(wú)效。例如:?jiǎn)?dòng)父協(xié)程,在supervisorScope作用域內(nèi)啟動(dòng)子協(xié)程。當(dāng)子協(xié)程在啟動(dòng)孫協(xié)程時(shí),在不指定為supervisorScope作用域的情況下,默認(rèn)為coroutineScope作用域。

3.協(xié)程調(diào)度器

協(xié)程調(diào)度器用于切換執(zhí)行協(xié)程的線程。常見的調(diào)度器有以下4種:

  • Dispatchers.Default:默認(rèn)調(diào)度器。它使用JVM的共享線程池,該調(diào)度器的最大并發(fā)度是CPU的核心數(shù),默認(rèn)為2。
  • Dispatchers.Unconfined:非受限調(diào)度器。該調(diào)度器不會(huì)限制代碼在指定的線程上執(zhí)行。即掛起函數(shù)后面的代碼不會(huì)主動(dòng)恢復(fù)到掛起之前的線程去執(zhí)行,而是在執(zhí)行掛起函數(shù)的線程上執(zhí)行。
  • Dispatchers.IO:IO調(diào)度器。它將阻塞的IO任務(wù)分流到一個(gè)共享的線程池中。該調(diào)度器和Dispatchers.Default共享線程。
  • Dispatchers.Main:主線程調(diào)度器。一般用于操作與更新UI。

注意:Dispatchers.Default調(diào)度器和Dispatchers.IO 調(diào)度器分配的線程為守護(hù)線程。

4.協(xié)程的啟動(dòng)模式

協(xié)程共有以下四種啟動(dòng)模式:

  • CoroutineStart.DEFAULT:立即執(zhí)行協(xié)程,可以隨時(shí)取消。
  • CoroutineStart.LAZY:創(chuàng)建一個(gè)協(xié)程,但不執(zhí)行,在用戶需要時(shí)手動(dòng)觸發(fā)執(zhí)行。
  • CoroutineStart.ATOMIC:立即執(zhí)行協(xié)程,但在協(xié)程執(zhí)行前無(wú)法取消。目前處于試驗(yàn)階段。
  • CoroutineStart.UNDISPATCHED:立即在當(dāng)前線程執(zhí)行協(xié)程,直到遇到第一個(gè)掛起。目前處于試驗(yàn)階段。

5.協(xié)程的生命周期

每個(gè)協(xié)程在創(chuàng)建后都會(huì)返回一個(gè)Job接口指向的對(duì)象,一個(gè)Job對(duì)象代表一個(gè)協(xié)程,用于控制生命周期,代碼如下:

interface Job : CoroutineContext.Element {
    ...
    // 三個(gè)狀態(tài)標(biāo)志
    val isActive: Boolean
    val isCompleted: Boolean
    val isCancelled: Boolean
    // 獲取具體的取消異常
    fun getCancellationException(): CancellationException
    // 啟動(dòng)協(xié)程
    fun start(): Boolean
    // 取消協(xié)程
    fun cancel(cause: CancellationException? = null)
    ...
    // 等待協(xié)程執(zhí)行結(jié)束
    suspend fun join()
    // 用于select語(yǔ)句
    val onJoin: SelectClause0
    // 用于注冊(cè)協(xié)程執(zhí)行結(jié)束的回調(diào)
    fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
    ...
}

1)協(xié)程狀態(tài)的轉(zhuǎn)換

在DEFAULT、ATOMIC、UNDISPATCHED這三個(gè)模式下,啟動(dòng)協(xié)程會(huì)進(jìn)入Active狀態(tài),而在LAZY模式下啟動(dòng)的協(xié)程會(huì)進(jìn)入New狀態(tài),需要在手動(dòng)調(diào)用start方法后進(jìn)入Active狀態(tài)。

Completing是一個(gè)內(nèi)部狀態(tài),對(duì)外不可感知。

2)狀態(tài)標(biāo)識(shí)的變化

State[isActive][isCompleted][isCancelled]
Newfalsefalsefalse
Activetruefalsefalse
Completingtruefalsefalse
Cancellingfalsefalsetrue
Cancelledfalsetruetrue
Completedfasletruefalse

三.協(xié)程使用

1.協(xié)程的啟動(dòng)

1)runBlocking方法

fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

該方法用于在非協(xié)程作用域環(huán)境中啟動(dòng)一個(gè)協(xié)程,并在這個(gè)協(xié)程中執(zhí)行l(wèi)ambda表達(dá)式中的代碼。同時(shí),調(diào)用該方法會(huì)阻塞當(dāng)前線程,直到lambda表達(dá)式執(zhí)行完畢。該方法不應(yīng)該在協(xié)程中被調(diào)用,該方法設(shè)計(jì)的目的是為了讓suspend編寫的代碼可以在常規(guī)的阻塞代碼中調(diào)用。如果不設(shè)置協(xié)程調(diào)度器,那么協(xié)程將在當(dāng)前被阻塞的線程中執(zhí)行。示例代碼如下:

private fun main() {
    // 不指定調(diào)度器,在方法調(diào)用的線程執(zhí)行
    runBlocking {
        // 這里是協(xié)程的作用域
        Log.d("liduo", "123")
    }
}
private fun main() {
    // 指定調(diào)度器,在IO線程中執(zhí)行
    runBlocking(Dispatchers.IO) {
        // 這里是協(xié)程的作用域
        Log.d("liduo", "123")
    }
}

2)launch方法

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

該方法用于在協(xié)程作用域中異步啟動(dòng)一個(gè)新的協(xié)程,調(diào)用該方法不會(huì)阻塞線程。示例代碼如下:

private fun test() {
    // 作用域?yàn)镚lobalScope
    // 懶啟動(dòng),主線程執(zhí)行
    val job = GlobalScope.launch(
            context = Dispatchers.Main, 
            start = CoroutineStart.LAZY) {
        Log.d("liduo", "123")
    }
    // 啟動(dòng)協(xié)程
    job.start()
}

3)async方法

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

該方法用于在協(xié)程作用域中中異步啟動(dòng)一個(gè)新的協(xié)程,調(diào)用該方法不會(huì)阻塞線程。async方法與launch方法的不同之處在于可以攜帶返回值。調(diào)用該方法會(huì)返回一個(gè)Deferred接口指向的對(duì)象,調(diào)用該對(duì)象可以獲取協(xié)程執(zhí)行的結(jié)果。同時(shí),Deferred接口繼承自Job接口,因此仍然可以操作協(xié)程的生命周期。示例代碼如下:

// suspend標(biāo)記
private suspend fun test(): Int {
    // 作用域?yàn)镚lobalScope,返回值為Int類型,,泛型可省略,自動(dòng)推斷
    val deffer = GlobalScope.async<Int> {
        Log.d("liduo", "123")
        // 延時(shí)1s
        delay(1000)
        1
    }
    // 獲取返回值
    return deffer.await()
}

通過(guò)調(diào)用返回的Deferred接口指向?qū)ο蟮腶wait方法可以獲取返回值。在調(diào)用await方法時(shí),如果協(xié)程執(zhí)行完畢,則直接獲取返回值。如果協(xié)程還在執(zhí)行,則該方法會(huì)導(dǎo)致協(xié)程掛起,直到執(zhí)行結(jié)束或發(fā)生異常。

4)suspend關(guān)鍵字

suspend關(guān)鍵字用于修飾一個(gè)方法(lambda表達(dá)式)。suspend修飾的方法稱為suspend方法,表示方法在執(zhí)行中可能發(fā)生掛起。為什么是可能呢?比如下面的代碼雖然被suspend修飾,但實(shí)際并不會(huì)發(fā)生掛起:

private suspend fun test() {
    Log.d("liduo", "123")
}

由于會(huì)發(fā)生掛起,因此suspend方法只能在協(xié)程中使用。suspend方法內(nèi)部可以調(diào)用其他的suspend方法,也可以非suspend方法。但suspend方法只能被其他的suspend方法調(diào)用。

5)withContext方法

suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

該方法用于在當(dāng)前協(xié)程的執(zhí)行過(guò)程中,切換到調(diào)度器指定的線程去執(zhí)行參數(shù)block中的代碼,并返回一個(gè)結(jié)果。調(diào)用該方法可能會(huì)使當(dāng)前協(xié)程掛起,并在方法執(zhí)行結(jié)束時(shí)恢復(fù)掛起。示例代碼如下:

private suspend fun test() {
    // IO線程啟動(dòng)并執(zhí)行,啟動(dòng)模式DEFAULT
    GlobalScope.launch(Dispatchers.IO) {
        Log.d("liduo", "start")
        // 線程主切換并掛起,泛型可省略,自動(dòng)推斷
        val result = withContext<String>(Dispatchers.Main) {
            // 網(wǎng)絡(luò)請(qǐng)求
            "json data"
        }
        // 切換回IO線程
        Log.d("liduo", result)
    }
}

6)suspend方法

inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block

該方法用于對(duì)掛起方法進(jìn)行包裹,使掛起方法可以在非掛起方法中調(diào)用。該方法需要配合createCoroutine方法啟動(dòng)協(xié)程。示例代碼如下:

// 返回包含當(dāng)前的協(xié)程代碼的續(xù)體
val continuation = suspend {
    // 執(zhí)行協(xié)程代碼
    // 泛型可以修改需要的類型
}.createCoroutine(object : Continuation<Any> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext + Dispatchers.Main

    override fun resumeWith(result: Result<Any>) {
        // 獲取最終結(jié)果
    }
})
// 執(zhí)行續(xù)體內(nèi)容
continuation.resume(Unit)

一般開發(fā)中不會(huì)通過(guò)該方法啟動(dòng)協(xié)程,但該方法可以更本質(zhì)的展示協(xié)程的啟動(dòng)、恢復(fù)、掛起。

2.協(xié)程間通信

1)Channel

Channel用于協(xié)程間的通信。Channel本質(zhì)上是一個(gè)并發(fā)安全的隊(duì)列,類似BlockingQueue。在使用時(shí),通過(guò)調(diào)用同一個(gè)Channel對(duì)象的send和receive方法實(shí)現(xiàn)通信,示例代碼如下:

suspend fun main() {
    // 創(chuàng)建
    val channel = Channel<Int>()
    val producer = GlobalScope.launch {
        var i = 0
        while (true){
            // 發(fā)送
            channel.send(i++)
            delay(1000)
            // channel不需要時(shí)要及時(shí)關(guān)閉
            if(i == 10)
                channel.close()
        }
    }
    // 寫法1:常規(guī)
    val consumer = GlobalScope.launch {
        while(true){
            // 接收
            val element = channel.receive()
            Log.d("liduo", "$element")
        }
    }
    // 寫法2:迭代器
    val consumer = GlobalScope.launch {
        val iterator = channel.iterator()
        while(iterator.hasNext()){
            // 接收
            val element = iterator.next()
            Log.d("liduo", "$element")
        }
    }
    // 寫法3:增強(qiáng)for循環(huán)
    val consumer = GlobalScope.launch {
        for(element in channel){
            Log.d("liduo", "$element")
        }
    }
    // 上面的協(xié)程由于不是懶啟動(dòng),因此創(chuàng)建完成直接就會(huì)start去執(zhí)行
    // 也就是說(shuō),代碼走到這里,上面的兩個(gè)協(xié)程已經(jīng)開始工作
    // join方法會(huì)掛起當(dāng)前協(xié)程,而不是上面已經(jīng)啟動(dòng)的兩個(gè)協(xié)程
    // 在Android環(huán)境中,下面兩行代碼可以不用添加
    // producer.join()
    // consumer.join()
}

上述例子是一個(gè)經(jīng)典的生產(chǎn)者-消費(fèi)者模型。在寫法1中,由于send方法和receive方法被suspend關(guān)鍵字修飾,因此,在默認(rèn)情況下,當(dāng)生產(chǎn)速度與消費(fèi)速度不匹配時(shí),調(diào)用這兩個(gè)方法會(huì)導(dǎo)致協(xié)程掛起。

除此之外,Channel支持使用迭代器進(jìn)行接收。其中,hasNext方法也可能會(huì)導(dǎo)致協(xié)程掛起。

Channel對(duì)象在不使用時(shí)要及時(shí)關(guān)閉,可以由發(fā)送者關(guān)閉,也可以由接收者關(guān)閉,具體取決于業(yè)務(wù)場(chǎng)景。

2)Channel的容量

Channel方法不是Channel的構(gòu)造方法,而是一個(gè)工廠方法,代碼如下:

fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayChannel(capacity)
    }

在創(chuàng)建Channel時(shí)可以指定容量:

  • RENDEZVOUS:創(chuàng)建一個(gè)容量為0的Channel,類似于SynchronousQueue。send之后會(huì)掛起,直到被receive。枚舉值為0。
  • UNLIMITED:創(chuàng)建一個(gè)容量無(wú)限的Channel,內(nèi)部通過(guò)鏈表實(shí)現(xiàn)。枚舉值為Int.MAX_VALUE。
  • CONFLATED:創(chuàng)建一個(gè)容量為1的Channel,當(dāng)后一個(gè)的數(shù)據(jù)會(huì)覆蓋前一個(gè)數(shù)據(jù)。枚舉值為-1。
  • BUFFERED:創(chuàng)建一個(gè)默認(rèn)容量的Channel,默認(rèn)容量為kotlinx.coroutines.channels.defaultBuffer配置變量指定的值,未配置情況下,默認(rèn)為64。枚舉值為-2。
  • 如果capacity的值不為上述的枚舉值,則創(chuàng)建一個(gè)指定容量的Channel。

3)produce方法與actor方法

fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>

與launch方法和async方法相同,使用produce方法與actor方法也可以啟動(dòng)協(xié)程。但不同的是,在produce方法與actor方法中可以更簡(jiǎn)潔的使用Channel。示例代碼如下:

// 啟動(dòng)協(xié)程,返回一個(gè)接收Channel
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
    while(true){
        delay(100)
        // 發(fā)送
        send(1)
    }
}
// 啟動(dòng)協(xié)程,返回一個(gè)發(fā)送Channel
val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
    while(true){
        // 接收
        val element = receive()
        Log.d("liduo","$element")
    }
}

produce方法與actor方法內(nèi)部對(duì)Channel對(duì)象做了處理,當(dāng)協(xié)程執(zhí)行完畢,自動(dòng)關(guān)閉Channel對(duì)象。

但目前,produce方法還處于試驗(yàn)階段(被ExperimentalCoroutinesApi注解修飾)。而actor方法也已經(jīng)過(guò)時(shí)(被ObsoleteCoroutinesApi注解修飾)。因此在實(shí)際開發(fā)中最好不要使用!

4)BroadcastChannel

當(dāng)遇到一個(gè)發(fā)送者對(duì)應(yīng)多個(gè)接收者的場(chǎng)景時(shí),可以使用BroadcastChannel。代碼如下:

fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> = 
    when (capacity) {
        0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
        UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
        CONFLATED -> ConflatedBroadcastChannel()
        BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayBroadcastChannel(capacity)
    }

創(chuàng)建BroadcastChannel對(duì)象時(shí),必須指定容量大小。接收者通過(guò)調(diào)用BroadcastChannel對(duì)象的openSubscription方法,獲取ReceiveChannel對(duì)象來(lái)接收消息。示例代碼如下:

// 創(chuàng)建BroadcastChannel,容量為5
val broadcastChannel = BroadcastChannel<Int>(5)
// 創(chuàng)建發(fā)送者協(xié)程
GlobalScope.launch {
    // 發(fā)送 1
    broadcastChannel.send(1)
    delay(100)
    // 發(fā)送 2
    broadcastChannel.send(2)
    // 關(guān)閉
    broadcastChannel.close()
}.join()
// 創(chuàng)建接收者1協(xié)程
GlobalScope.launch {
    // 獲取ReceiveChannel
    val receiveChannel = broadcastChannel.openSubscription()
    // 接收
    for (element in receiveChannel) {
        Log.d("receiver_1: ", "$element")
    }
}.join()
// 創(chuàng)建接收者2協(xié)程
GlobalScope.launch {
    // 獲取ReceiveChannel
    val receiveChannel = broadcastChannel.openSubscription()
    // 接收
    for (element in receiveChannel) {
        Log.d("receiver_2: ", "$element")
    }
}.join()

每個(gè)接收者都可以收到發(fā)送者發(fā)送的每一條消息。使用擴(kuò)展方法broadcast可以直接將Channel對(duì)象轉(zhuǎn)化為BroadcastChannel對(duì)象,示例代碼如下:

val channel = Channel<Int>()
val broadcastChannel = channel.broadcast(10)

BroadcastChannel的很多方法也處于試驗(yàn)階段(被ExperimentalCoroutinesApi注解修飾),使用時(shí)需慎重!

3.多路復(fù)用

協(xié)程中提供了類似Java中Nio的select方法,用于多路復(fù)用,代碼如下:

suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R

以Channel的多路復(fù)用為例,具體看一下select方法的使用。示例代碼如下:

private suspend fun test() {
    // 創(chuàng)建一個(gè)Channel列表
    val channelList = mutableListOf<Channel<Int>>()
    // 假設(shè)其中有5個(gè)Channel
    channelList.add(Channel())
    channelList.add(Channel())
    channelList.add(Channel())
    channelList.add(Channel())
    channelList.add(Channel())
    // 調(diào)用select方法,協(xié)程掛起
    val result = select<Int> {
        // 對(duì)5個(gè)Channel進(jìn)行注冊(cè)監(jiān)聽,等待接收
        channelList.forEach {
            it.onReceive
        }
    }
    // 當(dāng)5個(gè)Channel中任意一個(gè)接收到消息時(shí),select掛起恢復(fù)
    // 并將返回值賦給result
    Log.d("liduo", "$result")
}

除此之外,協(xié)程中還有很多接口定義了名字為"onXXX"的方法,比如Job接口的onJoin方法,Deferred接口的onAwait方法,都是用于配合select方法來(lái)進(jìn)行多路復(fù)用。

4.序列生成器

協(xié)程中提供了sequence方法來(lái)生成序列。示例代碼如下:

private suspend fun test() {
    // 創(chuàng)建一個(gè)可以輸出奇數(shù)的序列,泛型可省略,自動(dòng)推斷
    val singleNumber = sequence<Int> {
        val i = 0
        while (true) {
            // 在需要輸出的地方調(diào)用yield方法
            yield(2 * i - 1)
        }
    }
    // 調(diào)用迭代器,獲取序列的輸出
    singleNumber.iterator().forEach {
        Log.d("liduo", "$it")
    }
    // 獲取序列前五項(xiàng),迭代輸出
    singleNumber.take(5).forEach {
        Log.d("liduo", "$it")
    }
}

調(diào)用yield方法會(huì)使協(xié)程掛起,同時(shí)輸出這個(gè)序列當(dāng)前生成的值。除此之外,也可以調(diào)用yieldAll方法來(lái)輸出序列產(chǎn)生值的合集,示例代碼如下:

private suspend fun test() {
    // 創(chuàng)建一個(gè)可以輸出奇數(shù)的序列,泛型可省略,自動(dòng)推斷
    val singleNumber = sequence<Int> {
        yieldAll(listOf(1,3,5,7))
        yieldAll(listOf(9,11,13))
        yieldAll(listOf(15,17))
    }
    // 調(diào)用迭代器,獲取序列的輸出,最多為9項(xiàng)
    singleNumber.iterator().forEach {
        Log.d("liduo", "$it")
    }
    // 獲取序列前五項(xiàng),迭代輸出
    singleNumber.take(5).forEach {
        // 1,3,5,7,9
        Log.d("liduo", "$it")
    }    
}

5.協(xié)程異步流

協(xié)程中提供了類似RxJava的響應(yīng)式編程API——Flow(官方稱為異步冷數(shù)據(jù)流,官方也提供了創(chuàng)建熱數(shù)據(jù)流的方法)。

1)基礎(chǔ)使用

// 在主線程上調(diào)用
GlobalScope.launch(Dispatchers.Main) {
    // 創(chuàng)建流
    flow<Int> {
        // 掛起,輸出返回值
        emit(1)
      // 設(shè)置流執(zhí)行的線程,并消費(fèi)流
    }.flowOn(Dispatchers.IO).collect {
            Log.d("liduo", "$it")
        }
}.join()

emit方法是一個(gè)掛起方法,類似sequence中的yield方法,用于輸出返回值。flowOn方法等同于Rxjava中的subscribeOn方法,用于切換flow執(zhí)行的線程。為了避免理解混淆,F(xiàn)low中沒有提供類似Rxjava中的observeOn方法,但可以通過(guò)指定流所在協(xié)程的上下文參數(shù)確定。collect方法等同于RxJava中的subscribe方法,用于觸發(fā)和消費(fèi)流。

一個(gè)流可以被多次消費(fèi),示例代碼如下:

GlobalScope.launch(Dispatchers.IO) {
    val mFlow = flow<Int> {
        emit(1)
    }.flowOn(Dispatchers.Main)
    mFlow.collect { Log.d("liduo1", "$it") }
    mFlow.collect { Log.d("liduo2", "$it") }
}.join()

2)異常處理

Flow支持類似try-catch-finally的異常處理。示例代碼如下:

flow<Int> {
    emit(1)
    // 拋出異常
    throw NullPointerException()
}.catch { cause: Throwable ->
    Log.d("liduo", "${cause.message}")
}.onCompletion { cause: Throwable? ->
    Log.d("liduo", "${cause?.message}")
}

catch方法用于捕獲異常。onCompletion方法等同于finally代碼塊。Kotlin不建議直接在flow中通過(guò)try-catch-finally代碼塊去捕獲異常!

Flow中還提供了類似RxJava的onErrorReturn方法的操作,示例代碼如下:

flow<Int> {
    emit(1)
    // 拋出異常
    throw NullPointerException()
}.catch { cause: Throwable ->
    Log.d("liduo", "${cause.message}")
    emit(-1)
}

3)觸發(fā)分離

Flow支持提前寫好流的消費(fèi),在必要的時(shí)候再去觸發(fā)消費(fèi)的操作。示例代碼如下:

// 創(chuàng)建Flow的方法
fun myFlow() = flow<Int> {
    // 生產(chǎn)過(guò)程
    emit(1)
}.onEach {
    // 消費(fèi)過(guò)程
    Log.d("liduo", "$it")
}
suspend fun main() {
    // 寫法1
    GlobalScope.launch {
        // 觸發(fā)消費(fèi)
        myFlow().collect()
    }.join()
    // 寫法2
    myFlow().launchIn(GlobalScope).join()
}

4)注意

  • Flow中不提供取消collect的方法。如果要取消flow的執(zhí)行,可以直接取消flow所在的協(xié)程。
  • emit方法不是線程安全的,因此不要在flow中調(diào)用withContext等方法切換調(diào)度器。如果需要切換,可以使用channelFlow。

6.全局上下文

在本文中,啟動(dòng)協(xié)程使用的都是GlobalScope,但在實(shí)際開發(fā)過(guò)程中,不應(yīng)該使用GlobalScope。GlobalScope會(huì)開啟一個(gè)全新的協(xié)程作用域,并且不受我們控制。假設(shè)Activity頁(yè)面關(guān)閉時(shí),其中的協(xié)程還沒有運(yùn)行結(jié)束,并且我們還無(wú)法取消協(xié)程的執(zhí)行,這時(shí)可能會(huì)導(dǎo)致內(nèi)存泄漏。因此,在實(shí)際開發(fā)中,可以自定義一個(gè)全局的協(xié)程作用域,或者至少按照以下方法書寫代碼:

// 實(shí)現(xiàn)CoroutineScope接口
class MainActivity : AppCompatActivity(),CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        // 直接啟動(dòng)協(xié)程
        launch {
            Log.d("liduo", "launch")
        }
    }
    override fun onDestroy() {
        super.onDestroy()
        // 取消頂級(jí)父協(xié)程
        cancel()
    }
}

MainScope的代碼如下:

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

Dispatchers.Main表示在主線程調(diào)度,SupervisorJob()表示子協(xié)程取消不會(huì)影響父協(xié)程。

到此這篇關(guān)于Kotlin協(xié)程概念原理與使用萬(wàn)字梳理的文章就介紹到這了,更多相關(guān)Kotlin協(xié)程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論