亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

C#使用RabbitMq隊(duì)列(Sample,Work,Fanout,Direct等模式的簡(jiǎn)單使用)

 更新時(shí)間:2020年10月17日 09:05:31   作者:做自己518  
這篇文章主要介紹了C#使用RabbitMq隊(duì)列(Sample,Work,Fanout,Direct等模式的簡(jiǎn)單使用),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

1:RabbitMQ是個(gè)啥?(專業(yè)術(shù)語參考自網(wǎng)絡(luò))

 RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。

  RabbitMQ服務(wù)器是用Erlang語言編寫的,Erlang是專門為高并發(fā)而生的語言,而集群和故障轉(zhuǎn)移是構(gòu)建在開發(fā)電信平臺(tái)框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫

2:使用RabbitMQ有啥好處?

RabbitMQ是使用Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來實(shí)現(xiàn)。
AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。
AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次。
RabbitMQ的可靠性是非常好的,數(shù)據(jù)能夠保證百分之百的不丟失。可以使用鏡像隊(duì)列,它的穩(wěn)定性非常好。所以說在我們互聯(lián)網(wǎng)的金融行業(yè)。

對(duì)數(shù)據(jù)的穩(wěn)定性和可靠性要求都非常高的情況下,我們都會(huì)選擇RabbitMQ。當(dāng)然沒有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的優(yōu)化。

RabbitMQ可以構(gòu)建異地雙活架構(gòu),包括每一個(gè)節(jié)點(diǎn)存儲(chǔ)方式可以采用磁盤或者內(nèi)存的方式,

3:RabbitMq的安裝以及環(huán)境搭建等:

網(wǎng)絡(luò)上有很多關(guān)于怎么搭建配置RabbitMq服務(wù)環(huán)境的詳細(xì)文章,也比較簡(jiǎn)單,這里不再說明,本人是Docker上面的pull RabbitMq 鏡像來安裝的!

3.1:運(yùn)行容器的命令如下:

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用場(chǎng)景主要有哪些,啥時(shí)候用或者不用?

4.1什么時(shí)候使用MQ?

1)數(shù)據(jù)驅(qū)動(dòng)的任務(wù)依賴

2)上游不關(guān)心多下游執(zhí)行結(jié)果

3)異步返回執(zhí)行時(shí)間長(zhǎng)

4.2什么時(shí)候不使用MQ?

需要實(shí)時(shí)關(guān)注執(zhí)行結(jié)果 (eg:同步調(diào)用)

5:具體C#怎么使用RabbitMq?下面直接上code和測(cè)試截圖了(Demo環(huán)境是.NetCore3.1控制臺(tái)+Docker上的RabbitMQ容器來進(jìn)行的)

6:sample模式,就是簡(jiǎn)單地隊(duì)列模式,一進(jìn)一出的效果差不多,測(cè)試截圖:

Code:

//簡(jiǎn)單生產(chǎn)端 ui調(diào)用者

using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {
        //就是簡(jiǎn)單的隊(duì)列,生產(chǎn)者
        Console.WriteLine("====RabbitMqPublishDemo====");
        for (int i = 0; i < 500; i++)
        {
          ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
        }
        Console.WriteLine("生成完畢!");
        Console.ReadLine();
    }
  }
}

/// <summary>
/// 簡(jiǎn)單生產(chǎn)者 邏輯
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

  using (IConnection conn = connectionFactory.CreateConnection())
  {
    using (IModel channel = conn.CreateModel())
    {
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var msgBody = Encoding.UTF8.GetBytes(msg);
      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
    }
  }
}


//簡(jiǎn)單消費(fèi)端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;

  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====RabbitMqConsumerDemo====");
      ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
      {
        Console.WriteLine($"訂閱到消息:{DateTime.Now}:{handleMsgStr}");
      });
      Console.ReadLine();
    }
  }
}

   #region 簡(jiǎn)單生產(chǎn)者后端邏輯
    /// <summary>
    /// 簡(jiǎn)單消費(fèi)者
    /// </summary>
    /// <param name="queueName">隊(duì)列名稱</param>
    /// <param name="isBasicNack">失敗后是否自動(dòng)放到隊(duì)列</param>
    /// <param name="handleMsgStr">有就自己對(duì)字符串的處理,如果要存儲(chǔ)到數(shù)據(jù)庫請(qǐng)自行擴(kuò)展</param>
    public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
    {
      Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
      IConnection conn = connectionFactory.CreateConnection();
      IModel channel = conn.CreateModel();
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (sender, ea) =>
      {
        byte[] bymsg = ea.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bymsg);
        if (handleMsgStr != null)
        {
          handleMsgStr.Invoke(msg);
        }
        else
        {
          Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
        }
      };
      channel.BasicConsume(queueName, autoAck: true, consumer);
    }
    #endregion

7:Work模式

//簡(jiǎn)單生產(chǎn)端 ui調(diào)用者

using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {
        //就是簡(jiǎn)單的隊(duì)列,生產(chǎn)者
        Console.WriteLine("====RabbitMqPublishDemo====");
        for (int i = 0; i < 500; i++)
        {
          ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
        }
        Console.WriteLine("生成完畢!");
        Console.ReadLine();
    }
  }
}

