C#語(yǔ)言async?await工作原理示例解析
正文
前不久,我們發(fā)布了《選擇 .NET 的 n 個(gè)理由》。它提供了對(duì)平臺(tái)的高層次概述,總結(jié)了各種組件和設(shè)計(jì)決策,并承諾對(duì)所涉及的領(lǐng)域發(fā)表更深入的文章。這是第一篇這樣深入探討 C# 和 .NET 中 async/await 的歷史、背后的設(shè)計(jì)決策和實(shí)現(xiàn)細(xì)節(jié)的文章。
對(duì) async/await 的支持已經(jīng)存在了十年之久。在這段時(shí)間里,它改變了為 .NET 編寫可擴(kuò)展代碼的方式,而在不了解其底層邏輯的情況下使用該功能是可行的,也是非常常見(jiàn)的。在這篇文章中,我們將深入探討 await 在語(yǔ)言、編譯器和庫(kù)級(jí)別的工作原理,以便你可以充分利用這些有價(jià)值的功能。
不過(guò),要做到這一點(diǎn),我們需要追溯到 async/await 之前,以了解在沒(méi)有它的情況下最先進(jìn)的異步代碼是什么樣子的。
最初的樣子
早在 .NET Framework 1.0中,就有異步編程模型模式,又稱 APM 模式、Begin/End 模式、IAsyncResult 模式。在高層次上,該模式很簡(jiǎn)單。對(duì)于同步操作 DoStuff:
class Handler { public int DoStuff(string arg); }
作為模式的一部分,將有兩個(gè)相應(yīng)的方法:BeginDoStuff 方法和 EndDoStuff 方法:
class Handler { public int DoStuff(string arg); public IAsyncResult BeginDoStuff(string arg, AsyncCallback? callback, object? state); public int EndDoStuff(IAsyncResult asyncResult); }
BeginDoStuff 會(huì)像 DoStuff 一樣接受所有相同的參數(shù),但除此之外,它還會(huì)接受 AsyncCallback 委托和一個(gè)不透明的狀態(tài)對(duì)象,其中一個(gè)或兩個(gè)都可以為 null。Begin 方法負(fù)責(zé)初始化異步操作,如果提供了回調(diào)(通常稱為初始操作的“延續(xù)”),它還負(fù)責(zé)確保在異步操作完成時(shí)調(diào)用回調(diào)。Begin 方法還將構(gòu)造一個(gè)實(shí)現(xiàn)了 IAsyncResult 的類型實(shí)例,使用可選狀態(tài)填充 IAsyncResult 的 AsyncState 屬性:
namespace System { public interface IAsyncResult { object? AsyncState { get; } WaitHandle AsyncWaitHandle { get; } bool IsCompleted { get; } bool CompletedSynchronously { get; } } public delegate void AsyncCallback(IAsyncResult ar); }
然后,這個(gè) IAsyncResult 實(shí)例將從 Begin 方法返回,并在最終調(diào)用 AsyncCallback 時(shí)傳遞給它。當(dāng)準(zhǔn)備使用操作的結(jié)果時(shí),調(diào)用者將把 IAsyncResult 實(shí)例傳遞給 End 方法,該方法負(fù)責(zé)確保操作已完成(如果沒(méi)有完成,則通過(guò)阻塞同步等待操作完成),然后返回操作的任何結(jié)果,包括傳播可能發(fā)生的任何錯(cuò)誤和異常。因此,不用像下面這樣寫代碼來(lái)同步執(zhí)行操作:
try { int i = handler.DoStuff(arg); Use(i); } catch (Exception e) { ... // handle exceptions from DoStuff and Use }
可以按以下方式使用 Begin/End 方法異步執(zhí)行相同的操作:
try { handler.BeginDoStuff(arg, iar => { try { Handler handler = (Handler)iar.AsyncState!; int i = handler.EndDoStuff(iar); Use(i); } catch (Exception e2) { ... // handle exceptions from EndDoStuff and Use } }, handler); } catch (Exception e) { ... // handle exceptions thrown from the synchronous call to BeginDoStuff }
對(duì)于在任何語(yǔ)言中處理過(guò)基于回調(diào)的 API 的人來(lái)說(shuō),這應(yīng)該感覺(jué)很熟悉。
然而,事情從此變得更加復(fù)雜。例如,有一個(gè)"stack dives"的問(wèn)題。stack dives 是指代碼反復(fù)調(diào)用,在堆棧中越陷越深,以至于可能出現(xiàn)堆棧溢出。如果操作同步完成,Begin 方法被允許同步調(diào)用回調(diào),這意味著對(duì) Begin 的調(diào)用本身可能直接調(diào)用回調(diào)。同步完成的 "異步 "操作實(shí)際上是很常見(jiàn)的;它們不是 "異步",因?yàn)樗鼈儽槐WC異步完成,而只是被允許這樣做。
這是一種真實(shí)的可能性,很容易再現(xiàn)。
.NET Core 運(yùn)行
在 .NET Core 上試試這個(gè)程序
using System.NET; using System.NET.Sockets; using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); listener.Listen(); using Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); client.Connect(listener.LocalEndPoint!); using Socket server = listener.Accept(); _ = server.SendAsync(new byte[100_000]); var mres = new ManualResetEventSlim(); byte[] buffer = new byte[1]; var stream = new NetworkStream(client); void ReadAgain() { stream.BeginRead(buffer, 0, 1, iar => { if (stream.EndRead(iar) != 0) { ReadAgain(); // uh oh! } else { mres.Set(); } }, null); }; ReadAgain(); mres.Wait();
在這里,我設(shè)置了一個(gè)相互連接的簡(jiǎn)單客戶端套接字和服務(wù)器套接字。服務(wù)器向客戶端發(fā)送100,000字節(jié),然后客戶端繼續(xù)使用 BeginRead/EndRead 來(lái)“異步”地每次讀取一個(gè)字節(jié)。傳給 BeginRead 的回調(diào)函數(shù)通過(guò)調(diào)用 EndRead 來(lái)完成讀取,然后如果它成功讀取了所需的字節(jié),它會(huì)通過(guò)遞歸調(diào)用 ReadAgain 局部函數(shù)來(lái)發(fā)出另一個(gè) BeginRead。然而,在 .NET Core 中,套接字操作比在 .NET Framework 上快得多,并且如果操作系統(tǒng)能夠滿足同步操作,它將同步完成(注意內(nèi)核本身有一個(gè)緩沖區(qū)用于滿足套接字接收操作)。因此,這個(gè)堆棧會(huì)溢出:
因此,APM 模型中內(nèi)置了補(bǔ)償機(jī)制。有兩種可能的方法可以彌補(bǔ)這一點(diǎn):
1.不要允許 AsyncCallback 被同步調(diào)用。如果一直異步調(diào)用它,即使操作以同步方式完成,那么 stack dives 的風(fēng)險(xiǎn)也會(huì)消失。但是性能也是如此,因?yàn)橥酵瓿傻牟僮?或者快到無(wú)法觀察到它們的區(qū)別)是非常常見(jiàn)的,強(qiáng)迫每個(gè)操作排隊(duì)回調(diào)會(huì)增加可測(cè)量的開(kāi)銷。
2.使用一種機(jī)制,允許調(diào)用方而不是回調(diào)方在操作同步完成時(shí)執(zhí)行延續(xù)工作。這樣,您就可以避開(kāi)額外的方法框架,繼續(xù)執(zhí)行后續(xù)工作,而不深入堆棧。
APM 模式與方法2一起使用。為此,IAsyncResult 接口公開(kāi)了兩個(gè)相關(guān)但不同的成員:IsCompleted 和 CompletedSynchronously。IsCompleted 告訴你操作是否已經(jīng)完成,可以多次檢查它,最終它會(huì)從 false 轉(zhuǎn)換為 true,然后保持不變。相比之下,CompletedSynchronously 永遠(yuǎn)不會(huì)改變(如果改變了,那就是一個(gè)令人討厭的 bug)。它用于 Begin 方法的調(diào)用者和 AsyncCallback 之間的通信,他們中的一個(gè)負(fù)責(zé)執(zhí)行任何延續(xù)工作。如果 CompletedSynchronously 為 false,則操作是異步完成的,響應(yīng)操作完成的任何后續(xù)工作都應(yīng)該留給回調(diào);畢竟,如果工作沒(méi)有同步完成,Begin 的調(diào)用方無(wú)法真正處理它,因?yàn)檫€不知道操作已經(jīng)完成(如果調(diào)用方只是調(diào)用 End,它將阻塞直到操作完成)。然而,如果 CompletedSynchronously 為真,如果回調(diào)要處理延續(xù)工作,那么它就有 stack dives 的風(fēng)險(xiǎn),因?yàn)樗鼘⒃诙褩I蠄?zhí)行比開(kāi)始時(shí)更深的延續(xù)工作。因此,任何涉及到這種堆棧潛水的實(shí)現(xiàn)都需要檢查 CompletedSynchronously,并讓 Begin 方法的調(diào)用者執(zhí)行延續(xù)工作(如果它為真),這意味著回調(diào)不需要執(zhí)行延續(xù)工作。這也是 CompletedSynchronously 永遠(yuǎn)不能更改的原因,調(diào)用方和回調(diào)方需要看到相同的值,以確保不管競(jìng)爭(zhēng)條件如何,延續(xù)工作只執(zhí)行一次。
我們都習(xí)慣了現(xiàn)代語(yǔ)言中的控制流結(jié)構(gòu)為我們提供的強(qiáng)大和簡(jiǎn)單性,一旦引入了任何合理的復(fù)雜性,而基于回調(diào)的方法通常會(huì)與這種結(jié)構(gòu)相沖突。其他主流語(yǔ)言也沒(méi)有更好的替代方案。
我們需要一種更好的方法,一種從 APM 模式中學(xué)習(xí)的方法,融合它正確的東西,同時(shí)避免它的陷阱。值得注意的是,APM 模式只是一種模式。運(yùn)行時(shí)間、核心庫(kù)和編譯器在使用或?qū)崿F(xiàn)該模式時(shí)并沒(méi)有提供任何幫助。
基于事件的異步模式
.NET Framework 2.0引入了一些 API,實(shí)現(xiàn)了處理異步操作的不同模式,這種模式主要用于在客戶端應(yīng)用程序上下文中處理異步操作。這種基于事件的異步模式或 EAP 也作為一對(duì)成員出現(xiàn),這次是一個(gè)用于初始化異步操作的方法和一個(gè)用于偵聽(tīng)其完成的事件。因此,我們之前的 DoStuff 示例可能被公開(kāi)為一組成員,如下所示:
class Handler { public int DoStuff(string arg); public void DoStuffAsync(string arg, object? userToken); public event DoStuffEventHandler? DoStuffCompleted; } public delegate void DoStuffEventHandler(object sender, DoStuffEventArgs e); public class DoStuffEventArgs : AsyncCompletedEventArgs { public DoStuffEventArgs(int result, Exception? error, bool canceled, object? userToken) : base(error, canceled, usertoken) => Result = result; public int Result { get; } }
你需要用 DoStuffCompleted 事件注冊(cè)你的后續(xù)工作,然后調(diào)用 DoStuffAsync 方法;它將啟動(dòng)該操作,并且在該操作完成時(shí),調(diào)用者將異步地引發(fā) DoStuffCompleted 事件。然后,處理程序可以繼續(xù)執(zhí)行后續(xù)工作,可能會(huì)驗(yàn)證所提供的 userToken 與它所期望的進(jìn)行匹配,從而允許多個(gè)處理程序同時(shí)連接到事件。
這種模式使一些用例變得更簡(jiǎn)單,同時(shí)使其他用例變得更加困難(考慮到前面的 APM CopyStreamToStream 示例,這說(shuō)明了一些問(wèn)題)。它沒(méi)有以廣泛的方式推出,只是在一個(gè)單獨(dú)的 .NET Framework 版本中匆匆的出現(xiàn)又消失了,盡管留下了它使用期間添加的 api,如 Ping.SendAsync/Ping.PingCompleted:
public class Ping : Component { public void SendAsync(string hostNameOrAddress, object? userToken); public event PingCompletedEventHandler? PingCompleted; ... }
然而,它確實(shí)取得了一個(gè) APM 模式完全沒(méi)有考慮到的顯著進(jìn)步,并且這一點(diǎn)一直延續(xù)到我們今天所接受的模型中: SynchronizationContext。
考慮到像 Windows Forms 這樣的 UI 框架。與 Windows 上的大多數(shù) UI 框架一樣,控件與特定的線程相關(guān)聯(lián),該線程運(yùn)行一個(gè)消息泵,該消息泵運(yùn)行能夠與這些控件交互的工作,只有該線程應(yīng)該嘗試操作這些控件,而任何其他想要與控件交互的線程都應(yīng)該通過(guò)發(fā)送消息由 UI 線程的泵消耗來(lái)完成操作。Windows 窗體使用 ControlBeginInvoke 等方法使這變得很容易,它將提供的委托和參數(shù)排隊(duì),由與該控件相關(guān)聯(lián)的任何線程運(yùn)行。因此,你可以這樣編寫代碼:
private void button1_Click(object sender, EventArgs e) { ThreadPool.QueueUserWorkItem(_ => { string message = ComputeMessage(); button1.BeginInvoke(() => { button1.Text = message; }); }); }
這將卸載在 ThreadPool 線程上完成的 ComputeMessage()工作(以便在處理 UI 的過(guò)程中保持 UI 的響應(yīng)性),然后在工作完成時(shí),將委托隊(duì)列返回到與 button1 相關(guān)的線程,以更新 button1 的標(biāo)簽。這很簡(jiǎn)單,WPF 也有類似的東西,只是用它的 Dispatcher 類型:
private void button1_Click(object sender, RoutedEventArgs e){ ThreadPool.QueueUserWorkItem(_ => { string message = ComputeMessage(); button1.Dispatcher.InvokeAsync(() => { button1.Content = message; }); });}
.NET MAUI 也有類似的功能。但如果我想把這個(gè)邏輯放到輔助方法中呢?
E.g.
// Call ComputeMessage and then invoke the update action to update controls.internal static void ComputeMessageAndInvokeUpdate(Action<string> update) { ... }
然后我可以這樣使用它:
private void button1_Click(object sender, EventArgs e){ ComputeMessageAndInvokeUpdate(message => button1.Text = message);}
但是如何實(shí)現(xiàn) ComputeMessageAndInvokeUpdate,使其能夠在這些應(yīng)用程序中工作呢?是否需要硬編碼才能了解每個(gè)可能的 UI 框架?這就是 SynchronizationContext 的魅力所在。我們可以這樣實(shí)現(xiàn)這個(gè)方法:
internal static void ComputeMessageAndInvokeUpdate(Action<string> update){ SynchronizationContext? sc = SynchronizationContext.Current; ThreadPool.QueueUserWorkItem(_ => { string message = ComputeMessage(); if (sc is not null) { sc.Post(_ => update(message), null); } else { update(message); } });}
它使用 SynchronizationContext 作為一個(gè)抽象,目標(biāo)是任何“調(diào)度器”,應(yīng)該用于回到與 UI 交互的必要環(huán)境。然后,每個(gè)應(yīng)用程序模型確保它作為 SynchronizationContext.Current 發(fā)布一個(gè) SynchronizationContext-derived 類型,去做 "正確的事情"。例如,Windows Forms 有這個(gè):
public sealed class WindowsFormsSynchronizationContext : SynchronizationContext, IDisposable{ public override void Post(SendOrPostCallback d, object? state) => _controlToSendTo?.BeginInvoke(d, new object?[] { state }); ...}
WPF 有這個(gè):
public sealed class DispatcherSynchronizationContext : SynchronizationContext{ public override void Post(SendOrPostCallback d, Object state) => _dispatcher.BeginInvoke(_priority, d, state); ...}
ASP.NET 曾經(jīng)有一個(gè),它實(shí)際上并不關(guān)心工作在什么線程上運(yùn)行,而是關(guān)心給定的請(qǐng)求相關(guān)的工作被序列化,這樣多個(gè)線程就不會(huì)并發(fā)地訪問(wèn)給定的 HttpContext:
internal sealed class AspNetSynchronizationContext : AspNetSynchronizationContextBase{ public override void Post(SendOrPostCallback callback, Object state) => _state.Helper.QueueAsynchronous(() => callback(state)); ...}
這也不限于這些主要的應(yīng)用程序模型。例如,xunit 是一個(gè)流行的單元測(cè)試框架,是 .NET 核心存儲(chǔ)庫(kù)用于單元測(cè)試的框架,它也采用了多個(gè)自定義的 SynchronizationContext。例如,你可以允許并行運(yùn)行測(cè)試,但限制允許并發(fā)運(yùn)行的測(cè)試數(shù)量。這是如何實(shí)現(xiàn)的呢?通過(guò) SynchronizationContext:
public class MaxConcurrencySyncContext : SynchronizationContext, IDisposable{ public override void Post(SendOrPostCallback d, object? state) { var context = ExecutionContext.Capture(); workQueue.Enqueue((d, state, context)); workReady.Set(); }}
MaxConcurrencySyncContext 的 Post 方法只是將工作排到自己的內(nèi)部工作隊(duì)列中,然后在它自己的工作線程上處理它,它根據(jù)所需的最大并發(fā)數(shù)來(lái)控制有多少工作線程。
這與基于事件的異步模式有什么聯(lián)系?EAP 和 SynchronizationContext 是同時(shí)引入的,當(dāng)異步操作被啟動(dòng)時(shí),EAP 規(guī)定完成事件應(yīng)該排隊(duì)到當(dāng)前任何 SynchronizationContext 中。為了稍微簡(jiǎn)化一下,System.ComponentModel 中也引入了一些輔助類型,尤其是 AsyncOperation 和 AsyncOperationManager。前者只是一個(gè)元組,封裝了用戶提供的狀態(tài)對(duì)象和捕獲的 SynchronizationContext,而后者只是作為一個(gè)簡(jiǎn)單的工廠來(lái)捕獲并創(chuàng)建 AsyncOperation 實(shí)例。然后 EAP 實(shí)現(xiàn)將使用這些,例如 Ping.SendAsync 調(diào)用 AsyncOperationManager.CreateOperation 來(lái)捕獲 SynchronizationContext。當(dāng)操作完成時(shí),AsyncOperation 的 PostOperationCompleted 方法將被調(diào)用,以調(diào)用存儲(chǔ)的 SynchronizationContext 的 Post 方法。
我們需要比 APM 模式更好的東西,接下來(lái)出現(xiàn)的 EAP 引入了一些新的事務(wù),但并沒(méi)有真正解決我們面臨的核心問(wèn)題。我們?nèi)匀恍枰玫臇|西。
輸入任務(wù)
.NET Framework 4.0 引入了 System.Threading.Tasks.Task 類型。從本質(zhì)上講,Task 只是一個(gè)數(shù)據(jù)結(jié)構(gòu),表示某些異步操作的最終完成(其他框架將類似的類型稱為“promise”或“future”)。創(chuàng)建 Task 是為了表示某些操作,然后當(dāng)它表示的操作邏輯上完成時(shí),結(jié)果存儲(chǔ)到該 Task 中。但是 Task 提供的關(guān)鍵特性使它比 IAsyncResult 更有用,它在自己內(nèi)部?jī)?nèi)置了 continuation 的概念。這一特性意味著您可以訪問(wèn)任何 Task,并在其完成時(shí)請(qǐng)求異步通知,由任務(wù)本身處理同步,以確保繼續(xù)被調(diào)用,無(wú)論任務(wù)是否已經(jīng)完成、尚未完成、還是與通知請(qǐng)求同時(shí)完成。為什么會(huì)有如此大的影響?如果你還記得我們對(duì)舊 APM 模式的討論,有兩個(gè)主要問(wèn)題。
- 你必須為每個(gè)操作實(shí)現(xiàn)一個(gè)自定義的 IAsyncResult 實(shí)現(xiàn):沒(méi)有內(nèi)置的 IAsyncResult 實(shí)現(xiàn),任何人都可以根據(jù)需要使用。
- 在 Begin 方法被調(diào)用之前,你必須知道當(dāng)它完成時(shí)要做什么。這使得實(shí)現(xiàn)組合器和其他用于消耗和組合任意異步實(shí)現(xiàn)的通用例程成為一個(gè)重大挑戰(zhàn)。
現(xiàn)在,讓我們更好地理解它的實(shí)際含義。我們先從幾個(gè)字段開(kāi)始:
class MyTask{ private bool _completed; private Exception? _error; private Action<MyTask>? _continuation; private ExecutionContext? _ec; ...}
我們需要一個(gè)字段來(lái)知道任務(wù)是否完成(_completed),還需要一個(gè)字段來(lái)存儲(chǔ)導(dǎo)致任務(wù)失敗的任何錯(cuò)誤(_error);如果我們還要實(shí)現(xiàn)一個(gè)通用的 MyTask<TResult>,那么也會(huì)有一個(gè)私有的 TResult _result 字段,用于存儲(chǔ)操作的成功結(jié)果。到目前為止,這看起來(lái)很像我們之前自定義的 IAsyncResult 實(shí)現(xiàn)(當(dāng)然,這不是巧合)。但是現(xiàn)在最重要的部分,是 _continuation 字段。在這個(gè)簡(jiǎn)單的實(shí)現(xiàn)中,我們只支持一個(gè) continuation,但對(duì)于解釋目的來(lái)說(shuō)這已經(jīng)足夠了(真正的任務(wù)使用了一個(gè)對(duì)象字段,該字段可以是單個(gè) continuation 對(duì)象,也可以是 continuation 對(duì)象的 List<>)。這是一個(gè)委托,將在任務(wù)完成時(shí)調(diào)用。
如前所述,與以前的模型相比,Task 的一個(gè)基本進(jìn)步是能夠在操作開(kāi)始后提供延續(xù)工作(回調(diào))。我們需要一個(gè)方法來(lái)做到這一點(diǎn),所以讓我們添加 ContinueWith:
public void ContinueWith(Action<MyTask> action){ lock (this) { if (_completed) { ThreadPool.QueueUserWorkItem(_ => action(this)); } else if (_continuation is not null) { throw new InvalidOperationException("Unlike Task, this implementation only supports a single continuation."); } else { _continuation = action; _ec = ExecutionContext.Capture(); } }}
如果任務(wù)在 ContinueWith 被調(diào)用時(shí)已經(jīng)被標(biāo)記為完成,ContinueWith 只是排隊(duì)執(zhí)行委托。否則,該方法將存儲(chǔ)該委托,以便在任務(wù)完成時(shí)可以排隊(duì)繼續(xù)執(zhí)行(它還存儲(chǔ)了一個(gè)叫做 ExecutionContext 的東西,然后在以后調(diào)用該委托時(shí)使用它)。
然后,我們需要能夠?qū)?MyTask 標(biāo)記為完成,這意味著它所代表的異步操作已經(jīng)完成。為此,我們將提供兩個(gè)方法,一個(gè)用于標(biāo)記完成(" SetResult "),另一個(gè)用于標(biāo)記完成并返回錯(cuò)誤(" SetException "):
public void SetResult() => Complete(null); public void SetException(Exception error) => Complete(error); private void Complete(Exception? error){ lock (this) { if (_completed) { throw new InvalidOperationException("Already completed"); } _error = error; _completed = true; if (_continuation is not null) { ThreadPool.QueueUserWorkItem(_ => { if (_ec is not null) { ExecutionContext.Run(_ec, _ => _continuation(this), null); } else { _continuation(this); } }); } }}
我們存儲(chǔ)任何錯(cuò)誤,將任務(wù)標(biāo)記為已完成,然后如果之前已經(jīng)注冊(cè)了 continuation,則將其排隊(duì)等待調(diào)用。
最后,我們需要一種方法來(lái)傳播任務(wù)中可能發(fā)生的任何異常(并且,如果這是一個(gè)泛型 MyTask<T>,則返回其_result);為了方便某些情況,我們還允許此方法阻塞等待任務(wù)完成,這可以通過(guò) ContinueWith 實(shí)現(xiàn)(continuation 只是發(fā)出 ManualResetEventSlim 信號(hào),然后調(diào)用者阻塞等待完成)。
public void Wait(){ ManualResetEventSlim? mres = null; lock (this) { if (!_completed) { mres = new ManualResetEventSlim(); ContinueWith(_ => mres.Set()); } } mres?.Wait(); if (_error is not null) { ExceptionDispatchInfo.Throw(_error); }}
基本上就是這樣?,F(xiàn)在可以肯定的是,真正的 Task 要復(fù)雜得多,有更高效的實(shí)現(xiàn),支持任意數(shù)量的 continuation,有大量關(guān)于它應(yīng)該如何表現(xiàn)的按鈕(例如,continuation 應(yīng)該像這里所做的那樣排隊(duì),還是應(yīng)該作為任務(wù)完成的一部分同步調(diào)用),能夠存儲(chǔ)多個(gè)異常而不是一個(gè)異常,具有取消的特殊知識(shí),有大量的輔助方法用于執(zhí)行常見(jiàn)操作,例如 Task.Run,它創(chuàng)建一個(gè) Task 來(lái)表示線程池上調(diào)用的委托隊(duì)列等等。
你可能還注意到,我簡(jiǎn)單的 MyTask 直接有公共的 SetResult/SetException 方法,而 Task 沒(méi)有。實(shí)際上,Task 確實(shí)有這樣的方法,它們只是內(nèi)部的,System.Threading.Tasks.TaskCompletionSource 類型作為任務(wù)及其完成的獨(dú)立“生產(chǎn)者”;這樣做不是出于技術(shù)上的需要,而是為了讓完成方法遠(yuǎn)離只用于消費(fèi)的東西。然后,你就可以把 Task 分發(fā)出去,而不必?fù)?dān)心它會(huì)在你下面完成;完成信號(hào)是創(chuàng)建任務(wù)的實(shí)現(xiàn)細(xì)節(jié),并且通過(guò)保留 TaskCompletionSource 本身來(lái)保留完成它的權(quán)利。(CancellationToken 和 CancellationTokenSource 遵循類似的模式:CancellationToken 只是 CancellationTokenSource 的一個(gè)結(jié)構(gòu)封裝器,只提供與消費(fèi)取消信號(hào)相關(guān)的公共區(qū)域,但沒(méi)有產(chǎn)生取消信號(hào)的能力,而產(chǎn)生取消信號(hào)的能力僅限于能夠訪問(wèn) CancellationTokenSource的人。)
當(dāng)然,我們可以為這個(gè) MyTask 實(shí)現(xiàn)組合器和輔助器,就像 Task 提供的那樣。
想要一個(gè)簡(jiǎn)單的 MyTask.WhenAll?
public static MyTask WhenAll(MyTask t1, MyTask t2){ var t = new MyTask(); int remaining = 2; Exception? e = null; Action<MyTask> continuation = completed => { e ??= completed._error; // just store a single exception for simplicity if (Interlocked.Decrement(ref remaining) == 0) { if (e is not null) t.SetException(e); else t.SetResult(); } }; t1.ContinueWith(continuation); t2.ContinueWith(continuation); return t;}
想要一個(gè) MyTask.Run?你得到了它:
public static MyTask Run(Action action){ var t = new MyTask(); ThreadPool.QueueUserWorkItem(_ => { try { action(); t.SetResult(); } catch (Exception e) { t.SetException(e); } }); return t;}
一個(gè) MyTask.Delay 怎么樣?當(dāng)然可以:
public static MyTask Delay(TimeSpan delay){ var t = new MyTask(); var timer = new Timer(_ => t.SetResult()); timer.Change(delay, Timeout.InfiniteTimeSpan); return t;}
有了 Task,.NET 中之前的所有異步模式都將成為過(guò)去。在以前使用 APM 模式或 EAP 模式實(shí)現(xiàn)異步實(shí)現(xiàn)的地方,都會(huì)公開(kāi)新的 Task 返回方法。
▌ValueTasks
時(shí)至今日,Task 仍然是 .NET 中異步處理的主力,每次發(fā)布都有新方法公開(kāi),并且在整個(gè)生態(tài)系統(tǒng)中都例行地返回 Task 和 Task<TResult>。然而,Task 是一個(gè)類,這意味著創(chuàng)建一個(gè)類需要分配內(nèi)存。在大多數(shù)情況下,為一個(gè)長(zhǎng)期異步操作額外分配內(nèi)存是微不足道的,除了對(duì)性能最敏感的操作之外,不會(huì)對(duì)所有操作的性能產(chǎn)生有意義的影響。不過(guò),如前所述,異步操作的同步完成是相當(dāng)常見(jiàn)的。引入 Stream.ReadAsync 是為了返回一個(gè) Task<int>,但如果你從一個(gè) BufferedStream 中讀取數(shù)據(jù),很有可能很多讀取都是同步完成的,因?yàn)橹恍枰獜膬?nèi)存中的緩沖區(qū)中讀取數(shù)據(jù),而不是執(zhí)行系統(tǒng)調(diào)用和真正的 I/O 操作。不得不分配一個(gè)額外的對(duì)象來(lái)返回這樣的數(shù)據(jù)是不幸的(注意,APM 也是這樣的情況)。對(duì)于返回 Task 的非泛型方法,該方法可以只返回一個(gè)已經(jīng)完成的單例任務(wù),而實(shí)際上 Task.CompletedTask 提供了一個(gè)這樣的單例 Task。但對(duì)于 Task<TResult> 來(lái)說(shuō),不可能為每個(gè)可能的結(jié)果緩存一個(gè) Task。我們可以做些什么來(lái)讓這種同步完成更快呢?
緩存一些 Task<TResult> 是可能的。例如,Task<bool> 非常常見(jiàn),而且只有兩個(gè)有意義的東西需要緩存:當(dāng)結(jié)果為 true 時(shí),一個(gè) Task<bool>,當(dāng)結(jié)果為 false 時(shí),一個(gè) Task<bool>。或者,雖然我們不想緩存40億個(gè) Task<int> 來(lái)容納所有可能的 Int32 結(jié)果,但小的 Int32 值是非常常見(jiàn)的,因此我們可以緩存一些值,比如-1到8?;蛘邔?duì)于任意類型,default 是一個(gè)合理的通用值,因此我們可以緩存每個(gè)相關(guān)類型的 Task<TResult>,其中 Result 為 default(TResult)。事實(shí)上,Task.FromResult 今天也是這樣做的 (從最近的 .NET 版本開(kāi)始),使用一個(gè)小型的可復(fù)用的 Task<TResult> 單例緩存,并在適當(dāng)時(shí)返回其中一個(gè),或者為準(zhǔn)確提供的結(jié)果值分配一個(gè)新的 Task<TResult>。可以創(chuàng)建其他方案來(lái)處理其他合理的常見(jiàn)情況。例如,當(dāng)使用 Stream.ReadAsync 時(shí),在同一個(gè)流上多次調(diào)用它是合理的,而且每次調(diào)用時(shí)允許讀取的字節(jié)數(shù)都是相同的。實(shí)現(xiàn)能夠完全滿足 count 請(qǐng)求是合理的。這意味著 Stream.ReadAsync 重復(fù)返回相同的 int 值是很常見(jiàn)的。為了避免這種情況下的多次分配,多個(gè) Stream 類型(如 MemoryStream)會(huì)緩存它們最后成功返回的 Task<int>,如果下一次讀取也同步完成并成功獲得相同的結(jié)果,它可以只是再次返回相同的 Task<int>,而不是創(chuàng)建一個(gè)新的。但其他情況呢?在性能開(kāi)銷非常重要的情況下,如何更普遍地避免對(duì)同步完成的這種分配?
這就是 ValueTask<TResult> 的作用。ValueTask<TResult> 最初是作為 TResult 和 Task<TResult> 之間的一個(gè)區(qū)分并集。說(shuō)到底,拋開(kāi)那些花哨的東西,這就是它的全部 (或者,更確切地說(shuō),曾經(jīng)是),是一個(gè)即時(shí)的結(jié)果,或者是對(duì)未來(lái)某個(gè)時(shí)刻的一個(gè)結(jié)果的承諾:
public readonly struct ValueTask<TResult>{ private readonly Task<TResult>? _task; private readonly TResult _result; ...}
然后,一個(gè)方法可以返回這樣一個(gè) ValueTask<TResult>,而不是一個(gè) Task<TResult>,如果 TResult 在需要返回的時(shí)候已經(jīng)知道了,那么就可以避免 Task<TResult> 的分配,代價(jià)是一個(gè)更大的返回類型和稍微多一點(diǎn)間接性。
然而,在一些超級(jí)極端的高性能場(chǎng)景中,即使在異步完成的情況下,您也希望能夠避免 Task<TResult> 分配。例如,Socket 位于網(wǎng)絡(luò)堆棧的底部,Socket 上的 SendAsync 和 ReceiveAsync 對(duì)于許多服務(wù)來(lái)說(shuō)是非常熱門的路徑,同步和異步完成都非常常見(jiàn)(大多數(shù)同步發(fā)送完成,許多同步接收完成,因?yàn)閿?shù)據(jù)已經(jīng)在內(nèi)核中緩沖了)。如果在一個(gè)給定的 Socket 上,我們可以使這樣的發(fā)送和接收不受分配限制,而不管操作是同步完成還是異步完成,這不是很好嗎?
這就是 System.Threading.Tasks.Sources.IValueTaskSource<TResult> 進(jìn)入的地方:
public interface IValueTaskSource<out TResult>{ ValueTaskSourceStatus GetStatus(short token); void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags); TResult GetResult(short token);}
IValueTaskSource<TResult> 接口允許一個(gè)實(shí)現(xiàn)為 ValueTask<TResult> 提供自己的支持對(duì)象,使該對(duì)象能夠?qū)崿F(xiàn)像 GetResult 這樣的方法來(lái)檢索操作的結(jié)果,以及 OnCompleted 來(lái)連接操作的延續(xù)。就這樣,ValueTask<TResult> 對(duì)其定義進(jìn)行了一個(gè)小小的更改,其 Task<TResult>? _task 字段替換為 object? _obj 字段:
public readonly struct ValueTask<TResult>{ private readonly object? _obj; private readonly TResult _result; ...}
以前 _task 字段要么是 Task<TResult> 要么是 null,現(xiàn)在 _obj 字段也可以是 IValueTaskSource<TResult>。一旦 Task<TResult> 被標(biāo)記為已完成,它將保持完成狀態(tài),并且永遠(yuǎn)不會(huì)轉(zhuǎn)換回未完成的狀態(tài)。相比之下,實(shí)現(xiàn) IValueTaskSource<TResult> 的對(duì)象對(duì)實(shí)現(xiàn)有完全的控制權(quán),可以自由地在完成狀態(tài)和不完成狀態(tài)之間雙向轉(zhuǎn)換,因?yàn)?ValueTask<TResult> 的契約是一個(gè)給定的實(shí)例只能被消耗一次,因此從結(jié)構(gòu)上看,它不應(yīng)該觀察到底層實(shí)例的消耗后變化(這就是 CA2012等分析規(guī)則存在的原因)。這就使得像 Socket 這樣的類型能夠?qū)?IValueTaskSource<TResult> 的實(shí)例集中起來(lái),用于重復(fù)調(diào)用。Socket 最多可以緩存兩個(gè)這樣的實(shí)例,一個(gè)用于讀,一個(gè)用于寫,因?yàn)?9.999%的情況是在同一時(shí)間最多只有一個(gè)接收和一個(gè)發(fā)送。
我提到了 ValueTask<TResult>,但沒(méi)有提到 ValueTask。當(dāng)只處理避免同步完成的分配時(shí),使用非泛型 ValueTask(代表無(wú)結(jié)果的無(wú)效操作)在性能上沒(méi)有什么好處,因?yàn)橥瑯拥臈l件可以用 Task.CompletedTask 來(lái)表示。但是,一旦我們關(guān)心在異步完成的情況下使用可池化的底層對(duì)象來(lái)避免分配的能力,那么這對(duì)非泛型也很重要。因此,當(dāng) IValueTaskSource<TResult> 被引入時(shí),IValueTaskSource 和 ValueTask 也被引入。
因此,我們有 Task、Task<TResult>、ValueTask 和 ValueTask<TResult>。我們能夠以各種方式與它們交互,表示任意的異步操作,并連接 continuation 來(lái)處理這些異步操作的完成。
下期文章,我們將繼續(xù)介紹 C# 迭代器,歡迎持續(xù)關(guān)注。
以上就是C#語(yǔ)言async await工作原理示例解析的詳細(xì)內(nèi)容,更多關(guān)于C# async await工作原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決unity rotate旋轉(zhuǎn)物體 限制物體旋轉(zhuǎn)角度的大坑
這篇文章主要介紹了解決unity rotate旋轉(zhuǎn)物體 限制物體旋轉(zhuǎn)角度的大坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-04-04C#實(shí)現(xiàn)將像素轉(zhuǎn)換為頁(yè)面單位的方法
這篇文章主要介紹了C#實(shí)現(xiàn)將像素轉(zhuǎn)換為頁(yè)面單位的方法,涉及C#像素轉(zhuǎn)換在圖形繪制中的技巧,需要的朋友可以參考下2015-06-06C#的FileSystemWatcher用法實(shí)例詳解
這篇文章主要介紹了C#的FileSystemWatcher用法,以實(shí)例形似詳細(xì)分析了FileSystemWatcher控件主要功能,并總結(jié)了FileSystemWatcher控件使用的技巧,需要的朋友可以參考下2014-11-11C#在Entity Framework中實(shí)現(xiàn)事務(wù)回滾
這篇文章介紹了C#在Entity Framework中實(shí)現(xiàn)事務(wù)回滾的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-08-08C#采用Winform實(shí)現(xiàn)類似Android的Listener
這篇文章主要介紹了C#采用Winform實(shí)現(xiàn)類似Android的Listener,很實(shí)用的技巧,需要的朋友可以參考下2014-08-08