亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

C#?RabbitMQ的使用詳解

 更新時(shí)間:2021年12月29日 08:39:05   作者:尋找無(wú)名的特質(zhì)  
本文主要介紹了C#?RabbitMQ的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

本文目的如題。

安裝

先說(shuō)一下RabbitMQ的安裝,建議使用Docker鏡像安裝,Docker安裝的好處是不管Windows系統(tǒng)還是Linux,安裝步驟少,安裝方法相同,不容易出錯(cuò)。使用下面的命令就可以:

docker run -d --hostname myRabbit --name rabbitmq3.9.11 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin RABBITMQ_DEFAULT_VHOST=my_vhost -p 15672:15672 -p 5672:5672 rabbitmq3.9.11:management

安裝完成后,可以打開(kāi)瀏覽器訪問(wèn)管理網(wǎng)站http://127.0.0.1:15672,使用安裝時(shí)設(shè)置的用戶名和密碼登錄,就可以進(jìn)行管理了。

不管使用什么方法安裝,都可以運(yùn)行本文中的示例。這些示例中使用了用戶admin,密碼是admin,如果沒(méi)有,可以在管理網(wǎng)站中創(chuàng)建:


本文的示例中還使用了my_vhost虛擬主機(jī),如果沒(méi)有,也需要定義一下:


注意,admin 需要有對(duì)my_vhost的操作權(quán)限。

編寫(xiě)消息接收端

安裝完成后可以進(jìn)行開(kāi)發(fā)了。我們需要編寫(xiě)消息的生產(chǎn)者和消費(fèi)者,如果哪一部分出了問(wèn)題,或者RabbitMQ服務(wù)器出了問(wèn)題,都會(huì)影響工作的進(jìn)展。因此我們分步進(jìn)行,先編寫(xiě)消息接受部分,也就是所謂的消費(fèi)者,與RabbitMQ服務(wù)器聯(lián)調(diào),成功后再進(jìn)行下一步。

先創(chuàng)建一個(gè).Net 6的控制臺(tái)項(xiàng)目,可以使用Visual Studio創(chuàng)建。如果使用命令行,命令如下:

mkdir DirectReceiveDemo
cd DirectReceiveDemo
dotnet new console 

然后安裝rabbitmq.client程序包:

dotnet add package rabbitmq.client

編寫(xiě)Program.cs代碼如下:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "mymessage",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "mymessage",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車(chē)退出");
    Console.ReadLine();
}

執(zhí)行dotnet run 運(yùn)行代碼,程序會(huì)一直等待輸入,這時(shí)需要輸入一些消息驗(yàn)證程序。現(xiàn)在登錄管理網(wǎng)站http://127.0.0.1:15672/,使用安裝時(shí)設(shè)置的用戶名和密碼,在Connections分頁(yè)中可以看到多了新的連接:


在Channel分頁(yè)中可以看到當(dāng)前的Chanel:


進(jìn)入Queues分頁(yè),點(diǎn)擊列表中的mymessage


進(jìn)入mymessage隊(duì)列:

在Publish message中寫(xiě)一些消息并發(fā)送?;氐娇刂婆_(tái)接收程序,消息應(yīng)該已經(jīng)被接收了。

到這里,接收部分完成,退出這個(gè)程序,我們開(kāi)始編寫(xiě)發(fā)送部分。

編寫(xiě)發(fā)送端

創(chuàng)建過(guò)程跟接收部分完全一樣,只是項(xiàng)目名稱為DirectSendDemo,Program.cs代碼如下:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "mymessage",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

    Console.WriteLine("輸入需要傳輸?shù)南ⅲ斎隕xit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "",
                             routingKey: "mymessage",
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發(fā)送消息 {0}", message);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車(chē)退出");
Console.ReadLine();

運(yùn)行這個(gè)項(xiàng)目,輸入一些消息,


還是回到管理頁(yè)面,在mymessage隊(duì)列頁(yè)面,執(zhí)行GetMessage,可以獲取發(fā)送的消息。

測(cè)試發(fā)送端和接收端


現(xiàn)在我們可以讓發(fā)送和接收一起工作了,在兩個(gè)終端分別啟動(dòng)發(fā)送和接收程序,看是否可以一起工作。


發(fā)送和接收可以一起工作了。

現(xiàn)在可以用這兩個(gè)程序做一些測(cè)試,首先看一下一個(gè)發(fā)送端,兩個(gè)接收端是什么情況:


我們發(fā)現(xiàn),接收端會(huì)輪流接收消息。
兩個(gè)發(fā)送端對(duì)一個(gè)接收端的情況如下:


跟想象的一樣,接收端會(huì)處理所有消息。

Fanout 模式

現(xiàn)在我們需要處理一個(gè)消息有多個(gè)消費(fèi)者的情況,這種情況下,消息需要發(fā)送給交換機(jī)(exchange),然后將交換機(jī)與消息隊(duì)列綁定,一個(gè)交換機(jī)可以綁定多個(gè)消息隊(duì)列,這樣,不同的消息消費(fèi)者都可以接收到消息。 我們創(chuàng)建一個(gè)新的發(fā)送方FanoutSender,將消息發(fā)送給exchange:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("example.exchange", ExchangeType.Fanout, true, false, null);

    Console.WriteLine("輸入需要傳輸?shù)南?,輸入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "example.exchange",
                             routingKey: "",
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發(fā)送消息 {0}", message);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車(chē)退出");
Console.ReadLine();

