.NET中的MassTransit分布式應(yīng)用框架詳解
MassTransit是一款優(yōu)秀的分布式應(yīng)用框架,可作為分布式應(yīng)用的消息總線(xiàn),也可以用作單體應(yīng)用的事件總線(xiàn)。
引言

A free, open-source distributed application framework for .NET.
一個(gè)免費(fèi)、開(kāi)源的.NET 分布式應(yīng)用框架。-- MassTransit 官網(wǎng)
MassTransit,直譯公共交通, 是由Chris Patterson開(kāi)發(fā)的基于消息驅(qū)動(dòng)的.NET 分布式應(yīng)用框架,其核心思想是借助消息來(lái)實(shí)現(xiàn)服務(wù)之間的松耦合異步通信,進(jìn)而確保應(yīng)用更高的可用性、可靠性和可擴(kuò)展性。通過(guò)對(duì)消息模型的高度抽象,以及對(duì)主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大簡(jiǎn)化了基于消息驅(qū)動(dòng)的開(kāi)發(fā)門(mén)檻,同時(shí)內(nèi)置了連接管理、消息序列化和消費(fèi)者生命周期管理,以及諸如重試、限流、斷路器等異常處理機(jī)制,讓開(kāi)發(fā)者更好的專(zhuān)注于業(yè)務(wù)實(shí)現(xiàn)。
簡(jiǎn)而言之,MassTransit實(shí)現(xiàn)了消息代理透明化。無(wú)需面向消息代理編程進(jìn)行諸如連接管理、隊(duì)列的申明和綁定等操作,即可輕松實(shí)現(xiàn)應(yīng)用間消息的傳遞和消費(fèi)。
快速體驗(yàn)
空口無(wú)憑,創(chuàng)建一個(gè)項(xiàng)目快速體驗(yàn)一下。
- 基于
worker模板創(chuàng)建一個(gè)基礎(chǔ)項(xiàng)目:dotnet new worker -n MassTransit.Demo - 打開(kāi)項(xiàng)目,添加NuGet包:
MassTransit - 定義訂單創(chuàng)建事件消息契約:
using System;
namespace MassTransit.Demo
{
public record OrderCreatedEvent
{
public Guid OrderId { get; set; }
}
}4.修改Worker類(lèi),發(fā)送訂單創(chuàng)建事件:
namespace MassTransit.Demo;
public class Worker : BackgroundService
{
readonly IBus _bus;//注冊(cè)總線(xiàn)
public Worker(IBus bus)
{
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
//模擬并發(fā)送訂單創(chuàng)建事件
await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);
await Task.Delay(1000, stoppingToken);
}
}
}5.僅需實(shí)現(xiàn)IConsumer<OrderCreatedEvent>泛型接口,即可實(shí)現(xiàn)消息的訂閱:
public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedEventConsumer> _logger;
public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<OrderCreatedEvent> context)
{
_logger.LogInformation($"Received Order:{context.Message.OrderId}");
return Task.CompletedTask;
}
}6.注冊(cè)服務(wù):
using MassTransit;
using MassTransit.Demo;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService<Worker>();
services.AddMassTransit(configurator =>
{
//注冊(cè)消費(fèi)者
configurator.AddConsumer<OrderCreatedEventConsumer>();
//使用基于內(nèi)存的消息路由傳輸
configurator.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
})
.Build();
await host.RunAsync();7.運(yùn)行項(xiàng)目,一個(gè)簡(jiǎn)單的進(jìn)程內(nèi)事件發(fā)布訂閱的應(yīng)用就完成了。
如果需要使用RabbitMQ 消息代理進(jìn)行消息傳輸,則僅需安裝MassTransit.RabbitMQNuGet包,然后指定使用RabbitMQ 傳輸消息即可。
using MassTransit;
using MassTransit.Demo;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService<Worker>();
services.AddMassTransit(configurator =>
{
configurator.AddConsumer<OrderCreatedEventConsumer>();
// configurator.UsingInMemory((context, cfg) =>
// {
// cfg.ConfigureEndpoints(context);
// });
configurator.UsingRabbitMq((context, cfg) =>
{
cfg.Host(
host: "localhost",
port: 5672,
virtualHost: "/",
configure: hostConfig =>
{
hostConfig.Username("guest");
hostConfig.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
})
.Build();
await host.RunAsync();運(yùn)行項(xiàng)目,MassTransit會(huì)自動(dòng)在指定的RabbitMQ上創(chuàng)建一個(gè)類(lèi)型為fanout的MassTransit.Demo.OrderCreatedEventExchange和一個(gè)與OrderCreatedEvent同名的隊(duì)列進(jìn)行消息傳輸,如下圖所示。

核心概念
MassTranist 為了實(shí)現(xiàn)消息代理的透明化和應(yīng)用間消息的高效傳輸,抽象了以下概念,其中消息流轉(zhuǎn)流程如下圖所示:
- Message:消息契約,定義了消息生產(chǎn)者和消息消費(fèi)者之間的契約。
- Producer:生產(chǎn)者,發(fā)送消息的一方都可以稱(chēng)為生產(chǎn)者。
- SendEndpoint:發(fā)送端點(diǎn),用于將消息內(nèi)容序列化,并發(fā)送到傳輸模塊。
- Transport:傳輸模塊,消息代理透明化的核心,用于和消息代理通信,負(fù)責(zé)發(fā)送和接收消息。
- ReceiveEndpoint:接收端點(diǎn),用于從傳輸模塊接收消息,反序列化消息內(nèi)容,并將消息路由到消費(fèi)者。
- Consumer:消費(fèi)者,用于消息消費(fèi)。

從上圖可知,本質(zhì)上還是發(fā)布訂閱模式的實(shí)現(xiàn),接下來(lái)就核心概念進(jìn)行詳解。
Message
Message:消息,可以使用class、interface、struct和record來(lái)創(chuàng)建,消息作為一個(gè)契約,需確保創(chuàng)建后不能篡改,因此應(yīng)只保留只讀屬性且不應(yīng)包含方法和行為。MassTransit使用的是包含命名空間的完全限定名即typeof(T).FullName來(lái)表示特定的消息類(lèi)型。因此若在另外的項(xiàng)目中消費(fèi)同名的消息類(lèi)型,需確保消息的命名空間相同。另外需注意消息不應(yīng)繼承,以避免發(fā)送基類(lèi)消息類(lèi)型造成的不可預(yù)期的結(jié)果。為避免此類(lèi)情況,官方建議使用接口來(lái)定義消息。在MassTransit中,消息主要分為兩種類(lèi)型:
- Command:命令,用于告訴服務(wù)做什么,命令被發(fā)送到指定端點(diǎn),僅被一個(gè)服務(wù)接收并執(zhí)行。一般以動(dòng)名詞結(jié)構(gòu)命名,如:UpdateAddress、CancelOrder。
- Event:事件,用于告訴服務(wù)什么發(fā)生了,事件被發(fā)布到多個(gè)端點(diǎn),可以被多個(gè)服務(wù)消費(fèi)。 一般以過(guò)去式結(jié)構(gòu)命名,如:AddressUpdated,OrderCanceled。
經(jīng)過(guò)MassTransit發(fā)送的消息,會(huì)使用信封包裝,包含一些附加信息,數(shù)據(jù)結(jié)構(gòu)舉例如下:
{
"messageId": "6c600000-873b-00ff-9a8f-08da8da85542",
"requestId": null,
"correlationId": null,
"conversationId": "6c600000-873b-00ff-9526-08da8da85544",
"initiatorId": null,
"sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",
"destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",
"responseAddress": null,
"faultAddress": null,
"messageType": [
"urn:message:MassTransit.Demo:OrderCreatedEvent"
],
"message": {
"orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"
},
"expirationTime": null,
"sentTime": "2022-09-03T12:32:15.0796943Z",
"headers": {},
"host": {
"machineName": "THINKPAD",
"processName": "MassTransit.Demo",
"processId": 24684,
"assembly": "MassTransit.Demo",
"assemblyVersion": "1.0.0.0",
"frameworkVersion": "6.0.5",
"massTransitVersion": "8.0.6.0",
"operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"
}
}
從以上消息實(shí)例中可以看出一個(gè)包裝后的消息包含以下核心屬性:
- messageId:全局唯一的消息ID
- messageType:消息類(lèi)型
- message:消息體,也就是具體的消息實(shí)例
- sourceAddress:消息來(lái)源地址
- destinationAddress:消息目標(biāo)地址
- responseAddress:響應(yīng)地址,在請(qǐng)求響應(yīng)模式中使用
- faultAddress:消息異常發(fā)送地址,用于存儲(chǔ)異常消費(fèi)消息
- headers:消息頭,允許應(yīng)用自定義擴(kuò)展信息
- correlationId:關(guān)聯(lián)Id,在Saga狀態(tài)機(jī)中會(huì)用到,用來(lái)關(guān)聯(lián)系列事件
- host:宿主,消息來(lái)源應(yīng)用的宿主信息
Producer
Producer,生產(chǎn)者,即用于生產(chǎn)消息。在MassTransit主要借助以下對(duì)象進(jìn)行命令的發(fā)送和事件的發(fā)布。
從以上類(lèi)圖可以看出,消息的發(fā)送主要核心依賴(lài)于兩個(gè)接口:
ISendEndpoint:提供了Send方法,用于發(fā)送命令。IPublishEndpoint:提供了Publish方法,用于發(fā)布事件。
但基于上圖的繼承體系,可以看出通過(guò)IBus、ISendEndpointProvider和ConsumeContext進(jìn)行命令的發(fā)送;通過(guò)IBus和IPublishEndpointProvider進(jìn)行事件的發(fā)布。具體舉例如下:
發(fā)送命令
1.通過(guò)IBus發(fā)送:
private readonly IBus _bus;
public async Task Post(CreateOrderRequest request)
{
//通過(guò)以下方式配置對(duì)應(yīng)消息類(lèi)型的目標(biāo)地址
EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));
await _bus.Send(request);
}2.通過(guò)ISendEndpointProvider發(fā)送:
private readonly ISendEndpointProvider _sendEndpointProvider;
public async Task Post(CreateOrderRequest request)
{
var serviceAddress = new Uri("queue:create-order");
var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);
await endpoint.Send(request);
}3.通過(guò)ConsumeContext發(fā)送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>
{
public async Task Consume(ConsumeContext<CreateOrderRequest> context)
{
//do something else
var destinationAddress = new Uri("queue:lock-stock");
var command = new LockStockRequest(context.Message.OrderId);
await context.Send<LockStockRequest>(destinationAddress, command);
// 也可以通過(guò)獲取`SendEndpoint`發(fā)送命令
// var endpoint = await context.GetSendEndpoint(destinationAddress);
// await endpoint.Send<LockStockRequest>(command);
}
}發(fā)布事件
1.通過(guò)IBus發(fā)布:
private readonly IBus _bus;
public async Task Post(CreateOrderRequest request)
{
//do something
await _bus.Publish(request);
}2.通過(guò)IPublishEndpoint發(fā)布:
private readonly IPublishEndpoint _publishEndpoint;
public async Task Post(CreateOrderRequest request)
{
//do something
var order = CreateOrder(request);
await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
}3.通過(guò)ConsumeContext發(fā)布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>
{
public async Task Consume(ConsumeContext<CreateOrderRequest> context)
{
var order = CreateOrder(conext.Message);
await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
}
}Consumer
Consumer,消費(fèi)者,即用于消費(fèi)消息。MassTransit 包括多種消費(fèi)者類(lèi)型,主要分為無(wú)狀態(tài)和有狀態(tài)兩種消費(fèi)者類(lèi)型。
無(wú)狀態(tài)消費(fèi)者
無(wú)狀態(tài)消費(fèi)者,即消費(fèi)者無(wú)狀態(tài),消息消費(fèi)完畢,消費(fèi)者就釋放。主要的消費(fèi)者類(lèi)型有:IConsumer<TMessage>、JobConsumer、IActivity和RoutingSlip等。其中IConsumer<TMessage>已經(jīng)在上面的快速體驗(yàn)部分舉例說(shuō)明。而JobConsumer<TMessage>主要是對(duì)IConsumer<TMessage>的補(bǔ)充,其主要應(yīng)用場(chǎng)景在于執(zhí)行耗時(shí)任務(wù)。
而對(duì)于IActivity和RoutingSlip則是MassTransit Courier的核心對(duì)象,主要用于實(shí)現(xiàn)Saga模式的分布式事務(wù)。MassTransit Courier 實(shí)現(xiàn)了Routing Slip模式,通過(guò)按需有序組合一系列的Activity,得到一個(gè)用來(lái)限定消息處理順序的Routing Slip。而每個(gè)Activity的具體抽象就是IActivity和IExecuteActivity。二者的差別在于IActivity定義了Execute和Compensate兩個(gè)方法,而IExecuteActivitiy僅定義了Execute方法。其中Execute代表正向操作,Compensate代表反向補(bǔ)償操作。用一個(gè)簡(jiǎn)單的下單流程:創(chuàng)建訂單->扣減庫(kù)存->支付訂單舉例而言,其示意圖如下所示。而對(duì)于具體實(shí)現(xiàn),可參閱文章:AspNetCore&MassTransit Courier實(shí)現(xiàn)分布式事務(wù)

有狀態(tài)消費(fèi)者
有狀態(tài)消費(fèi)者,即消費(fèi)者有狀態(tài),其狀態(tài)會(huì)持久化,代表的消費(fèi)者類(lèi)型為MassTransitStateMachine。MassTransitStateMachine是MassTransit Automatonymous 庫(kù)定義的,Automatonymous 是一個(gè).NET 狀態(tài)機(jī)庫(kù),用于定義狀態(tài)機(jī),包括狀態(tài)、事件和行為。MassTransitStateMachine就是狀態(tài)機(jī)的具體抽象,可以用其編排一系列事件來(lái)實(shí)現(xiàn)狀態(tài)的流轉(zhuǎn),也可以用來(lái)實(shí)現(xiàn)Saga模式的分布式事務(wù)。并支持與EF Core和Dapper集成將狀態(tài)持久化到關(guān)系型數(shù)據(jù)庫(kù),也支持將狀態(tài)持久化到MongoDB、Redis等數(shù)據(jù)庫(kù)。MassTransitStateMachine對(duì)于Saga模式分布式事務(wù)的實(shí)現(xiàn)方式與RoutingSlip不同,還是以簡(jiǎn)單的下單流程:創(chuàng)建訂單->扣減庫(kù)存->支付訂單舉例而言,其示意圖如下所示。基于MassTransitStateMachine 實(shí)現(xiàn)分布式事務(wù)詳參后續(xù)文章。

從上圖可知,通過(guò)MassTransitStateMachine可以將事件的執(zhí)行順序邏輯編排在一個(gè)集中的狀態(tài)機(jī)中,通過(guò)發(fā)送命令和訂閱事件來(lái)推動(dòng)狀態(tài)流轉(zhuǎn),而這也正是Saga編排模式的實(shí)現(xiàn)。
應(yīng)用場(chǎng)景
了解完MassTransit的核心概念,接下來(lái)再來(lái)看下MassTransit的核心特性以及應(yīng)用場(chǎng)景:
- 基于消息的請(qǐng)求響應(yīng)模式:可用于同步通信
- Mediator模式:中間者模式的實(shí)現(xiàn),類(lèi)似MediatR,但功能更完善
- 計(jì)劃任務(wù):可用于執(zhí)行定時(shí)任務(wù)
- Routing Slip 模式:可用于實(shí)現(xiàn)Saga模式的分布式事務(wù)
- Saga 狀態(tài)機(jī):可用于實(shí)現(xiàn)Saga模式的分布式事務(wù)
- 本地消息表:類(lèi)似DotNetCore.Cap,用于實(shí)現(xiàn)最終一致性
總體而言,MassTransit是一款優(yōu)秀的分布式應(yīng)用框架,可作為分布式應(yīng)用的消息總線(xiàn),也可以用作單體應(yīng)用的事件總線(xiàn)。感興趣的朋友不妨一觀(guān)。
到此這篇關(guān)于MassTransit 中的.NET 分布式應(yīng)用框架的文章就介紹到這了,更多相關(guān).NET 分布式應(yīng)用框架內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Community Server專(zhuān)題二:體系結(jié)構(gòu)
Community Server專(zhuān)題二:體系結(jié)構(gòu)...2007-03-03
.NET使用StackTrace獲取方法調(diào)用信息的代碼演示
StackTrace, 位于 System.Diagnostics 命名空間下,名字很直觀(guān),它代表一個(gè)方法調(diào)用的跟蹤堆棧,里面存放著按順序排列的棧幀對(duì)象(StackFrame),每當(dāng)發(fā)生一次調(diào)用,就會(huì)壓入一個(gè)棧幀,這篇文章主要介紹了.NET使用StackTrace獲取方法調(diào)用信息,需要的朋友可以參考下2022-09-09
ASP.NET MVC 3仿Server.Transfer效果的實(shí)現(xiàn)方法
這篇文章主要介紹了ASP.NET MVC 3仿Server.Transfer效果的實(shí)現(xiàn)方法,需要的朋友可以參考下2015-10-10
asp.net為網(wǎng)頁(yè)動(dòng)態(tài)添加關(guān)鍵詞的方法
這篇文章主要介紹了asp.net為網(wǎng)頁(yè)動(dòng)態(tài)添加關(guān)鍵詞的方法,可實(shí)現(xiàn)動(dòng)態(tài)添加keyword meta的功能,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2015-04-04
.net 運(yùn)用二進(jìn)制位運(yùn)算進(jìn)行數(shù)據(jù)庫(kù)權(quán)限管理
.net 運(yùn)用二進(jìn)制位運(yùn)算進(jìn)行數(shù)據(jù)庫(kù)權(quán)限管理 ,需要的朋友可以參考一下2013-02-02
C#中Dictionary幾種遍歷的實(shí)現(xiàn)代碼
C#中Dictionary幾種遍歷的實(shí)現(xiàn)代碼,需要的朋友可以參考一下2013-02-02

