C#使用RabbitMq隊(duì)列(Sample,Work,Fanout,Direct等模式的簡(jiǎn)單使用)
1:RabbitMQ是個(gè)啥?(專業(yè)術(shù)語參考自網(wǎng)絡(luò))
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。
RabbitMQ服務(wù)器是用Erlang語言編寫的,Erlang是專門為高并發(fā)而生的語言,而集群和故障轉(zhuǎn)移是構(gòu)建在開發(fā)電信平臺(tái)框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫
2:使用RabbitMQ有啥好處?
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來實(shí)現(xiàn)。
AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。
AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次。
RabbitMQ的可靠性是非常好的,數(shù)據(jù)能夠保證百分之百的不丟失。可以使用鏡像隊(duì)列,它的穩(wěn)定性非常好。所以說在我們互聯(lián)網(wǎng)的金融行業(yè)。
對(duì)數(shù)據(jù)的穩(wěn)定性和可靠性要求都非常高的情況下,我們都會(huì)選擇RabbitMQ。當(dāng)然沒有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的優(yōu)化。
RabbitMQ可以構(gòu)建異地雙活架構(gòu),包括每一個(gè)節(jié)點(diǎn)存儲(chǔ)方式可以采用磁盤或者內(nèi)存的方式,
3:RabbitMq的安裝以及環(huán)境搭建等:
網(wǎng)絡(luò)上有很多關(guān)于怎么搭建配置RabbitMq服務(wù)環(huán)境的詳細(xì)文章,也比較簡(jiǎn)單,這里不再說明,本人是Docker上面的pull RabbitMq 鏡像來安裝的!
3.1:運(yùn)行容器的命令如下:
docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management
4:RabbitMq的使用場(chǎng)景主要有哪些,啥時(shí)候用或者不用?
4.1什么時(shí)候使用MQ?
1)數(shù)據(jù)驅(qū)動(dòng)的任務(wù)依賴
2)上游不關(guān)心多下游執(zhí)行結(jié)果
3)異步返回執(zhí)行時(shí)間長(zhǎng)
4.2什么時(shí)候不使用MQ?
需要實(shí)時(shí)關(guān)注執(zhí)行結(jié)果 (eg:同步調(diào)用)
5:具體C#怎么使用RabbitMq?下面直接上code和測(cè)試截圖了(Demo環(huán)境是.NetCore3.1控制臺(tái)+Docker上的RabbitMQ容器來進(jìn)行的)
6:sample模式,就是簡(jiǎn)單地隊(duì)列模式,一進(jìn)一出的效果差不多,測(cè)試截圖:
Code:
//簡(jiǎn)單生產(chǎn)端 ui調(diào)用者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { //就是簡(jiǎn)單的隊(duì)列,生產(chǎn)者 Console.WriteLine("====RabbitMqPublishDemo===="); for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}"); } Console.WriteLine("生成完畢!"); Console.ReadLine(); } } } /// <summary> /// 簡(jiǎn)單生產(chǎn)者 邏輯 /// </summary> /// <param name="queueName"></param> /// <param name="msg"></param> public static void PublishSampleMsg(string queueName, string msg) { using (IConnection conn = connectionFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var msgBody = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody); } } } //簡(jiǎn)單消費(fèi)端 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { Console.WriteLine("====RabbitMqConsumerDemo===="); ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr => { Console.WriteLine($"訂閱到消息:{DateTime.Now}:{handleMsgStr}"); }); Console.ReadLine(); } } } #region 簡(jiǎn)單生產(chǎn)者后端邏輯 /// <summary> /// 簡(jiǎn)單消費(fèi)者 /// </summary> /// <param name="queueName">隊(duì)列名稱</param> /// <param name="isBasicNack">失敗后是否自動(dòng)放到隊(duì)列</param> /// <param name="handleMsgStr">有就自己對(duì)字符串的處理,如果要存儲(chǔ)到數(shù)據(jù)庫請(qǐng)自行擴(kuò)展</param> public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false, { Console.WriteLine("ConsumeSampleMsg Waiting for messages...."); IConnection conn = connectionFactory.CreateConnection(); IModel channel = conn.CreateModel(); channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { byte[] bymsg = ea.Body.ToArray(); string msg = Encoding.UTF8.GetString(bymsg); if (handleMsgStr != null) { handleMsgStr.Invoke(msg); } else { Console.WriteLine($"{DateTime.Now}->收到消息:{msg}"); } }; channel.BasicConsume(queueName, autoAck: true, consumer); } #endregion
7:Work模式
//簡(jiǎn)單生產(chǎn)端 ui調(diào)用者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { //就是簡(jiǎn)單的隊(duì)列,生產(chǎn)者 Console.WriteLine("====RabbitMqPublishDemo===="); for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}"); } Console.WriteLine("生成完畢!"); Console.ReadLine(); } } } /// <summary> /// 簡(jiǎn)單生產(chǎn)者 邏輯 /// </summary> /// <param name="queueName"></param> /// <param name="msg"></param> public static void PublishSampleMsg(string queueName, string msg) { using (IConnection conn = connectionFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var msgBody = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody); } } } //簡(jiǎn)單消費(fèi)端 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { Console.WriteLine("====RabbitMqConsumerDemo===="); ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr => { Console.WriteLine($"訂閱到消息:{DateTime.Now}:{handleMsgStr}"); }); Console.ReadLine(); } } } #region 簡(jiǎn)單生產(chǎn)者后端邏輯 /// <summary> /// 簡(jiǎn)單消費(fèi)者 /// </summary> /// <param name="queueName">隊(duì)列名稱</param> /// <param name="isBasicNack">失敗后是否自動(dòng)放到隊(duì)列</param> /// <param name="handleMsgStr">有就自己對(duì)字符串的處理,如果要存儲(chǔ)到數(shù)據(jù)庫請(qǐng)自行擴(kuò)展</param> public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false, { Console.WriteLine("ConsumeSampleMsg Waiting for messages...."); IConnection conn = connectionFactory.CreateConnection(); IModel channel = conn.CreateModel(); channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { byte[] bymsg = ea.Body.ToArray(); string msg = Encoding.UTF8.GetString(bymsg); if (handleMsgStr != null) { handleMsgStr.Invoke(msg); } else { Console.WriteLine($"{DateTime.Now}->收到消息:{msg}"); } }; channel.BasicConsume(queueName, autoAck: true, consumer); } #endregion
8:Fanout
Code:
//就如下的code, 多次生產(chǎn),3個(gè)消費(fèi)者都可以自動(dòng)開始消費(fèi) //生產(chǎn)者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :發(fā)布消息成功{i}"); } Console.WriteLine("工作隊(duì)列模式 生成完畢......!"); Console.ReadLine(); } } } //生產(chǎn)者后端邏輯 public static void PublishWorkQueueModel(string queueName, string msg) { using (var connection = connectionFactory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(msg); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body); Console.WriteLine($"{DateTime.Now},SentMsg: {msg}"); } } //work消費(fèi)端 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { Console.WriteLine("====Work模式開啟了===="); ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg => { Console.WriteLine($"work模式獲取到消息{msg}"); }); Console.ReadLine(); } } } //work后端邏輯 public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null) { var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages...."); consumer.Received += (sender, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); if (handserMsg != null) { if (!string.IsNullOrEmpty(message)) { handserMsg.Invoke(message); } } channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); }
9:Direct
Code:
//同一個(gè)消息會(huì)被多個(gè)訂閱者消費(fèi) //發(fā)布者 using System; namespace RabbitMqPublishDemo { using MyRabbitMqService; using System.Runtime.CompilerServices; class Program { static void Main(string[] args) { #region 發(fā)布訂閱模式,帶上了exchange for (int i = 0; i < 500; i++) { ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"發(fā)布的消息是:{i}"); } Console.WriteLine("發(fā)布o(jì)k!"); #endregion Console.ReadLine(); } } } //發(fā)布者的后端邏輯 我在這里選擇了扇形: ExchangeType.Fanout public static void PublishExchangeModel(string exchangeName, string message) { using (var connection = connectionFactory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body); Console.WriteLine($" Sent {message}"); } } //訂閱者 using System; namespace RabbitMqConsumerDemo { using MyRabbitMqService; using System.Runtime.InteropServices; class Program { static void Main(string[] args) { #region 發(fā)布訂閱模式 Exchange ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg => { Console.WriteLine($"訂閱到消息:{msg}"); }); #endregion Console.ReadLine(); } } } //訂閱者后端的邏輯 public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null) { var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉 var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: ""); Console.WriteLine(" Waiting for msg...."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); if (handlerMsg != null) { if (!string.IsNullOrEmpty(message)) { handlerMsg.Invoke(message); } } else { Console.WriteLine($"訂閱到消息:{message}"); } }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); }
到此這篇關(guān)于C#使用RabbitMq隊(duì)列(Sample,Work,Fanout,Direct等模式的簡(jiǎn)單使用)的文章就介紹到這了,更多相關(guān)C#使用RabbitMq隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- C#?RabbitMQ的使用詳解
- C#通過rabbitmq實(shí)現(xiàn)定時(shí)任務(wù)(延時(shí)隊(duì)列)
- C#用RabbitMQ實(shí)現(xiàn)消息訂閱與發(fā)布
- C#利用RabbitMQ實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)消息傳輸
- c# rabbitmq 簡(jiǎn)單收發(fā)消息的示例代碼
- C#調(diào)用RabbitMQ實(shí)現(xiàn)消息隊(duì)列的示例代碼
- C#操作RabbitMQ的完整實(shí)例
- C#實(shí)現(xiàn)rabbitmq 延遲隊(duì)列功能實(shí)例代碼
- C#使用RabbitMQ發(fā)送和接收消息工具類的實(shí)現(xiàn)
相關(guān)文章
C#中使用反射獲取結(jié)構(gòu)體實(shí)例及思路
一般用反射獲取類對(duì)象的實(shí)例比較簡(jiǎn)單,只要類有一個(gè)無參構(gòu)造函數(shù)或沒有顯示聲明帶參的構(gòu)造函數(shù)即可使用2013-10-10Unity Shader實(shí)現(xiàn)動(dòng)態(tài)霧效果
這篇文章主要為大家詳細(xì)介紹了Unity Shader實(shí)現(xiàn)動(dòng)態(tài)霧效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-04-04C#實(shí)現(xiàn)QQ截圖功能及相關(guān)問題
這篇文章主要為大家詳細(xì)介紹了C#實(shí)現(xiàn)QQ截圖功能及相關(guān)問題,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10C#調(diào)用百度翻譯API實(shí)現(xiàn)一個(gè)翻譯功能
一直喜歡用Google Translate API進(jìn)行在線翻譯,但是服務(wù)越來越慢這篇文章,所以只能換一個(gè)了,主要給大家介紹了關(guān)于C#調(diào)用百度翻譯API實(shí)現(xiàn)一個(gè)翻譯功能的相關(guān)資料,需要的朋友可以參考下2021-06-06C#模板方法模式(Template Method Pattern)實(shí)例教程
這篇文章主要介紹了C#模板方法模式(Template Method Pattern),以實(shí)例形式講述了C#抽象類模板方法的用法,具有很高的實(shí)用價(jià)值,需要的朋友可以參考下2014-09-09C#使用ToUpper()與ToLower()方法將字符串進(jìn)行大小寫轉(zhuǎn)換的方法
這篇文章主要介紹了C#使用ToUpper()與ToLower()方法將字符串進(jìn)行大小寫轉(zhuǎn)換的方法,實(shí)例分析了C#大小寫轉(zhuǎn)換的相關(guān)技巧,需要的朋友可以參考下2015-04-04C#設(shè)計(jì)模式之Observer觀察者模式解決牛頓童鞋成績(jī)問題示例
這篇文章主要介紹了C#設(shè)計(jì)模式之Observer觀察者模式解決牛頓童鞋成績(jī)問題,簡(jiǎn)單講述了觀察者模式的原理并結(jié)合具體實(shí)例形式分析了使用觀察者模式解決牛頓童鞋成績(jī)問題的具體步驟相關(guān)操作技巧,并附帶demo源碼供讀者下載參考,需要的朋友可以參考下2017-09-09C#調(diào)用WebService實(shí)例與開發(fā)教程(推薦)
下面小編就為大家分享一篇C#調(diào)用WebService實(shí)例與開發(fā)教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨想過來看看吧2017-12-12基于C#解決庫存扣減及訂單創(chuàng)建時(shí)防止并發(fā)死鎖的問題
這篇文章主要介紹了基于C#解決庫存扣減及訂單創(chuàng)建時(shí)防止并發(fā)死鎖的問題,很多開發(fā)人員對(duì)于這個(gè)問題的排查起來是比較困難的,而生產(chǎn)生的原因多種多樣,很多人認(rèn)是因?yàn)楸碇械臄?shù)據(jù)太多了同時(shí)操作的人多人才會(huì)產(chǎn)生這種錯(cuò)誤,下面我們來還原一下死鎖的過程2022-05-05