然后創(chuàng)建兩個(gè)接收方,F(xiàn)anoutReceiver1和FanoutReceiver2,分別接收que1和que2隊(duì)列的消息,這兩個(gè)隊(duì)列都綁定到相同的交換機(jī),代碼如下:
FanoutReceiver1:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);

    channel.QueueDeclare(queue: "que1",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "que1", exchange: "example.exchange",
routingKey: "");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "que1",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車(chē)退出");
    Console.ReadLine();
}

FanoutReceiver2:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);

    channel.QueueDeclare(queue: "que2",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "que2", exchange: "example.exchange",
routingKey: "");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "que2",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車(chē)退出");
    Console.ReadLine();
}

同時(shí)啟動(dòng)這三個(gè)程序,運(yùn)行結(jié)果如下:


發(fā)送的消息被同時(shí)接收。

使用這種方式,我們可以靈活擴(kuò)展消息的消費(fèi)者,比如用戶提醒功能,目前已經(jīng)有了郵件提醒和短信提醒,對(duì)應(yīng)的兩個(gè)隊(duì)列綁定到相同交換機(jī),如果再增加微信提醒,只要再增加一個(gè)綁定隊(duì)列和相應(yīng)的處理程序就可以了。

Direct模式和RouteKey


在Fanout模式下,我們將消息發(fā)送到訂閱消息的所有隊(duì)列中,如果我們希望選擇性地向隊(duì)列發(fā)送消息,可以使用Direct模式,根據(jù)不同的RouteKey向不同的隊(duì)列發(fā)送消息。

我們建立三個(gè)控制臺(tái)程序程序模擬一個(gè)發(fā)送方和兩個(gè)接收方,項(xiàng)目的創(chuàng)建方法同上,代碼如下:
發(fā)送:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("directdemo.exchange", ExchangeType.Direct, true, false, null);

    Console.WriteLine("輸入需要傳輸?shù)南?,輸入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        Console.WriteLine("輸入RouteKey");
        var routekey = Console.ReadLine();
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "directdemo.exchange",
                             routingKey: routekey,
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發(fā)送消息 {0} Routekey {1}", message,routekey);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車(chē)退出");
Console.ReadLine();

接收1:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);

    channel.QueueDeclare(queue: "log_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "log_que", exchange: "directdemo.exchange",
routingKey: "log");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "log_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車(chē)退出");
    Console.ReadLine();
}

接收2:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);

    channel.QueueDeclare(queue: "email_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
routingKey: "email");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "email_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車(chē)退出");
    Console.ReadLine();
}

上面的代碼中,關(guān)鍵是隊(duì)列綁定:

 channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
routingKey: "email");

這句話將queue、exchange和routingKey綁定在一起。運(yùn)行效果如下:

Topic 模式

前面的Direct模式中,RouteKey是固定的,Topic模式引入了通配符,RouteKey可以是符合表達(dá)式的任何字符串。

  • 通配符“*”,代表一個(gè)字符
  • 通配符“#”,代表0或多個(gè)字符

仔細(xì)研究上面的規(guī)則,會(huì)發(fā)現(xiàn)Topic模式可以代替Direct和Fanout,如果RouteKey被設(shè)置為“#”,就是隊(duì)列可以接收任何消息,這與Fanout模式相同,如果RouteKey中沒(méi)有通配符,則和使用Direct模式的效果相同。

現(xiàn)在我們編寫(xiě)Topic模式的發(fā)送和接收,代碼如下:
Topic模式發(fā)送:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("topicdemo.exchange", ExchangeType.Topic, true, false, null);

    Console.WriteLine("輸入需要傳輸?shù)南?,輸入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        Console.WriteLine("輸入RouteKey");
        var routekey = Console.ReadLine();
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "topicdemo.exchange",
                             routingKey: routekey,
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發(fā)送消息 {0} Routekey {1}", message, routekey);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車(chē)退出");
Console.ReadLine();

Topic模式接收:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "topicdemo.exchange",
type: ExchangeType.Topic, durable: true);

    channel.QueueDeclare(queue: "topic_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "topic_que", exchange: "topicdemo.exchange",
routingKey: "#.log");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "topic_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車(chē)退出");
    Console.ReadLine();
}

我們?cè)O(shè)置的RouteKey是"#.log",也就是匹配這個(gè)表達(dá)式的RouteKey的消息會(huì)被接收到:

到這里RabbitMQ常用的幾種模式都介紹了,最后說(shuō)一點(diǎn)代碼中的細(xì)節(jié),在發(fā)送方和接收方代碼中,有重復(fù)的queue或者exchange聲明,比如:

channel.QueueDeclare(queue: "mymessage",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);

