亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Docker啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的詳細(xì)過程

 更新時(shí)間:2023年02月23日 10:08:26   作者:zoeil  
這篇文章主要介紹了Docker啟動(dòng)RabbitMQ,實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者,通過Docker拉取鏡像并啟動(dòng)RabbitMQ,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

一、Docker拉取鏡像并啟動(dòng)RabbitMQ

拉取鏡像

docker pull rabbitmq:3.8.8-management

查看鏡像

docker images rabbitmq

 啟動(dòng)鏡像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虛擬機(jī)記得開放5672端口或者關(guān)閉防火墻,在window通過 主機(jī)ip:15672 訪問rabbitmq控制臺(tái)

 用戶名密碼默認(rèn)為guest

二、Hello World

(一)依賴導(dǎo)入

<!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依賴客戶端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一個(gè)依賴-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

(二)消息生產(chǎn)者

工作原理

  • Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker
  • Connection:publisher/consumer 和 broker 之間的 TCP 連接
  • Channel:如果每一次訪問 RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection 的開銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè) thread 創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開銷
  • Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最終被送到這里等待 consumer 取走

我們需要先獲取連接(Connection),然后通過連接獲取信道(Channel),這里我們演示簡(jiǎn)單例子,可以直接跳過交換機(jī)(Exchange)發(fā)送隊(duì)列(Queue)

public class Producer {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機(jī)ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
        /*
         * 生成一個(gè)隊(duì)列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
         * 1.隊(duì)列名稱
         * 2.隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中
         * 3.該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi)
         * 4.是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除
         * 5.其他參數(shù)
         **/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello rabbitmq";
        /*
         * 發(fā)送一個(gè)消息
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.發(fā)送到哪個(gè)交換機(jī)
         * 2.路由的key是哪個(gè)
         * 3.其他的參數(shù)信息
         * 4.發(fā)送消息的消息體
         *
         **/
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("發(fā)送成功");
    }
}

(三)消息消費(fèi)者

public class Consumer {
 
    private static final String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機(jī)ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
 
        // 推送的消息如何進(jìn)行消費(fèi)的回調(diào)接口
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消費(fèi)的一個(gè)回調(diào)接口,如在消費(fèi)的時(shí)候隊(duì)列被刪除了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消費(fèi)被中斷");
        };
        /*
         * 消費(fèi)者消費(fèi)消息
         * basicConsume(String queue, boolean autoAck, 
         * DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * 1.消費(fèi)哪個(gè)隊(duì)列
         * 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答
         * 3.消費(fèi)者未成功消費(fèi)的回調(diào)
         **/
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

三、實(shí)現(xiàn)輪訓(xùn)分發(fā)消息

(一)抽取工具類

可以發(fā)現(xiàn),上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個(gè)工具類來(lái)調(diào)用,并使用單例模式-餓漢式完成信道的初始化

public class RabbitMqUtils {
 
    private static Channel channel;
 
    static {
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置ip地址
        factory.setHost("192.168.23.100");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        try {
            // 創(chuàng)建連接
            Connection connection = factory.newConnection();
            // 獲取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("創(chuàng)建信道失敗,錯(cuò)誤信息:" + e.getMessage());
        }
    }
 
    public static Channel getChannel() {
        return channel;
    }
}

(二)啟動(dòng)兩個(gè)工作線程

相當(dāng)于前面的消費(fèi)者,我們只需要寫一個(gè)類,通過ideal實(shí)現(xiàn)多線程啟動(dòng)即可模擬兩個(gè)線程

public class Worker01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            System.out.println("接受到消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        // 啟動(dòng)兩次,第一次為C1, 第二次為C2
        System.out.println("C2消費(fèi)者等待消費(fèi)消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
    }
}

(三)啟動(dòng)發(fā)送線程

public class Test01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 通過控制臺(tái)輸入充當(dāng)消息,使輪訓(xùn)演示更明顯
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

結(jié)果 

四、實(shí)現(xiàn)手動(dòng)應(yīng)答

(一)消息應(yīng)答概念

消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成 了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消 息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù) 發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。 為了保證消息在發(fā)送過程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi)者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。

自動(dòng)應(yīng)答:消費(fèi)者發(fā)送后立即被認(rèn)為已經(jīng)傳送成功。這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán),因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。

當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過載的消息, 沒有對(duì)傳遞的消息數(shù)量進(jìn)行限制 , 當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終 使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并 以某種速率能夠處理這些消息的情況下使用 。

手動(dòng)應(yīng)答:消費(fèi)者接受到消息并順利完成業(yè)務(wù)后再調(diào)用方法進(jìn)行確認(rèn),rabbitmq 才可以把該消息刪除

