C#用RabbitMQ實現消息訂閱與發(fā)布
Fanout交換機模型
扇形交換機,采用廣播模式,根據綁定的交換機,路由到與之對應的所有隊列。一個發(fā)送到交換機的消息都會被轉發(fā)到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。Fanout交換機轉發(fā)消息是最快的。
RabbitMQ控制臺操作
新增兩個隊列
在同一個Virtual host下新增兩個隊列Q1,Q2,如下圖所示:
綁定fanout交換機
將兩個隊列綁定到系統(tǒng)默認的fanout交換機,如下所示:
示例效果圖
生產者,采用Fanout類型交換機發(fā)布消息,如下圖所示:
當生產者發(fā)布 一條消息時,Q1,Q2兩個隊列均會收到,如下圖所示:
當啟動消費者后,兩個消費者,均會訂閱到相關消息,如下圖所示:
核心代碼
消息發(fā)布
建立連接后,將通道聲明類型為Fanout的交換機,如下所示:
/// <summary> /// fanout類型交換機,發(fā)送消息 /// </summary> public class RabbitMqFanoutSendHelper : RabbitMqHelper { /// <summary> /// 發(fā)送消息 /// </summary> /// <param name="msg"></param> /// <returns></returns> public bool SendMsg(string msg) { try { using (var conn = GetConnection("/Alan.hsiang")) { using (var channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true); var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "amq.fanout", routingKey: "", basicProperties: null, body: body); //Console.WriteLine(" [x] Sent {0}", message); }; }; return true; } catch (Exception ex) { throw ex; } } }
消息訂閱
建立連接后,通道聲明類型為Fanout的交換機,并綁定隊列進行訂閱,如下所示:
/// <summary> /// 扇形交換機接收消息 /// </summary> public class RabbitMqFanoutReceiveHelper : RabbitMqHelper { public RabbitMqReceiveEventHandler OnReceiveEvent; private IConnection conn; private IModel channel; private EventingBasicConsumer consumer; public bool StartReceiveMsg(string queueName) { try { conn = GetConnection("/Alan.hsiang"); channel = conn.CreateModel(); channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true); //此處隨機取出交換機下的隊列 //var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: ""); consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); //Console.WriteLine(" [x] Received {0}", message); if (OnReceiveEvent != null) { OnReceiveEvent(queueName+"::"+message); } }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); return true; } catch (Exception ex) { throw ex; } } }
作者:Alan.hsiang
出處:http://www.cnblogs.com/hsiang/
以上就是C#用RabbitMQ實現消息訂閱與發(fā)布的詳細內容,更多關于C#用RabbitMQ實現消息訂閱與發(fā)布的資料請關注腳本之家其它相關文章!