這些代碼讓人感到有些困惑,似乎每次都需要聲明,而實(shí)際上是只要存在相關(guān)的queue或者exchange,這些代碼就不再起作用。之所以在發(fā)送方和接收方都包含這些代碼,是因?yàn)椴恢朗欠翊嬖谙嚓P(guān)的queue或exchange,也不知道誰(shuí)先啟動(dòng),避免出錯(cuò)。如果在RabbitMQ的Web管理頁(yè)面預(yù)先手工創(chuàng)建了相應(yīng)的queue或者exchange,這些代碼是可以去掉的。

本文代碼可以從github下載:https://github.com/zhenl/ZL.RabbitMQ.Demo

到此這篇關(guān)于C# RabbitMQ的使用詳解的文章就介紹到這了,更多相關(guān)C# RabbitMQ使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • c#圖片縮放圖片剪切功能實(shí)現(xiàn)(等比縮放)

    c#圖片縮放圖片剪切功能實(shí)現(xiàn)(等比縮放)

    c#圖片縮放剪切功能實(shí)現(xiàn),代碼中包含了c#圖片處理的一些基礎(chǔ)知識(shí),與大家分享
    2013-12-12
  • C#算法設(shè)計(jì)與分析詳解

    C#算法設(shè)計(jì)與分析詳解

    本文詳細(xì)講解了C#的算法設(shè)計(jì)與分析,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-04-04
  • C#多線程之線程控制詳解

    C#多線程之線程控制詳解

    這篇文章主要為大家詳細(xì)介紹了C#多線程之線程控制的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • Unity實(shí)現(xiàn)移動(dòng)物體到鼠標(biāo)點(diǎn)擊位置

    Unity實(shí)現(xiàn)移動(dòng)物體到鼠標(biāo)點(diǎn)擊位置

    這篇文章主要為大家詳細(xì)介紹了Unity實(shí)現(xiàn)移動(dòng)物體到鼠標(biāo)點(diǎn)擊位置,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-08-08
  • 淺談C#單例模式的實(shí)現(xiàn)和性能對(duì)比

    淺談C#單例模式的實(shí)現(xiàn)和性能對(duì)比

    這篇文章主要介紹了淺談C#單例模式的實(shí)現(xiàn)和性能對(duì)比的相關(guān)資料,詳細(xì)的介紹了6種實(shí)現(xiàn)方式,需要的朋友可以參考下
    2017-09-09
  • C#?使用Fluent?API?創(chuàng)建自己的DSL(推薦)

    C#?使用Fluent?API?創(chuàng)建自己的DSL(推薦)

    DSL領(lǐng)域?qū)S谜Z(yǔ)言是描述特定領(lǐng)域問(wèn)題的語(yǔ)言,聽(tīng)起來(lái)很唬人,其實(shí)不是什么高深的東西,下面通過(guò)實(shí)例代碼介紹下C#?使用Fluent?API?創(chuàng)建自己的DSL,感興趣的朋友參考下吧
    2021-12-12
  • C#執(zhí)行Javascript代碼的幾種方法總結(jié)

    C#執(zhí)行Javascript代碼的幾種方法總結(jié)

    本篇文章主要是對(duì)C#執(zhí)行Javascript代碼的幾種方法進(jìn)行了詳細(xì)的總結(jié)介紹,需要的朋友可以過(guò)來(lái)參考下,希望對(duì)大家有所幫助
    2014-01-01
  • C#利用后綴表達(dá)式解析計(jì)算字符串公式

    C#利用后綴表達(dá)式解析計(jì)算字符串公式

    當(dāng)我們拿到一個(gè)字符串比如:20+31*(100+1)的時(shí)候用口算就能算出結(jié)果為3151,因?yàn)檫@是中綴表達(dá)式對(duì)于人類(lèi)的思維很簡(jiǎn)單,但是對(duì)于計(jì)算機(jī)就比較復(fù)雜了。相對(duì)的后綴表達(dá)式適合計(jì)算機(jī)進(jìn)行計(jì)算。本文就來(lái)用后綴表達(dá)式實(shí)現(xiàn)解析計(jì)算字符串公式,需要的可以參考一下
    2023-02-02
  • C#實(shí)現(xiàn)一階卡爾曼濾波算法的示例代碼

    C#實(shí)現(xiàn)一階卡爾曼濾波算法的示例代碼

    這篇文章主要介紹了C#實(shí)現(xiàn)一階卡爾曼濾波算法的示例代碼,幫助大家更好的理解和學(xué)習(xí)使用c#,感興趣的朋友可以了解下
    2021-04-04
  • 關(guān)于C#中使用Oracle存儲(chǔ)過(guò)程返回結(jié)果集的問(wèn)題

    關(guān)于C#中使用Oracle存儲(chǔ)過(guò)程返回結(jié)果集的問(wèn)題

    Oracle中可以使用游標(biāo)(Cursor)對(duì)數(shù)據(jù)集進(jìn)行操作,但在存儲(chǔ)過(guò)程輸出參數(shù)中直接使用Cursor錯(cuò)誤,下面小編給大家?guī)?lái)了C#中使用Oracle存儲(chǔ)過(guò)程返回結(jié)果集的問(wèn)題,感興趣的朋友一起看看吧
    2021-10-10

最新評(píng)論