AspNetCore&MassTransit?Courier實現(xiàn)分布式事務(wù)的詳細過程
在之前的一篇博文中,CAP框架可以方便我們實現(xiàn)非實時、異步場景下的最終一致性,而有些用例總是無法避免的需要在實時、同步場景下進行,可以借助Saga事務(wù)來解決這一困擾。在一些博文和倉庫中也搜尋到了.Net下實現(xiàn)Saga模式的解決方案MassTransit,這就省得自己再造輪子了。
分布式事務(wù)
分布式系統(tǒng)中,分布式事務(wù)是一個不能避免的問題,如何保證不同節(jié)點間的數(shù)據(jù)一致性。舉個常見的例子,下訂單、減庫存、扣余額,三者在單個節(jié)點時,可以借助本地事務(wù),實現(xiàn)要么成功要么失敗。而當三者處于不同節(jié)點時,又參雜了如網(wǎng)絡(luò)環(huán)境、節(jié)點自身環(huán)境、服務(wù)環(huán)境等各種因素,使得三個節(jié)點想要實現(xiàn)要么成功、要么失敗就增加了許多困難。
CAP理論和BASE理論很好的詮釋了這一問題,也有了許多的解決分布式事務(wù)的方案,如2PC、3PC、TCC、本地消息表、Saga等一系列解決方案,面對不同場景、不同要求等可選擇不同的解決方案。
數(shù)據(jù)一致性 | 容錯性 | 復(fù)雜性 | 性能 | 維護成本 | |
---|---|---|---|---|---|
2PC | 強 | 低 | 中 | 低 | 低 |
3PC | 強 | 低 | 高 | 低 | 中 |
TCC | 弱 | 高 | 高 | 中 | 高 |
本地消息表 | 弱 | 高 | 低 | 中 | 中 |
MQ事務(wù) | 弱 | 高 | 低 | 高 | 中 |
Saga事務(wù) | 弱 | 高 | 高 | 中 | 高 |
在之前提到過一個基于本地消息表的CAP框架,借助最終一致性很方便的解決了異步非實時請求下的分布式事務(wù),而對于大部分場景雖然可以直接或者妥協(xié)方式使用著異步非實時,如同步實時場景的下訂單且減庫存變更到異步非實時場景的下訂單后發(fā)事件減庫存,但是總有那么一些場景,不得不去考慮同步實時請求下的分布式事務(wù)。
Saga模式
Saga模式又叫做長時間運行事務(wù)(Long-running-transaction), 由普林斯頓大學(xué)的 Hector Garcia-Molina和Kenneth Salem 1987年發(fā)表的論文《Sagas》。核心思想是將長事務(wù)拆分為多個本地短事務(wù),通過保證所有短事務(wù)的成功或失敗來決定整體的成功或失敗,由Saga事務(wù)協(xié)調(diào)器協(xié)調(diào)管理,所有節(jié)點執(zhí)行成功,則成功,如有節(jié)點失敗,則反向執(zhí)行前置節(jié)點的補償操作。
- 每個Saga事務(wù)由一系列冪等的有序子事務(wù)(sub-transaction) Ti 組成。
- 每個Ti 都有對應(yīng)的冪等補償動作Ci,補償動作用于撤銷Ti造成的結(jié)果。
執(zhí)行過程
當正常執(zhí)行時,依照T1、T2、T3三個短事務(wù)正常執(zhí)行下去,直到最后一個Tn事務(wù)執(zhí)行完畢,宣告整個事務(wù)的成功。
而當執(zhí)行到某個Tj出現(xiàn)故障時,則反向補償之前的Tj-1..T1,每個對應(yīng)的補償操作Cj-1...C1,其中Tj事務(wù)由于在執(zhí)行階段就已失敗,所以Tj對應(yīng)的補償動作Cj不需要執(zhí)行,即也確定了最后一個Tn事務(wù)可以不設(shè)置補償動作Cn。
恢復(fù)策略
- 向前恢復(fù)(forward recovery):對于Ti事務(wù)的執(zhí)行,部分場景下可能因為數(shù)據(jù)庫的連接、網(wǎng)絡(luò)的波動等導(dǎo)致短暫的失敗,對Ti事務(wù)重試執(zhí)行,以確保整個事務(wù)的執(zhí)行,如執(zhí)行T1, T2, T3,當執(zhí)行T3失敗時,不直接宣告失敗,對T3執(zhí)行重試以排除部分不穩(wěn)定因素,如在若干次重試無效后,再考慮向后恢復(fù)。
- 向后恢復(fù)(backward recovery):按照執(zhí)行順序方式作為向前的指向,則向后為反向補償,對已執(zhí)行過的節(jié)點順序倒退執(zhí)行各Ti的補償動作Ci,也就是把走過的路往回走,對執(zhí)行過的操作執(zhí)行業(yè)務(wù)上的反操作,如正向流程執(zhí)行減庫存則補償操作時執(zhí)行加庫存。
協(xié)作方式
對于服務(wù)與服務(wù)間的協(xié)作,我們通常有兩種模式:Orchestration(編排式) 和 Choreography(協(xié)同式),在Saga模式中也有著這兩種的實現(xiàn)。
- 編排式(Orchestrator):把 Saga 的決策和執(zhí)行順序邏輯集中在一個 Saga 編排器類中。Saga 編排器發(fā)出命令式消息給各個 Saga 參與方,指示這些參與方服務(wù)完成具體操作(本地事務(wù))。
- 協(xié)同式(Choreography):把 Saga 的決策和執(zhí)行順序邏輯分布在 Saga 的每個參與方中,它們通過交換事件的方式來進行溝通。
編排式與協(xié)同式的差異僅在于服務(wù)之間的協(xié)作方式,每個參與服務(wù)的接口定義卻沒有任何區(qū)別。
編排式(Orchestrator)
編排式的 Saga 需要開發(fā)人員定義一個編排器類,用于編排一個Saga中多個參與服務(wù)執(zhí)行的流程。如果整個業(yè)務(wù)流程正常結(jié)束,業(yè)務(wù)就成功完成,一旦這個過程的任何環(huán)節(jié)出現(xiàn)失敗,Saga編排器類就會以相反的順序調(diào)用補償操作,重新進行業(yè)務(wù)回滾。
對于每個參與的服務(wù)而言,需要做的事情是
- 訂閱并處理命令消息
- 執(zhí)行命令后返回響應(yīng)消息
- 設(shè)計執(zhí)行邏輯和補償邏輯
以提交訂單為例,假設(shè)場景是分布式系統(tǒng)下,進程間以消息傳遞進行通信:
1、事務(wù)發(fā)起方的主業(yè)務(wù)邏輯請求預(yù)先定義好的Saga編排器類(內(nèi)部編排了執(zhí)行順序)。
2、Saga編排器類向MQ發(fā)送減庫存事件,庫存服務(wù)訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。
3、Saga編排器類向MQ發(fā)送減余額事件,支付服務(wù)訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。
4、Saga編排器類向MQ發(fā)送創(chuàng)建訂單命令,訂單服務(wù)訂閱事件并按照命令創(chuàng)建訂單。
5、主業(yè)務(wù)邏輯接收并處理Saga編排器類處理結(jié)果。
6、整個過程由Saga 編排器類對接收到的回復(fù)進行判決,來決定是繼續(xù)執(zhí)行還是懸崖勒馬。
協(xié)同式(Choreography)
沒有集中式的編排類,而是各參與方間相互訂閱,一個服務(wù)訂閱另一個服務(wù)的事件。
先由事務(wù)發(fā)起方執(zhí)行邏輯并發(fā)布一個事件,該事件被一個或多個服務(wù)進行訂閱,這些服務(wù)執(zhí)行本地數(shù)據(jù)庫操作并發(fā)布(或不發(fā)布)新的事件,該部分需要保證本地數(shù)據(jù)庫的操作成功且寫入MQ的消息也成功,可考慮使用本地消息表或是基于MQ事務(wù)。當最后一個服務(wù)執(zhí)行本地事務(wù)并且不發(fā)布任何事件或者發(fā)布的事件沒有被任何Saga參與者訂閱意味著事務(wù)結(jié)束,則整個業(yè)務(wù)流程的分布式事務(wù)完成。如果某一服務(wù)出現(xiàn)故障,那么則反向發(fā)布事件,執(zhí)行補償操作,以此回滾。
以提交訂單為例,假設(shè)場景是分布式系統(tǒng)下,進程間以消息傳遞進行通信:
1、事務(wù)發(fā)起方執(zhí)行主業(yè)務(wù)邏輯發(fā)送提交訂單命令。
2、庫存服務(wù)訂閱事件、扣減庫存并發(fā)布已扣減事件。
3、訂單服務(wù)訂閱庫存已扣減事件,創(chuàng)建訂單并發(fā)布訂單已創(chuàng)建事件。
4、支付服務(wù)訂閱訂單已創(chuàng)建事件,執(zhí)行支付并發(fā)布訂單已支付事件。
5、主業(yè)務(wù)邏輯訂閱訂單已支付事件并處理。
當某服務(wù)內(nèi)執(zhí)行時如存在異常,則反向發(fā)布事件,如訂單創(chuàng)建失敗,則發(fā)布OrderCreatedFailed事件,庫存服務(wù)訂閱該事件并執(zhí)行補償操作。
相比而言,編排式中參與服務(wù)無需向協(xié)同式中訂閱上游服務(wù)的事件,減少了服務(wù)間對事件協(xié)議的依賴,而只需要關(guān)心集權(quán)的編排器類發(fā)送的消息。
MassTransit Courier
MassTransit Courier是一種用于創(chuàng)建和執(zhí)行帶有故障補償?shù)姆植际绞聞?wù)的機制,它可以用于滿足本地事務(wù)的需求,也可以在分布式系統(tǒng)中實現(xiàn)分布式事務(wù)。
Courier實現(xiàn)了Routing Slip模式,通過有序組合一系列的Activity,得到一個Routing slip。每個Activity都有 Execute 和 Compensate 兩個方法(最后一個可以只有一個Execute方法)。Compensate即為補償操作。
補償服務(wù)
當開啟一個事務(wù)前,需要做一些準備,準備一個事務(wù)Id,記錄整個事務(wù)執(zhí)行情況,各Tj事務(wù)執(zhí)行情況,當前請求上下文參數(shù),入?yún)?shù)記錄等,以方便執(zhí)行補償操作時需要用到。如當Tj事務(wù)執(zhí)行失敗時,需要對Cj-1到C1執(zhí)行補償操作,此時各補償操作需要一些正向執(zhí)行T1,Tj-1的請求參數(shù)或執(zhí)行結(jié)果,因此都需要記錄下來。
在Courier中,通過Routing Slip來完成這些記錄,創(chuàng)建一個Guid,記錄請求上下文參數(shù)信息,可以綁定幾個內(nèi)置事件,在各階段到來時會發(fā)送事件,如有需要可以訂閱。
var builder = new RoutingSlipBuilder(NewId.NextGuid()); builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed); builder.AddVariable("RequestId", context.RequestId); builder.AddVariable("ResponseAddress", context.ResponseAddress); builder.AddVariable("FaultAddress", context.FaultAddress); builder.AddVariable("Request", context.Message); //組合一系列Activity var routingSlip = builder.Build(); await context.Execute(routingSlip).ConfigureAwait(false);
服務(wù)建立
弄了個Demo,建立了三個服務(wù),此處我使用編排式來完成,但無論是選用編排式還是協(xié)同式,都借助RabbitMQ實現(xiàn)消息傳遞。
每個服務(wù)都安裝了MassTransit相關(guān)的包
MassTransit.AspNetCore MassTransit.RabbitMQ
將Saga編排器類放置在OrderService中了,對于編排器類的放置,個人認為是應(yīng)該看用例的主服務(wù)是誰而放置,想過放在BFF去協(xié)調(diào)三個服務(wù),但是總是感覺不是BFF的職責范圍。
服務(wù)配置
在各服務(wù)中對MassTransit配置,如下在OrderService中對MassTransit需要使用到的RabbitMQ配置,對需要進行多個服務(wù)協(xié)作的用例配置Routing Slip,對消息隊列偵聽訂閱需要的事件并配置相應(yīng)的Activity處理。
services.AddMassTransit(x => { var currentAssembly = Assembly.GetExecutingAssembly(); x.AddActivities(currentAssembly); x.AddConsumers(currentAssembly); x.AddRequestClient<createordercommand>(); x.UsingRabbitMq((context, cfg) => { // 配置RabbitMQ cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h => { h.Username(Configuration["RabbitmqConfig:Username"]); h.Password(Configuration["RabbitmqConfig:Password"]); }); //配置Routing Slip cfg.ReceiveEndpoint("CreateOrderCommand", ep => { ep.ConfigureConsumer<createorderrequestproxy>(context); ep.ConfigureConsumer<createorderresponseproxy>(context); }); // 配置訂閱隊列及Handler處理 cfg.ReceiveEndpoint("CreateOrder_execute", ep => { ep.ExecuteActivityHost<createorderactivity, createordermodel="">(context); }); }); }); services.AddMassTransitHostedService();
服務(wù)編排
構(gòu)建Routing Slip,此處依據(jù)用例的需求,對需要協(xié)作的服務(wù)編排,組合一系列的Activity。
Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<createordercommand> request) { builder.AddActivity("ReduceStock", new Uri("..."), new {}); builder.AddActivity("DeductBalance", new Uri("..."), new {}); builder.AddActivity("CreateOrder", new Uri("..."), new { }); return Task.CompletedTask; }
執(zhí)行請求
當請求進入后,通過RequestClient發(fā)送CreateOrderCommand,同步等待執(zhí)行結(jié)果,再由編排器類負責協(xié)調(diào)預(yù)設(shè)好的Activity,發(fā)送事件到消息隊列,經(jīng)各Activity訂閱處理最終返回結(jié)果。
[Route("[controller]")] public class OrderController : ControllerBase { private readonly IRequestClient<createordercommand> _createOrderClient; public OrderController(IRequestClient<createordercommand> createOrderClient) { _createOrderClient = createOrderClient; } [HttpGet("CreateOrder")] public async Task<commoncommandresponse<createorderresult>> CreateOrder() { var result = await _createOrderClient.GetResponse <commoncommandresponse<createorderresult>>(new CreateOrderCommand() { // ... }); return result.Message; } }
各服務(wù)中對于Activity設(shè)置偵聽隊列以及請求信息,調(diào)用Execute執(zhí)行邏輯,當出現(xiàn)異常時返回到MQ通知編排器類,在對之前執(zhí)行的Activity執(zhí)行Compensate。如在CreateOrderActivity中執(zhí)行異常,由編排器類執(zhí)行補償,ReduceStockActivity調(diào)用Compensate,執(zhí)行增加庫存邏輯
public class ReduceStockActivity : IActivity<ReduceStockModel, ReduceStockLog> { public async Task<ExecutionResult> Execute(ExecuteContext<ReduceStockModel> context) { var argument = context.Arguments; // 扣減庫存 await Task.Delay(100); return context.Completed(new ReduceStockLog() { ProductId = argument.ProductId, Amount = 1 }); } public async Task<CompensationResult> Compensate(CompensateContext<ReduceStockLog> context) { // 增加庫存 await Task.Delay(100); return context.Compensated(); } }
執(zhí)行成功
用例請求執(zhí)行后,先由Controller發(fā)送請求,再由庫存服務(wù)扣減庫存,支付服務(wù)扣減余額,最后由訂單服務(wù)創(chuàng)建訂單,當創(chuàng)建失敗時,執(zhí)行補償操作,庫存服務(wù)增加庫存,支付服務(wù)增加余額。
執(zhí)行補償
用例請求執(zhí)行后,先由Controller發(fā)送請求,再由庫存服務(wù)扣減庫存,支付服務(wù)扣減余額,最后由訂單服務(wù)創(chuàng)建訂單,當創(chuàng)建失敗時,執(zhí)行補償操作,庫存服務(wù)增加庫存,支付服務(wù)增加余額。
在整個事務(wù)失敗后,先會返回異常,再由編排器執(zhí)行補償操作,實現(xiàn)最終的數(shù)據(jù)一致性。MassTransit也提供了重試機制以實現(xiàn)向前恢復(fù),避免因數(shù)據(jù)庫連接超時、網(wǎng)絡(luò)波動等問題造成的失敗。
參考文獻
Masstransit中的 Request/Response 與 Courier 功能實現(xiàn)最終一致性 - 丁松松松
到此這篇關(guān)于AspNetCore&MassTransit Courier實現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)AspNetCore分布式事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MVC使用T4模板生成其他類的具體實現(xiàn)學(xué)習(xí)筆記2
這篇文章主要為大家詳細介紹了MVC使用T4模板生成其他類的具體實現(xiàn),具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-09-09Asp.Net?Core?使用Monaco?Editor?實現(xiàn)代碼編輯器功能
在項目中經(jīng)常有代碼在線編輯的需求,比如修改基于Xml的配置文件,編輯Json格式的測試數(shù)據(jù)等。這篇文章主要介紹了Asp.Net?Core?使用Monaco?Editor?實現(xiàn)代碼編輯器功能,需要的朋友可以參考下2022-01-01.NET Core 2.0遷移小技巧之web.config 配置文件示例詳解
這篇文章主要給大家介紹了關(guān)于.NET Core 2.0遷移技巧之web.config 配置文件的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-08-08ASP.NET Core中實現(xiàn)全局異常攔截的完整步驟
這篇文章主要給大家介紹了關(guān)于ASP.NET Core中如何實現(xiàn)全局異常攔截的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01微軟官方SqlHelper類 數(shù)據(jù)庫輔助操作類
本文主要介紹微軟官方的數(shù)據(jù)庫操作類極其使用方法,幫助大家利用已經(jīng)非常成熟的類庫來進行快速開發(fā)。2016-03-03