/// <summary>
/// 簡(jiǎn)單生產(chǎn)者 邏輯
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

  using (IConnection conn = connectionFactory.CreateConnection())
  {
    using (IModel channel = conn.CreateModel())
    {
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var msgBody = Encoding.UTF8.GetBytes(msg);
      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
    }
  }
}


//簡(jiǎn)單消費(fèi)端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;

  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====RabbitMqConsumerDemo====");
      ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
      {
        Console.WriteLine($"訂閱到消息:{DateTime.Now}:{handleMsgStr}");
      });
      Console.ReadLine();
    }
  }
}

   #region 簡(jiǎn)單生產(chǎn)者后端邏輯
    /// <summary>
    /// 簡(jiǎn)單消費(fèi)者
    /// </summary>
    /// <param name="queueName">隊(duì)列名稱</param>
    /// <param name="isBasicNack">失敗后是否自動(dòng)放到隊(duì)列</param>
    /// <param name="handleMsgStr">有就自己對(duì)字符串的處理,如果要存儲(chǔ)到數(shù)據(jù)庫請(qǐng)自行擴(kuò)展</param>
    public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
    {
      Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
      IConnection conn = connectionFactory.CreateConnection();
      IModel channel = conn.CreateModel();
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (sender, ea) =>
      {
        byte[] bymsg = ea.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bymsg);
        if (handleMsgStr != null)
        {
          handleMsgStr.Invoke(msg);
        }
        else
        {
          Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
        }
      };
      channel.BasicConsume(queueName, autoAck: true, consumer);
    }
    #endregion

8:Fanout

Code:

//就如下的code, 多次生產(chǎn),3個(gè)消費(fèi)者都可以自動(dòng)開始消費(fèi)

//生產(chǎn)者
using System;
namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;
  class Program
  {
    static void Main(string[] args)
    {
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :發(fā)布消息成功{i}");
      }
      Console.WriteLine("工作隊(duì)列模式 生成完畢......!");
      Console.ReadLine();
    }
  }
}

//生產(chǎn)者后端邏輯
public static void PublishWorkQueueModel(string queueName, string msg)
    {
      using (var connection = connectionFactory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(msg);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
        Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
      }
    }

//work消費(fèi)端
using System;

namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;
  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("====Work模式開啟了====");
      ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
      {
        Console.WriteLine($"work模式獲取到消息{msg}");
      });
      Console.ReadLine();
    }
  }
}

//work后端邏輯
    public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
    {
      var connection = connectionFactory.CreateConnection();
      var channel = connection.CreateModel();

      channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
      channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

      var consumer = new EventingBasicConsumer(channel);
      Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");

      consumer.Received += (sender, ea) =>
      {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (handserMsg != null)
        {
          if (!string.IsNullOrEmpty(message))
          {
            handserMsg.Invoke(message);
          }
        }
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
      };
      channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

9:Direct

Code:

//同一個(gè)消息會(huì)被多個(gè)訂閱者消費(fèi)

//發(fā)布者
using System;

namespace RabbitMqPublishDemo
{
  using MyRabbitMqService;
  using System.Runtime.CompilerServices;

  class Program
  {
    static void Main(string[] args)
    {

      #region 發(fā)布訂閱模式,帶上了exchange
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"發(fā)布的消息是:{i}");
      }
      Console.WriteLine("發(fā)布o(jì)k!");
      #endregion
      Console.ReadLine();
    }
  }
}
//發(fā)布者的后端邏輯 我在這里選擇了扇形: ExchangeType.Fanout
  public static void PublishExchangeModel(string exchangeName, string message)
    {
      using (var connection = connectionFactory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
        Console.WriteLine($" Sent {message}");
      }
    }


//訂閱者
using System;
namespace RabbitMqConsumerDemo
{
  using MyRabbitMqService;
  using System.Runtime.InteropServices;
  class Program
  {
    static void Main(string[] args)
    {

      #region 發(fā)布訂閱模式 Exchange
      ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
      {
        Console.WriteLine($"訂閱到消息:{msg}");
      });
      #endregion
      Console.ReadLine();
    }
  }
}

//訂閱者后端的邏輯
 public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
    {
      var connection = connectionFactory.CreateConnection();
      var channel = connection.CreateModel();

      channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉

      var queueName = channel.QueueDeclare().QueueName;
      channel.QueueBind(queue: queueName,
               exchange: exchangeName,
               routingKey: "");

      Console.WriteLine(" Waiting for msg....");

      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (model, ea) =>
      {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (handlerMsg != null)
        {
          if (!string.IsNullOrEmpty(message))
          {
            handlerMsg.Invoke(message);
          }
        }
        else
        {
          Console.WriteLine($"訂閱到消息:{message}");
        }
      };
      channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    }

到此這篇關(guān)于C#使用RabbitMq隊(duì)列(Sample,Work,Fanout,Direct等模式的簡(jiǎn)單使用)的文章就介紹到這了,更多相關(guān)C#使用RabbitMq隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論