亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RabbitMQ中的Publish-Subscribe模式最佳實(shí)踐記錄

 更新時(shí)間:2024年12月19日 14:11:42   作者:AllenBright  
Publish/Subscribe 模式是 RabbitMQ 中一種強(qiáng)大且靈活的消息傳遞模式,適用于需要將消息廣播給多個(gè)訂閱者的場(chǎng)景,這篇文章主要介紹了RabbitMQ中的Publish-Subscribe模式,需要的朋友可以參考下

在現(xiàn)代分布式系統(tǒng)中,消息隊(duì)列(Message Queue)是實(shí)現(xiàn)異步通信和解耦系統(tǒng)的關(guān)鍵組件。RabbitMQ 是一個(gè)功能強(qiáng)大且廣泛使用的開源消息代理,支持多種消息傳遞模式。其中,Publish/Subscribe(發(fā)布/訂閱)模式是一種常見且重要的模式,它允許消息發(fā)布者將消息廣播給多個(gè)訂閱者。

本文將深入探討 RabbitMQ 中的 Publish/Subscribe 模式,包括其工作原理、實(shí)現(xiàn)方式、適用場(chǎng)景以及最佳實(shí)踐。

1. Publish/Subscribe 模式簡(jiǎn)介

1.1 什么是 Publish/Subscribe 模式?

Publish/Subscribe(發(fā)布/訂閱)模式是一種消息傳遞模式,它將消息的發(fā)送者(發(fā)布者)和接收者(訂閱者)解耦。發(fā)布者將消息發(fā)布到一個(gè)交換機(jī)(Exchange),而訂閱者通過綁定到交換機(jī)的**隊(duì)列(Queue)**來接收消息。

與點(diǎn)對(duì)點(diǎn)模式(如工作隊(duì)列)不同,Publish/Subscribe 模式允許多個(gè)訂閱者接收相同的消息,從而實(shí)現(xiàn)消息的廣播。

1.2 核心概念

在 RabbitMQ 中,Publish/Subscribe 模式依賴以下核心組件:

  • 發(fā)布者(Publisher):發(fā)送消息的客戶端。
  • 交換機(jī)(Exchange):接收發(fā)布者發(fā)送的消息,并根據(jù)規(guī)則將消息路由到隊(duì)列。
  • 隊(duì)列(Queue):存儲(chǔ)消息的緩沖區(qū)。
  • 訂閱者(Subscriber):從隊(duì)列中消費(fèi)消息的客戶端。
  • 綁定(Binding):定義交換機(jī)和隊(duì)列之間的關(guān)系。

2. Publish/Subscribe 模式的工作原理

2.1 交換機(jī)的作用

在 RabbitMQ 中,消息不會(huì)直接發(fā)送到隊(duì)列,而是發(fā)送到交換機(jī)。交換機(jī)根據(jù)綁定規(guī)則將消息路由到相應(yīng)的隊(duì)列。

RabbitMQ 提供了多種類型的交換機(jī),其中最常用的是:

  • Fanout 交換機(jī):將消息廣播到所有綁定到它的隊(duì)列,忽略路由鍵(Routing Key)。
  • Direct 交換機(jī):根據(jù)消息的路由鍵將消息路由到匹配的隊(duì)列。
  • Topic 交換機(jī):支持更復(fù)雜的路由規(guī)則,允許使用通配符匹配路由鍵。
  • Headers 交換機(jī):根據(jù)消息的頭部屬性進(jìn)行路由。

在 Publish/Subscribe 模式中,通常使用 Fanout 交換機(jī),因?yàn)樗軌驅(qū)⑾V播到所有綁定的隊(duì)列。

2.2 消息的廣播過程

  • 發(fā)布者將消息發(fā)送到交換機(jī)。
  • 交換機(jī)接收到消息后,將消息廣播到所有綁定的隊(duì)列。
  • 訂閱者從隊(duì)列中消費(fèi)消息。

3. Java 實(shí)現(xiàn) Publish/Subscribe 模式

以下是使用 Java 和 RabbitMQ Java Client 實(shí)現(xiàn) Publish/Subscribe 模式的完整示例。

3.1 添加依賴

在 Maven 項(xiàng)目中,添加 RabbitMQ Java Client 依賴:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

3.2 創(chuàng)建發(fā)布者(Publisher)

