Kotlin?協(xié)程的取消機制詳細解讀
引言
在 Java 語言中提供了線程中斷的能力,但并不是所有的線程都可以中斷的,因為 interrupt 方法并不是真正的終止線程,而是將一個標志位標記為中斷狀態(tài),當運行到下一次中斷標志位檢查時,才能觸發(fā)終止線程。
但無論如何,終止線程是一個糟糕的方案,因為在線程的銷毀和重建,是要消耗系統(tǒng)資源的,造成了不必要的開銷。Kotlin 協(xié)程提供了更優(yōu)雅的取消機制,這也是協(xié)程比較核心的功能之一。
協(xié)程的狀態(tài)
在了解取消機制之前我們需要知道一些關于 Job 狀態(tài)的內容:
State | isActive(是否活躍) | isCompleted(是否完成) | isCancelled(是否取消) |
---|---|---|---|
New (可選初始狀態(tài)) | false | false | false |
Active (默認初始狀態(tài)) | true | false | false |
Completing (短暫態(tài)) | true | false | false |
Cancelling (短暫態(tài)) | false | false | true |
Cancelled (完成態(tài)) | false | true | true |
Completed (完成態(tài)) | false | true | false |
可以看出,在完成和取消的過程中,會經過一個短暫的進行中的狀態(tài),然后才變成已完成/已取消。
在這里只關注一下取消相關的狀態(tài):
- Cancelling
- 拋出異常的 Job 會導致其進入 Cancelling 狀態(tài),也可以使用 cancel 方法來隨時取消 Job 使其立即轉換為 Cancelling 狀態(tài)。
- Cancelled
- 當它遞歸取消子項,并等待所有的子項都取消后,該 Job 會進入 Cancelled 狀態(tài)。
取消協(xié)程的用法
協(xié)程在代碼中抽象的類型是 Job
, 下面是一個官方的代碼示例,用來展示如何取消協(xié)程的執(zhí)行:
suspend fun main(): Unit = coroutineScope { val job = launch { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancel() // cancels the job job.join() // waits for job's completion println("main: Now I can quit.") }
它的輸出是:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
一旦 mian 方法中調用了 job.cancel()
,我們就看不到其他協(xié)程的任何輸出,因為它已被取消了。
協(xié)程取消的有效性
協(xié)程代碼必須通過與掛起函數的配合才能被取消。kotlinx.coroutines 中所有掛起函數(帶有 suspend 關鍵字函數)都是可以被取消的。suspend 函數會檢查協(xié)程是否需要取消并在取消時拋出 CancellationException
。
但是,如果協(xié)程在運行過程中沒有掛起點,則不能取消協(xié)程,如下例所示:
suspend fun main(): Unit = coroutineScope { val startTime = System.currentTimeMillis() val job = launch(Dispatchers.Default) { var nextPrintTime = startTime var i = 0 while (i < 5) { // computation loop, just wastes CPU // print a message twice a second if (System.currentTimeMillis() >= nextPrintTime) { println("job: I'm sleeping ${i++} ...") nextPrintTime += 500L } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
在這個 job 中,并沒有執(zhí)行任何 suspend 函數,所以在執(zhí)行過程中并沒有對協(xié)程是否需要取消進行檢查,自然也就無法觸發(fā)取消。
同樣的問題也可以在通過 捕獲 CancellationException 并且不拋出的情況下 觀察到:
suspend fun main(): Unit = coroutineScope { val job = launch(Dispatchers.Default) { repeat(5) { i -> try { // print a message twice a second println("job: I'm sleeping $i ...") delay(500) } catch (e: Exception) { // log the exception println(e) } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
打印結果是:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9
job: I'm sleeping 3 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9
job: I'm sleeping 4 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9
main: Now I can quit.
從打印結果來看,循環(huán) 5 次全部執(zhí)行了,好像取消并沒有起到作用。但實際上不是這樣的,為了便于觀察加上時間戳:
1665217217682: job: I'm sleeping 0 ...
1665217218196: job: I'm sleeping 1 ...
1665217218697: job: I'm sleeping 2 ...
1665217218996: main: I'm tired of waiting!
1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d
1665217219000: job: I'm sleeping 3 ...
1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d
1665217219000: job: I'm sleeping 4 ...
1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d
1665217219001: main: Now I can quit.
加上時間可以看出,拋出第一次異常后的兩次循環(huán)和異常捕獲都是在同一瞬間完成的。這說明了捕獲到異常后,仍然會執(zhí)行代碼,但是所有的 delay 方法都沒有生效,即該 Job 的所有子 Job 都失效了。但該 Job 仍在繼續(xù)循環(huán)打印。原因是,父 Job 會等所有子 Job 處理結束后才能完成取消。
而如果我們不使用 try-catch 呢?
suspend fun main(): Unit = coroutineScope { val job = launch(Dispatchers.Default) { repeat(5) { i -> // print a message twice a second println("job: I'm sleeping $i ...") delay(500) } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
打印結果:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
很順利的取消了,這是因為協(xié)程拋出 Exception 直接終止了。
注意協(xié)程拋出 CancellationException 并不會導致 App Crash 。
使用 try-catch 來捕獲 CancellationException 時需要注意,在掛起函數前的代碼邏輯仍會多次執(zhí)行,從而導致這部分代碼仿佛沒有被取消一樣。
如何寫出可以取消的代碼
有兩種方法可以使代碼是可取消的。第一種方法是定期調用掛起函數,檢查是否取消,就是上面的例子中的方法;另一個是顯式檢查取消狀態(tài):
suspend fun main(): Unit = coroutineScope { val startTime = System.currentTimeMillis() val job = launch(Dispatchers.Default) { var nextPrintTime = startTime var i = 0 while (isActive) { // cancellable computation loop // print a message twice a second if (System.currentTimeMillis() >= nextPrintTime) { println("job: I'm sleeping ${i++} ...") nextPrintTime += 500L } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
將上面的循環(huán) 5 次通過使用 while (isActive) 進行替換,實現顯示檢查取消的代碼。isActive 是通過 CoroutineScope 對象在協(xié)程內部可用的擴展屬性。
在 finally 中釋放資源
在前面的例子中我們使用 try-catch 捕獲 CancellationException 發(fā)現會產生父協(xié)程等待所有子協(xié)程完成后才能完成,所以建議不用 try-catch
而是 try{…} finally{…}
,讓父協(xié)程在被取消時正常執(zhí)行終結操作:
val job = launch { try { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } finally { println("job: I'm running finally") } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.")
join 和 cancelAndJoin 都要等待所有終結操作完成,所以上面的例子產生了以下輸出:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.
使用不可取消的 block
如果在在上面的示例的 finally 代碼塊中使用 suspend 函數,會導致拋出 CancellationException 。
因為運行這些代碼的協(xié)程已經被取消了。通常情況下這不會有任何問題,然而,在極少數情況下,如果你需要在 finally 中使用一個掛起函數,你可以通過使用 withContext(NonCancellable) { ... }
:
val job = launch { try { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } finally { withContext(NonCancellable) { println("job: I'm running finally") delay(1000L) println("job: And I've just delayed for 1 sec because I'm non-cancellable") } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.")
CancellationException
在上面的內容中,我們知道協(xié)程的取消是通過拋出 CancellationException 來進行的,神奇的是拋出 Exception 并沒有導致應用程序 Crash 。
CancellationException 的真實實現是 j.u.c. 中的 CancellationException :
public actual typealias CancellationException = java.util.concurrent.CancellationException
如果協(xié)程的 Job 被取消,則由可取消的掛起函數拋出 CancellationException 。它表示協(xié)程的正常取消。在默認的 CoroutineExceptionHandler 下,它不會打印到控制臺/日志。
上面引用了這個類的注釋,看來處理拋出異常的邏輯在 CoroutineExceptionHandler 中:
public interface CoroutineExceptionHandler : CoroutineContext.Element { /** * Key for [CoroutineExceptionHandler] instance in the coroutine context. */ public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler> /** * Handles uncaught [exception] in the given [context]. It is invoked * if coroutine has an uncaught exception. */ public fun handleException(context: CoroutineContext, exception: Throwable) }
通常,未捕獲的 Exception 只能由使用協(xié)程構建器的根協(xié)程產生。所有子協(xié)程都將異常的處理委托給他們的父協(xié)程,父協(xié)程也委托給它自身的父協(xié)程,直到委托給根協(xié)程處理。所以在子協(xié)程中的 CoroutineExceptionHandler 永遠不會被使用。
使用 SupervisorJob 運行的協(xié)程不會將異常傳遞給它們的父協(xié)程,SupervisorJob 被視為根協(xié)程。
使用 async 創(chuàng)建的協(xié)程總是捕獲它的所有異常通過結果 Deferred 對象回調出去,因此它不能導致未捕獲的異常。
CoroutineExceptionHandler 用于記錄異常、顯示某種類型的錯誤消息、終止和/或重新啟動應用程序。
如果需要在代碼的特定部分處理異常,建議在協(xié)程中的相應代碼周圍使用 try-catch。通過這種方式,您可以阻止異常協(xié)程的完成(異?,F在被捕獲),重試操作,和/或采取其他任意操作。 這也就是我們前面論證的在協(xié)程中使用 try-catch 導致的取消失效。
默認情況下,如果協(xié)程沒有配置用于處理異常的 Handler ,未捕獲的異常將按以下方式處理:
如果 exception 是 CancellationException ,那么它將被忽略(因為這是取消正在運行的協(xié)程的假定機制)。
其他情況:
- 如果上下文中有一個 Job,那么調用
job.cancel()
。 - 否則,通過 ServiceLoader 找到的 CoroutineExceptionHandler 的所有實例并調用當前線程的 Thread.uncaughtExceptionHandler 來處理異常。
超時取消
取消協(xié)程執(zhí)行的最合適的應用場景是它的執(zhí)行時間超過了規(guī)定的最大時間時自動取消任務。在 Kotlin 協(xié)程庫中提供了 withTimeout 方法來實現這個功能:
withTimeout(1300L) { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } }
執(zhí)行結果:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
TimeoutCancellationException 是 CancellationException 的子類,TimeoutCancellationException 通過 withTimeout 函數拋出。
在本例中,我們在main函數中使用了withTimeout ,運行過程中會導致 Crash 。
有兩種解決辦法,就是使用 try{…} catch (e: TimeoutCancellationException){…}
代碼塊;另一種辦法是使用在超時的情況下不是拋出異常而是返回 null 的 withTimeoutOrNull 函數:
val result = withTimeoutOrNull(1300L) { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } "Done" // will get cancelled before it produces this result } println("Result is $result")
打印結果:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
異步的超時和資源
withTimeout 中的超時事件相對于在其塊中運行的代碼是異步的,并且可能在任何時間發(fā)生,甚至在從超時塊內部返回之前。如果你在塊內部打開或獲取一些資源,需要關閉或釋放到塊外部。
例如,在這里,我們用 Resource 類模擬一個可關閉資源,它只是通過對獲得的計數器遞增,并對該計數器從其關閉函數遞減來跟蹤創(chuàng)建次數。讓我們用小超時運行大量的協(xié)程,嘗試在一段延遲后從withTimeout塊內部獲取這個資源,并從外部釋放它。
var acquired = 0 class Resource { init { acquired++ } // Acquire the resource fun close() { acquired-- } // Release the resource } fun main() { runBlocking { repeat(100_000) { // Launch 100K coroutines launch { val resource = withTimeout(60) { // Timeout of 60 ms delay(50) // Delay for 50 ms Resource() // Acquire a resource and return it from withTimeout block } resource.close() // Release the resource } } } // Outside of runBlocking all coroutines have completed println(acquired) // Print the number of resources still acquired }
如果運行上面的代碼,您將看到它并不總是打印 0,盡管它可能取決于您的機器的時間,在本例中您可能需要調整超時以實際看到非零值。
要解決這個問題,可以在變量中存儲對資源的引用,而不是從withTimeout塊返回它。
fun main() { runBlocking { repeat(100_000) { // Launch 100K coroutines launch { var resource: Resource? = null // Not acquired yet try { withTimeout(60) { // Timeout of 60 ms delay(50) // Delay for 50 ms resource = Resource() // Store a resource to the variable if acquired } // We can do something else with the resource here } finally { resource?.close() // Release the resource if it was acquired } } } } // Outside of runBlocking all coroutines have completed println(acquired) // Print the number of resources still acquired }
這樣這個例子總是輸出0。資源不會泄漏。
取消檢查的底層原理
在探索協(xié)程取消的有效性時,我們知道協(xié)程代碼必須通過與掛起函數的配合才能被取消。
kotlinx.coroutines 中所有掛起函數(帶有 suspend 關鍵字函數)都是可以被取消的。suspend 函數會檢查協(xié)程是否需要取消并在取消時拋出 CancellationException 。
關于協(xié)程的取消機制,很明顯和 suspend 關鍵字有關。為了測試 suspend 關鍵字的作用,實現下面的代碼:
class Solution { suspend fun func(): String { return "測試 suspend 關鍵字" } }
作為對照組,另一個是不加 suspend 關鍵字的 func 方法:
class Solution { fun func(): String { return "測試 suspend 關鍵字" } }
兩者反編譯成 Java :
// 普通的方法 public final class Solution { public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution(); @NotNull public final String func() { return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution(); } } // 帶有 suspend 關鍵字的方法 public final class Solution { public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution(); @Nullable public final Object func(@NotNull Continuation<? super String> $completion) { return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution(); } }
suspend 關鍵字修飾的方法反編譯后默認生成了帶有 Continuation 參數的方法。說明 suspend 關鍵字的玄機在 Continuation 類中。
Continuation 是 Kotlin 協(xié)程的核心思想 Continuation-Passing Style 的實現。原理參考簡述協(xié)程的底層實現原理 。
通過在普通函數的參數中增加一個 Continuation 參數,這個 continuation 的性質類似于一個 lambda 對象,將方法的返回值類型傳遞到這個 lambda 代碼塊中。
什么意思呢?就是本來這個方法的返回類型直接 return 出來的:
val a: String = func() print(a)
而經過 suspend 修飾,代碼變成了這個樣子:
func { a -> print(a) }
Kotlin 協(xié)程就是通過這樣的包裝,將比如 launch 方法,實際上是 launch 最后一個參數接收的是 lambda 參數。也就是把外部邏輯傳遞給函數內部執(zhí)行。
回過頭來再來理解 suspend 關鍵字,我們知道帶有 suspend 關鍵字的方法會對協(xié)程的取消進行檢查,從而取消協(xié)程的執(zhí)行。從這個能力上來看,我理解他應該會自動生成類似下面的邏輯代碼:
生成的函數 { if(!當前協(xié)程.isActive) { throw CancellationException() } // ... 這里是函數真實邏輯 }
suspend 修飾的函數,會自動生成一個掛起點,來檢查協(xié)程是否應該被掛起。
顯然 Continuation 中聲明的函數也證實了掛起的功能:
public interface Continuation<in T> { /** * The context of the coroutine that corresponds to this continuation. */ public val context: CoroutineContext /** * 恢復相應協(xié)程的執(zhí)行,將成功或失敗的結果作為最后一個掛起點的返回值傳遞。 */ public fun resumeWith(result: Result<T>) }
協(xié)程本質上是產生了一個 switch 語句,每個掛起點之間的邏輯都是一個 case 分支的邏輯。參考 協(xié)程是如何實現的 中的例子:
Function1 lambda = (Function1)(new Function1((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { byte text; @BlockTag1: { Object result; @BlockTag2: { result = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); this.label = 1; if (SuspendTestKt.dummy(this) == result) { return result; } break; case 1: ResultKt.throwOnFailure($result); break; case 2: ResultKt.throwOnFailure($result); break @BlockTag2; case 3: ResultKt.throwOnFailure($result); break @BlockTag1; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } text = 1; System.out.println(text); this.label = 2; if (SuspendTestKt.dummy(this) == result) { return result; } } text = 2; System.out.println(text); this.label = 3; if (SuspendTestKt.dummy(this) == result) { return result; } } text = 3; System.out.println(text); return Unit.INSTANCE; } @NotNull public final Continuation create(@NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function1 funcation = new <anonymous constructor>(completion); return funcation; } public final Object invoke(Object object) { return ((<undefinedtype>)this.create((Continuation)object)).invokeSuspend(Unit.INSTANCE); } });
可以看出,在每個分支都會執(zhí)行一次 ResultKt.throwOnFailure($result);
,從名字上就知道,這就是檢查是否需要取消并拋出異常的代碼所在:
@PublishedApi @SinceKotlin("1.3") internal fun Result<*>.throwOnFailure() { if (value is Result.Failure) throw value.exception }
這里的 Result 類是一個包裝類,它將成功的結果封裝為類型 T 的值,或將失敗的結果封裝為帶有任意Throwable異常的值。
@Suppress("INAPPLICABLE_JVM_NAME") @InlineOnly @JvmName("success") public inline fun <T> success(value: T): Result<T> = Result(value) /** * Returns an instance that encapsulates the given [Throwable] [exception] as failure. */ @Suppress("INAPPLICABLE_JVM_NAME") @InlineOnly @JvmName("failure") public inline fun <T> failure(exception: Throwable): Result<T> = Result(createFailure(exception))
成功和失敗的方法類型是不一樣的,證實了這一點,success 方法接收類型為 T 的參數;failure 接收 Throwable 類型的參數。
到這里 suspend 方法掛起的原理就明了了:在協(xié)程的狀態(tài)機中,通過掛起點會分割出不同的狀態(tài),對每一個狀態(tài),會先進行掛起結果的檢查。 這會導致以下結果:
- 協(xié)程的取消機制是通過掛起函數的掛起點檢查來進行取消檢查的。證實了為什么如果沒有 suspend 函數(本質是掛起點),協(xié)程的取消就不會生效。
- 協(xié)程的取消機制是需要函數合作的,就是通過 suspend 函數來增加取消檢查的時機。
- 父協(xié)程會執(zhí)行完所有的子協(xié)程(掛起函數),因為代碼的本質是一個循環(huán)執(zhí)行 switch 語句,當一個子協(xié)程(或掛起函數)執(zhí)行結束,會繼續(xù)執(zhí)行到下一個分支。但是最后一個掛起點后續(xù)的代碼并不會被執(zhí)行,因為最后一個掛起點檢查到失敗,不會繼續(xù)跳到最后的 label 分支。
以上就是Kotlin 協(xié)程的取消機制詳細解讀的詳細內容,更多關于Kotlin 協(xié)程取消機制的資料請關注腳本之家其它相關文章!
相關文章
從源碼編譯Android系統(tǒng)的Java類庫和JNI動態(tài)庫的方法
這篇文章主要介紹了從源碼編譯Android系統(tǒng)的Java類庫和JNI動態(tài)庫的方法,例子基于Linux系統(tǒng)環(huán)境下來講,需要的朋友可以參考下2016-02-02Android 中 onSaveInstanceState()使用方法詳解
這篇文章主要介紹了Android 中 onSaveInstanceState()使用方法詳解的相關資料,希望通過本文大家能夠掌握這部分知識,需要的朋友可以參考下2017-09-09Android用戶輸入自動提示控件AutoCompleteTextView使用方法
這篇文章主要為大家詳細介紹了Android用戶輸入自動提示控件AutoCompleteTextView的使用方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08Android開發(fā)使用Message對象分發(fā)必備知識點詳解
這篇文章主要為大家介紹了Android開發(fā)使用Message對象分發(fā)必備知識點詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10Unity3D游戲引擎實現在Android中打開WebView的實例
這篇文章主要介紹了Unity3D游戲引擎在Android中打開WebView的實例,需要的朋友可以參考下2014-07-07android?studio組件通信:Intend啟動Activity接收返回結果
這篇文章主要介紹了android?studio組件通信:Intend啟動Activity接收返回結果,設計一個主Activity和一個子Activity(Sub-Activity),使用主Activity上的按鈕啟動子Activity,并將子Activity的一些信息返回給主Activity,并顯示在主Activity上,需要的朋友可以參考一下2021-12-12