(二)消息應(yīng)答的方法

  • Channel.basicAck(用于肯定確認(rèn))
  • RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
  • Channel.basicNack(用于否定確認(rèn))
  • Channel.basicReject(用于否定確認(rèn))
  • 與 Channel.basicNack 相比少一個(gè)參數(shù)Multiple
  • multiple 的 true 和 false 代表不同意思

        true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
        比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)
        5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答
        false 同上面相比
        只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答

  • 不處理該消息了直接拒絕,可以將其丟棄了

(三)消息自動(dòng)重新入隊(duì) 

如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確 保不會(huì)丟失任何消息。

(四)消息手動(dòng)應(yīng)答代碼 

1、生產(chǎn)者

public class Test01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

2、睡眠工具類模擬業(yè)務(wù)執(zhí)行

public class SleepUtils {
 
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3、消費(fèi)者

public class Worker01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C1,業(yè)務(wù)時(shí)間短");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(1);  // 模擬業(yè)務(wù)執(zhí)行1秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識(shí)
             * 2、是否啟動(dòng)批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
             *    時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}
 
==============================================================================
public class Worker02 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C2,業(yè)務(wù)時(shí)間長(zhǎng)");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(15);  // 模擬業(yè)務(wù)執(zhí)行15秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識(shí)
             * 2、是否啟動(dòng)批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
             *    時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

worker01業(yè)務(wù)時(shí)間短,worker02業(yè)務(wù)時(shí)間長(zhǎng),我們提前終止worker02模擬出異常,可以看到消息dd會(huì)被放回隊(duì)列由worker01接收處理。

注意:這里需要先啟動(dòng)生產(chǎn)者聲明隊(duì)列ack,不然啟動(dòng)消費(fèi)者會(huì)報(bào)錯(cuò)

最后一個(gè)案例我們可以看到消息輪訓(xùn)+消息自動(dòng)重新入隊(duì)+手動(dòng)應(yīng)答。

到此這篇關(guān)于Docker啟動(dòng)RabbitMQ,實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的文章就介紹到這了,更多相關(guān)Docker啟動(dòng)RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 如何使用Docker和cpolar在Linux服務(wù)器上搭建DashDot監(jiān)控面板

    如何使用Docker和cpolar在Linux服務(wù)器上搭建DashDot監(jiān)控面板

    本文主要介紹如何在Linux服務(wù)器上使用Docker和cpolar技術(shù)搭建DashDot監(jiān)控面板,實(shí)現(xiàn)實(shí)時(shí)服務(wù)器監(jiān)控,DashDot提供直觀的監(jiān)控界面和豐富的指標(biāo),通過cpolar可以實(shí)現(xiàn)公網(wǎng)訪問,方便用戶隨時(shí)了解服務(wù)器狀態(tài),文章詳細(xì)說(shuō)明了環(huán)境準(zhǔn)備、安裝Docker、配置DashDot和cpolar的步驟
    2024-09-09
  • 阿里云鏡像安裝docker報(bào)錯(cuò)的問題及解決方案

    阿里云鏡像安裝docker報(bào)錯(cuò)的問題及解決方案

    這篇文章主要介紹了阿里云鏡像安裝docker報(bào)錯(cuò)的問題及解決方案,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-08-08
  • docker創(chuàng)建redis鏡像的方法

    docker創(chuàng)建redis鏡像的方法

    本篇文章主要介紹了docker創(chuàng)建redis鏡像的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來(lái)看看吧
    2017-12-12
  • 淺談Windows平臺(tái)上Docker安裝與使用

    淺談Windows平臺(tái)上Docker安裝與使用

    本篇文章主要介紹了淺談Windows平臺(tái)上Docker安裝與使用,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來(lái)看看吧
    2017-12-12
  • docker-compose部署zk+kafka+storm集群的實(shí)現(xiàn)

    docker-compose部署zk+kafka+storm集群的實(shí)現(xiàn)

    這篇文章主要介紹了docker-compose部署zk+kafka+storm集群,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • linux 詳解useradd 命令基本用法

    linux 詳解useradd 命令基本用法

    這篇文章主要介紹了linux 詳解useradd 命令基本用法的相關(guān)資料,需要的朋友可以參考下
    2017-01-01
  • docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法

    docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法

    這篇文章主要介紹了docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-08-08
  • 第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解

    第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解

    今天小編就為大家分享一篇關(guān)于第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-03-03
  • MySQL容器中docker-entrypoint-initdb.d目錄的使用

    MySQL容器中docker-entrypoint-initdb.d目錄的使用

    這篇文章主要介紹了MySQL容器中docker-entrypoint-initdb.d目錄的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • docker-compose的安裝和使用詳解

    docker-compose的安裝和使用詳解

    這篇文章主要介紹了docker-compose的安裝和使用詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11

最新評(píng)論