亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

關(guān)于Java中RabbitMQ的高級(jí)特性

 更新時(shí)間:2023年07月10日 11:16:10   作者:卑微小鐘  
這篇文章主要介紹了關(guān)于Java中RabbitMQ的高級(jí)特性,MQ全稱為Message Queue,即消息隊(duì)列,"消息隊(duì)列"是在消息的傳輸過(guò)程中保存消息的容器,它是典型的:生產(chǎn)者、消費(fèi)者模型,生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息,需要的朋友可以參考下

RabbitMQ高級(jí)特性

1.消息的可靠投遞

在使用RabbitMQ的時(shí)候,作為消息發(fā)送方希望杜絕任何消息丟失或投遞失敗場(chǎng)景。RabbitMQ為我們提供了兩種方式來(lái)控制消息的投遞可靠性模式。

  • confirm 確認(rèn)模式
  • return 退回模式

RabbitMQ整個(gè)消息投遞的路徑為:producer>rabbitMQ broker> exchange > queue > consumer

  • 消息從producer到exchange則會(huì)返回一個(gè)confirmCallback
  • 消息從exchange到queue投遞失敗則會(huì)返回一個(gè)returnCallback

利用這兩個(gè)callback來(lái)控制消息的可靠性傳遞。

1.1 confirm 確認(rèn)模式

(1)開(kāi)啟確認(rèn)模式

在創(chuàng)建連接工廠的時(shí)候要開(kāi)啟確認(rèn)模式,關(guān)鍵字:publisher-confirms,默認(rèn)為false。

<rabbit:connection-factory id="connectionFactory" 
                           host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.username}"
                           password="${rabbitmq.password}"
                           virtual-host="${rabbitmq.virtual-host}"
                           publisher-confirms="true"
/>

(2)RabbitTemplate設(shè)置回調(diào)

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    /**
     * 注入RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 測(cè)試默認(rèn)的隊(duì)列發(fā)送消息
     */
    @Test
    public void testConfirmCallback() throws InterruptedException {
        // 設(shè)置回調(diào)
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 回調(diào)方法
             * @param correlationData 回調(diào)的相關(guān)數(shù)據(jù)。
             * @param ack true 表示發(fā)送成功, false 發(fā)送失敗
             * @param cause 失敗原因,ack==true->null
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("發(fā)送成功");
                } else {
                    System.out.println("發(fā)送失敗,原因:" + cause);
                    // 失敗后處理流程
                }
            }
        });
        rabbitTemplate.convertAndSend("spring_queue", "hello world");
        // 防止發(fā)送完成后,未完成回調(diào)關(guān)閉通道
        Thread.sleep(5000);
    }
}
  • public void confirm(CorrelationData correlationData, boolean ack, String cause)

    • correlationData 參數(shù),發(fā)送數(shù)據(jù)的時(shí)候可以攜帶上
    • ack 是否發(fā)送成功,成功為true,失敗為false
    • cause 失敗的原因,成功時(shí)為null
  • Thread.sleep(5000);防止發(fā)送完成后,未完成回調(diào)關(guān)閉通道

    如果沒(méi)有加上會(huì)

    clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)

1.2 return 回退模式

(1)開(kāi)啟回退模式

<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.username}"
                           password="${rabbitmq.password}"
                           virtual-host="${rabbitmq.virtual-host}"
                           publisher-returns="true"
/>

(2)RabbitTemplate設(shè)置回調(diào)

@Test
    public void testReturnCallback() throws InterruptedException {
        // 設(shè)置交換機(jī)處理失敗消息的模式
        rabbitTemplate.setMandatory(true);
        // 設(shè)置回調(diào)
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 返回消息
             * @param message 消息對(duì)象
             * @param replyCode 錯(cuò)誤碼
             * @param replyText 交換信息
             * @param exchange 交換機(jī)
             * @param routingKey 路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("消息對(duì)象:" + new String(message.getBody()));
                System.out.println("錯(cuò)誤碼:" + replyCode);
                System.out.println("交換信息:" + replyText);
                System.out.println("交換機(jī):" + exchange);
                System.out.println("路由鍵:" + routingKey);
            }
        });
        rabbitTemplate.convertAndSend("spring_direct_exchange", "direct_key_3",
                "spring_direct_exchange_direct_key_1");
        // 防止發(fā)送完成后,未完成回調(diào)關(guān)閉通道
        Thread.sleep(5000);
    }

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

  • message 消息對(duì)象
  • replyCode 錯(cuò)誤碼
  • replyText 交換信息
  • exchange 交換機(jī)
  • routingKey 路由鍵