發(fā)布者負(fù)責(zé)將消息發(fā)送到交換機(jī)。以下是發(fā)布者的代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Publisher {
    private static final String EXCHANGE_NAME = "publisher_subscriber";
    public static void main(String[] argv) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 聲明一個(gè) Fanout 交換機(jī)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 發(fā)布消息
            String message = "Hello, Subscribers!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

3.3 創(chuàng)建訂閱者(Subscriber)

訂閱者負(fù)責(zé)從隊(duì)列中消費(fèi)消息。以下是訂閱者的代碼:

import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class Subscriber {
    private static final String EXCHANGE_NAME = "publisher_subscriber";
    public static void main(String[] argv) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 聲明一個(gè) Fanout 交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 創(chuàng)建一個(gè)臨時(shí)隊(duì)列,并綁定到交換機(jī)
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定義消息處理函數(shù)
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 開始消費(fèi)消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

3.4 運(yùn)行示例

啟動(dòng)多個(gè)訂閱者,在不同的終端窗口中運(yùn)行多個(gè)訂閱者實(shí)例

啟動(dòng)多個(gè)訂閱者后,能在RabbitMQ終端頁(yè)面,能看到多個(gè)臨時(shí)的隊(duì)列,但交換機(jī)只有一個(gè)publisher_subscriber。

啟動(dòng)發(fā)布者,在另一個(gè)終端窗口中運(yùn)行發(fā)布者 3.4.1 觀察輸出

所有訂閱者都會(huì)收到發(fā)布者發(fā)送的消息。例如:

發(fā)布者輸出:

 [x] Sent 'Hello, Subscribers!'

訂閱者輸出:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello, Subscribers!'

4. 代碼解析

4.1 發(fā)布者代碼解析

  • 連接工廠ConnectionFactory 用于創(chuàng)建到 RabbitMQ 服務(wù)器的連接。
  • 交換機(jī)聲明channel.exchangeDeclare(EXCHANGE_NAME, "fanout") 聲明一個(gè) Fanout 交換機(jī)。
  • 消息發(fā)布channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)) 將消息發(fā)送到交換機(jī)。

4.2 訂閱者代碼解析

  • 臨時(shí)隊(duì)列channel.queueDeclare().getQueue() 創(chuàng)建一個(gè)非持久化的、獨(dú)占的臨時(shí)隊(duì)列。
  • 隊(duì)列綁定channel.queueBind(queueName, EXCHANGE_NAME, "") 將隊(duì)列綁定到交換機(jī)。
  • 消息處理DeliverCallback 定義了如何處理接收到的消息。
  • 消費(fèi)消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }) 開始消費(fèi)消息。

5. Publish/Subscribe 模式的適用場(chǎng)景

5.1 日志記錄

在分布式系統(tǒng)中,日志記錄是一個(gè)常見的需求。使用 Publish/Subscribe 模式,可以將日志消息廣播給多個(gè)日志處理器,分別將日志寫入文件、數(shù)據(jù)庫(kù)或發(fā)送到監(jiān)控系統(tǒng)。

5.2 實(shí)時(shí)通知

在社交網(wǎng)絡(luò)或即時(shí)通訊應(yīng)用中,可以使用 Publish/Subscribe 模式向多個(gè)用戶發(fā)送實(shí)時(shí)通知。例如,當(dāng)用戶發(fā)布新動(dòng)態(tài)時(shí),通知所有關(guān)注者。

5.3 分布式緩存更新

在分布式緩存系統(tǒng)中,當(dāng)緩存數(shù)據(jù)更新時(shí),可以使用 Publish/Subscribe 模式通知所有緩存節(jié)點(diǎn)同步更新。

5.4 事件驅(qū)動(dòng)架構(gòu)

在事件驅(qū)動(dòng)架構(gòu)中,Publish/Subscribe 模式用于實(shí)現(xiàn)事件的廣播。例如,當(dāng)用戶注冊(cè)成功時(shí),發(fā)布一個(gè)事件,通知多個(gè)服務(wù)(如郵件服務(wù)、積分服務(wù))執(zhí)行相應(yīng)的操作。

6. 最佳實(shí)踐

6.1 使用持久化

為了確保消息不會(huì)丟失,建議將交換機(jī)和隊(duì)列設(shè)置為持久化。例如:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
channel.queueDeclare("my_queue", true, false, false, null);

6.2 處理消息確認(rèn)

在生產(chǎn)環(huán)境中,建議啟用消息確認(rèn)機(jī)制,確保消息被成功消費(fèi)。例如:

channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });

6.3 避免消息積壓

在高并發(fā)場(chǎng)景下,可能會(huì)出現(xiàn)消息積壓的情況。可以通過設(shè)置隊(duì)列的最大長(zhǎng)度或使用**死信隊(duì)列(DLX)**來處理積壓的消息。

6.4 監(jiān)控和報(bào)警