mandatory屬性的優(yōu)先級(jí)高于publisher-returns的優(yōu)先級(jí)
mandatory結(jié)果為true、false時(shí)會(huì)忽略掉publisher-returns屬性的值
mandatory結(jié)果為null(即不配置)時(shí)結(jié)果由publisher-returns確定

2.Consumer Ack(消費(fèi)端)

Ack指Acknowledge,確認(rèn)。表示消費(fèi)端接收到消息后的確認(rèn)方式。

有三種確認(rèn)方式:

  • 自動(dòng)確認(rèn):acknowledge="none"
  • 手動(dòng)確認(rèn):acknowledge="manual"
  • 根據(jù)異常情況確認(rèn):acknowledge="auto"

其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng)message 從RabbitMQ的消息緩存中移除。

但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用``channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()`方法,讓其自動(dòng)重新發(fā)送消息。

2.1 設(shè)置手動(dòng)簽收

(1)創(chuàng)建一個(gè)監(jiān)聽(tīng)器接收消息

設(shè)置手動(dòng)接收時(shí),讓監(jiān)聽(tīng)器實(shí)現(xiàn)ChannelAwareMessageListener接口

如果消息成功處理,則調(diào)用channel.basicAck()

如果消息處理失敗,則調(diào)用 channel.basicNack(),broker重新發(fā)送consumer

/**
 * @author zhong
 * <p>
 * Consumer Ack機(jī)制
 * 1.設(shè)置手動(dòng)簽收,acknowledge="manual"
 * 2.讓監(jiān)聽(tīng)器實(shí)現(xiàn)ChannelAwareMessageListener接口
 * 3.如果消息成功處理,則調(diào)用channel.basicAck()
 * 4.如果消息處理失敗,則調(diào)用 channel.basicNack(),broker重新發(fā)送consumer
 */
@Component
public class AckSpringQueueListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 接收消息
        System.out.println("Message:" + new String(message.getBody()));
        // 手動(dòng)簽收
        /**
         * deliveryTag: 標(biāo)識(shí)id
         * multiple: 確認(rèn)所有消息
         */
        channel.basicAck(deliveryTag, true);
        // 手動(dòng)拒絕
        /**
         * requeue:如果被拒絕的消息應(yīng)該被重新排隊(duì)而不是被丟棄/死信
         */
        //channel.basicNack(deliveryTag, true, true);
    }
}

(2)設(shè)置手動(dòng),加入監(jiān)聽(tīng)

設(shè)置手動(dòng)簽收,acknowledge=“manual”

<context:component-scan base-package="org.example"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" auto-declare="true">
    <rabbit:listener ref="ackSpringQueueListener" queue-names="spring_queue"/>
</rabbit:listener-container>

3.消費(fèi)端限流

MQ一個(gè)作用就是削峰填谷,通過(guò)消費(fèi)端限流實(shí)現(xiàn)。

消費(fèi)端限流包括一下操作:

  • <rabbit:listener-container>配置prefetch???????屬性設(shè)置
  • 消費(fèi)端一次拉去多少消息消費(fèi)端確認(rèn)模式一定為手動(dòng)確認(rèn)。acknowledge="nanual"

(1)關(guān)鍵配置文件:

<context:component-scan base-package="org.example"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"
                           auto-declare="true">
    <rabbit:listener ref="qosListener" queue-names="spring_queue"/>
</rabbit:listener-container>

(1)手動(dòng)確認(rèn) acknowledge="manual"

(2)設(shè)置閾值 prefetch="1"

(2)關(guān)鍵監(jiān)聽(tīng)器代碼

/**
 * Consumer 限流機(jī)制
 * 1.確保ack機(jī)制為手動(dòng)確認(rèn)
 * 2.listener-container 配置屬性
 * perfetch = 1 表示消費(fèi)端每次從mq拉取一條消息來(lái)消費(fèi),直到手動(dòng)確認(rèn)消費(fèi)完畢后,才會(huì)繼續(xù)拉去下一條消息。
 */
@Component
public class QosListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("QosListener:" + new String(message.getBody()));
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 簽收消息
        Thread.sleep(1000);
        channel.basicAck(deliveryTag, true);
    }
}

4.TTL(存活時(shí)間/過(guò)期時(shí)間)

TTL全稱Time To Live (存活時(shí)間/過(guò)期時(shí)間)。

  • 當(dāng)消息到達(dá)存活時(shí)間后,還沒(méi)有被消費(fèi),會(huì)被自動(dòng)清除。
  • RabbitMQ可以對(duì)消息設(shè)置過(guò)期時(shí)間,也可以對(duì)整個(gè)隊(duì)列(Queue)設(shè)置過(guò)期時(shí)間。

4.1 控制臺(tái)設(shè)置

RabbitMQ控制臺(tái)可以設(shè)置隊(duì)列的過(guò)期時(shí)間。

4.2 消息單獨(dú)過(guò)期

@Test
public void testTTL() {
    // 消息后處理隊(duì)列,設(shè)置一下消息參數(shù)信息
    MessagePostProcessor messagePostProcessor = message -> {
        // 1.設(shè)置message的消息
        message.getMessageProperties().setExpiration("50000");// 設(shè)置過(guò)期時(shí)間,字符串,毫秒
        // 2.返回消息
        return message;
    };
    // 傳入
    rabbitTemplate.convertAndSend("spring_fanout_exchange", "key", "RabbitMQ", messagePostProcessor);
}

4.3 小結(jié)

如果設(shè)置了消息的過(guò)期時(shí)間,也設(shè)置了隊(duì)列的過(guò)期時(shí)間,它以時(shí)間短的為準(zhǔn)。隊(duì)列過(guò)期后,會(huì)將隊(duì)列所有消息全部移除。消息過(guò)期后,只有消息在隊(duì)列頂端,才會(huì)判斷其是否過(guò)期(移除)。

5.死信隊(duì)列

死信隊(duì)列,英文縮寫(xiě):DLX。Dead Letter Exchange(死信交換機(jī))

當(dāng)消息成為Dead Message后,可以被重新發(fā)送到另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX。

到此這篇關(guān)于關(guān)于Java中RabbitMQ的高級(jí)特性 的文章就介紹到這了,更多相關(guān)RabbitMQ的高級(jí)特性 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Python OpenCV圖像復(fù)原的實(shí)現(xiàn)步驟

    Python OpenCV圖像復(fù)原的實(shí)現(xiàn)步驟

    Python OpenCV圖像復(fù)原是一個(gè)涉及去除噪聲、模糊等失真的過(guò)程,旨在恢復(fù)圖像的原始質(zhì)量,以下是一個(gè)詳細(xì)的案例教程,包括理論背景和具體實(shí)現(xiàn)步驟,需要的朋友可以參考下
    2024-12-12
  • 淺談Python協(xié)程asyncio

    淺談Python協(xié)程asyncio

    今天給大家?guī)?lái)的是關(guān)于Python的相關(guān)知識(shí),文章圍繞著Python協(xié)程展開(kāi),文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下
    2021-06-06
  • python高階函數(shù)functools模塊的具體使用

    python高階函數(shù)functools模塊的具體使用

    本文主要介紹了python高階函數(shù)functools模塊的具體使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 新建文件時(shí)Pycharm中自動(dòng)設(shè)置頭部模板信息的方法

    新建文件時(shí)Pycharm中自動(dòng)設(shè)置頭部模板信息的方法

    這篇文章主要介紹了新建文件時(shí)Pycharm中自動(dòng)設(shè)置頭部模板信息的方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-04-04
  • python代碼過(guò)長(zhǎng)的換行方法

    python代碼過(guò)長(zhǎng)的換行方法

    今天小編就為大家分享一篇python代碼過(guò)長(zhǎng)的換行方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2018-07-07
  • Python之多線程退出與停止的一種實(shí)現(xiàn)思路

    Python之多線程退出與停止的一種實(shí)現(xiàn)思路

    這篇文章主要介紹了Python之多線程退出與停止的一種實(shí)現(xiàn)思路,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • python的迭代器與生成器實(shí)例詳解

    python的迭代器與生成器實(shí)例詳解

    這篇文章主要介紹了python的迭代器與生成器實(shí)例詳解,需要的朋友可以參考下
    2014-07-07
  • Python中enumerate()函數(shù)編寫(xiě)更Pythonic的循環(huán)

    Python中enumerate()函數(shù)編寫(xiě)更Pythonic的循環(huán)

    本篇文章主要大家通過(guò)實(shí)例講述了Python中enumerate()函數(shù)編寫(xiě)更Pythonic的循環(huán)的知識(shí)點(diǎn),有興趣的朋友參考學(xué)習(xí)下。
    2018-03-03
  • 安裝pytorch報(bào)錯(cuò)torch.cuda.is_available()=false問(wèn)題的解決過(guò)程

    安裝pytorch報(bào)錯(cuò)torch.cuda.is_available()=false問(wèn)題的解決過(guò)程

    最近想用pytorch,因此裝了pytorch,但是碰到了問(wèn)題,下面這篇文章主要給大家介紹了關(guān)于安裝pytorch報(bào)錯(cuò)torch.cuda.is_available()=false問(wèn)題的解決過(guò)程,需要的朋友可以參考下
    2022-05-05
  • python讀取Kafka實(shí)例

    python讀取Kafka實(shí)例

    今天小編就為大家分享一篇python讀取Kafka實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-12-12

最新評(píng)論