使用 RabbitMQ 的管理界面或監(jiān)控工具(如 Prometheus + Grafana)監(jiān)控消息隊(duì)列的狀態(tài),并設(shè)置報(bào)警規(guī)則,及時(shí)發(fā)現(xiàn)和解決問題。

7. 總結(jié)

Publish/Subscribe 模式是 RabbitMQ 中一種強(qiáng)大且靈活的消息傳遞模式,適用于需要將消息廣播給多個(gè)訂閱者的場(chǎng)景。通過使用 Fanout 交換機(jī),可以輕松實(shí)現(xiàn)消息的廣播,同時(shí)結(jié)合持久化、消息確認(rèn)和監(jiān)控機(jī)制,可以構(gòu)建高可靠性的分布式系統(tǒng)。

到此這篇關(guān)于RabbitMQ中的Publish-Subscribe模式的文章就介紹到這了,更多相關(guān)RabbitMQ Publish-Subscribe模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot配置嵌入式Servlet容器和使用外置Servlet容器的教程圖解

    SpringBoot配置嵌入式Servlet容器和使用外置Servlet容器的教程圖解

    這篇文章主要介紹了SpringBoot配置嵌入式Servlet容器和使用外置Servlet容器的教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-07-07
  • java結(jié)合keytool如何實(shí)現(xiàn)非對(duì)稱簽名和驗(yàn)證詳解

    java結(jié)合keytool如何實(shí)現(xiàn)非對(duì)稱簽名和驗(yàn)證詳解

    這篇文章主要給大家介紹了關(guān)于java結(jié)合keytool如何實(shí)現(xiàn)非對(duì)稱簽名和驗(yàn)證的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-08-08
  • 老生常談java中的fail-fast機(jī)制

    老生常談java中的fail-fast機(jī)制

    下面小編就為大家?guī)硪黄仙U刯ava中的fail-fast機(jī)制。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-08-08
  • Java8中的LocalDateTime和Date一些時(shí)間操作方法

    Java8中的LocalDateTime和Date一些時(shí)間操作方法

    這篇文章主要介紹了Java8中的LocalDateTime和Date一些時(shí)間操作方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-04-04
  • Java多線程下載文件實(shí)例詳解

    Java多線程下載文件實(shí)例詳解

    這篇文章主要為大家詳細(xì)介紹了Java多線程下載文件的實(shí)例代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • 用Set類判斷Map里key是否存在的示例代碼

    用Set類判斷Map里key是否存在的示例代碼

    本篇文章主要是對(duì)用Set類判斷Map里key是否存在的示例代碼進(jìn)行了介紹,需要的朋友可以過來參考下,希望對(duì)大家有所幫助
    2013-12-12
  • idea取消git托管方式(刪除git文件)

    idea取消git托管方式(刪除git文件)

    遇到Git文件傳輸錯(cuò)誤或打不開問題時(shí),需進(jìn)行Git清理和重新配置,首先刪除項(xiàng)目中的.git文件和.gitignore文件,若找不到,檢查是否為隱藏文件,接著在IDE的設(shè)置中,刪除所有版本控制模塊,最后,若想重新使用Git,可在設(shè)置里重新啟用并配置,連接至GitHub倉(cāng)庫(kù)即可恢復(fù)正常
    2024-10-10
  • 使用 Apache POI 在 Java 中寫入 Excel 文件的方法

    使用 Apache POI 在 Java 中寫入 Excel

    這篇文章詳細(xì)介紹了如何使用ApachePOI在Java中編寫Excel文件的技巧,包括創(chuàng)建工作簿、工作表、行和單元格,以及如何處理不同版本的Excel文件,通過詳細(xì)的步驟和代碼示例,讀者可以快速掌握ApachePOI的基本使用方法,感興趣的朋友一起看看吧
    2025-02-02
  • Java日期工具類時(shí)間校驗(yàn)實(shí)現(xiàn)

    Java日期工具類時(shí)間校驗(yàn)實(shí)現(xiàn)

    一般項(xiàng)目中需要對(duì)入?yún)⑦M(jìn)行校驗(yàn),比如必須是一個(gè)合法的日期,本文就來介紹一下Java日期工具類時(shí)間校驗(yàn)實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-12-12
  • SpringBoot讀寫xml上傳到AWS存儲(chǔ)服務(wù)S3的示例

    SpringBoot讀寫xml上傳到AWS存儲(chǔ)服務(wù)S3的示例

    這篇文章主要介紹了SpringBoot讀寫xml上傳到S3的示例,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下
    2020-10-10

最新評(píng)論