亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

使用Java實(shí)現(xiàn)RabbitMQ延時(shí)隊(duì)列

 更新時(shí)間:2023年06月20日 14:27:21   作者:土豆魚_  
RabbitMQ?延時(shí)隊(duì)列是指消息在發(fā)送到隊(duì)列后,并不立即被消費(fèi)者消費(fèi),而是等待一段時(shí)間后再被消費(fèi)者消費(fèi),本文為大家介紹了實(shí)現(xiàn)RabbitMQ延時(shí)隊(duì)列的Java代碼,希望對大家有所幫助

RabbitMQ 延時(shí)隊(duì)列介紹

RabbitMQ 延時(shí)隊(duì)列是指消息在發(fā)送到隊(duì)列后,并不立即被消費(fèi)者消費(fèi),而是等待一段時(shí)間后再被消費(fèi)者消費(fèi)。這種隊(duì)列通常用于實(shí)現(xiàn)定時(shí)任務(wù),例如,訂單超時(shí)未支付系統(tǒng)取消訂單釋放所占庫存等。

RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的方法有多種,其中比較常見的是使用插件或者通過DLX(Dead Letter Exchange)機(jī)制實(shí)現(xiàn)。

1.使用插件實(shí)現(xiàn)延時(shí)隊(duì)列

RabbitMQ提供了rabbitmq_delayed_message_exchange插件,可以通過該插件實(shí)現(xiàn)延時(shí)隊(duì)列。該插件的原理是在消息發(fā)送時(shí),將消息發(fā)送到一個(gè)特定的Exchange中,然后該Exchange會根據(jù)消息中的延時(shí)時(shí)間將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,從而實(shí)現(xiàn)延時(shí)隊(duì)列的功能

使用該插件需要先安裝插件,然后創(chuàng)建一個(gè)Exchange,并將該Exchange的類型設(shè)置為x-delayed-message,然后將該Exchange與隊(duì)列綁定即可。

2.使用DLX機(jī)制實(shí)現(xiàn)延時(shí)隊(duì)列

消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對隊(duì)列和消息分別設(shè)置TTL。而對隊(duì)列設(shè)置就是隊(duì)列沒有消費(fèi)者連著的保留時(shí)間,也可以對每一個(gè)單獨(dú)的消息做單獨(dú)的 設(shè)置。超過了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會取小的。所以一個(gè)消息如果被路由到不同的隊(duì) 列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。可以通過設(shè)置消息的expiration字段或者x- message-ttl屬性來設(shè)置時(shí)間,兩者是一樣的效果

DLX機(jī)制是RabbitMQ提供的一種消息轉(zhuǎn)發(fā)機(jī)制,它可以將無法被處理的消息轉(zhuǎn)發(fā)到指定的Exchange中,從而實(shí)現(xiàn)消息的延時(shí)處理。具體實(shí)現(xiàn)步驟如下:

  • 創(chuàng)建一個(gè)普通的Exchange和Queue,并將它們綁定在一起。
  • 創(chuàng)建一個(gè)DLX Exchange,并將普通Exchange綁定到該DLX Exchange上。
  • 將Queue設(shè)置為具有TTL(Time To Live)屬性,并設(shè)置消息過期時(shí)間。
  • 將Queue綁定到DLX Exchange上。

當(dāng)消息過期后,會被發(fā)送到DLX Exchange中,然后再由DLX Exchange將消息轉(zhuǎn)發(fā)到指定的Exchange中,從而實(shí)現(xiàn)延時(shí)隊(duì)列的功能。

使用DLX機(jī)制實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)點(diǎn)是不需要安裝額外的插件,但是需要對消息的過期時(shí)間進(jìn)行精確控制,否則可能會出現(xiàn)消息過期時(shí)間不準(zhǔn)確的情況。

Java語言設(shè)置延時(shí)隊(duì)列

下面是使用 Java 語言通過 RabbitMQ 設(shè)置延時(shí)隊(duì)列的步驟:

1.安裝插件

首先,需要安裝 rabbitmq_delayed_message_exchange 插件??梢酝ㄟ^以下命令安裝:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2. 創(chuàng)建延時(shí)交換機(jī)

延時(shí)隊(duì)列需要使用延時(shí)交換機(jī)??梢允褂?x-delayed-message 類型創(chuàng)建一個(gè)延時(shí)交換機(jī)。以下是創(chuàng)建延時(shí)交換機(jī)的示例代碼:

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);

3.創(chuàng)建延時(shí)隊(duì)列

創(chuàng)建延時(shí)隊(duì)列時(shí),需要將隊(duì)列綁定到延時(shí)交換機(jī)上,并設(shè)置隊(duì)列的 TTL(Time To Live)參數(shù)。以下是創(chuàng)建延時(shí)隊(duì)列的示例代碼:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "delayed-exchange");
args.put("x-dead-letter-routing-key", "delayed-queue");
args.put("x-message-ttl", 5000);
channel.queueDeclare("delayed-queue", true, false, false, args);
channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");

在上述代碼中,將隊(duì)列綁定到延時(shí)交換機(jī)上,并設(shè)置了隊(duì)列的 TTL 參數(shù)為 5000 毫秒,即消息在發(fā)送到隊(duì)列后,如果在 5000 毫秒內(nèi)沒有被消費(fèi)者消費(fèi),則會被轉(zhuǎn)發(fā)到 delayed-exchange 交換機(jī)上,并發(fā)送到 delayed-queue 隊(duì)列中。

4.發(fā)送延時(shí)消息

發(fā)送延時(shí)消息時(shí),需要設(shè)置消息的 expiration 屬性,該屬性表示消息的過期時(shí)間。以下是發(fā)送延時(shí)消息的示例代碼:

Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .headers(headers)
        .expiration("5000")
        .build();
channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());

在上述代碼中,設(shè)置了消息的 expiration 屬性為 5000 毫秒,并將消息發(fā)送到 delayed-exchange 交換機(jī)上,路由鍵為 delayed-queue,消息內(nèi)容為 "Hello, delayed queue!"。

5.消費(fèi)延時(shí)消息

消費(fèi)延時(shí)消息時(shí),需要設(shè)置消費(fèi)者的 QOS(Quality of Service)參數(shù),以控制消費(fèi)者的并發(fā)處理能力。以下是消費(fèi)延時(shí)消息的示例代碼:

channel.basicQos(1);
channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received message: " + message);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});

在上述代碼中,設(shè)置了 QOS 參數(shù)為 1,即每次只處理一個(gè)消息。然后使用 basicConsume 方法消費(fèi) delayed-queue 隊(duì)列中的消息,并在消費(fèi)完成后,使用 basicAck 方法確認(rèn)消息已被消費(fèi)。

通過上述步驟,就可以實(shí)現(xiàn) RabbitMQ 延時(shí)隊(duì)列,用于實(shí)現(xiàn)定時(shí)任務(wù)等功能。

RabbitMQ延時(shí)隊(duì)列是一種常見的消息隊(duì)列應(yīng)用場景,它可以在消息發(fā)送后指定一定的時(shí)間后才能被消費(fèi)者消費(fèi),通常用于實(shí)現(xiàn)一些延時(shí)任務(wù),例如訂單超時(shí)未支付自動(dòng)取消等。

RabbitMQ延時(shí)隊(duì)列具體代碼

下面是具體代碼(附注釋):

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DelayedQueueExample {
    private static final String EXCHANGE_NAME = "delayed_exchange";
    private static final String QUEUE_NAME = "delayed_queue";
    private static final String ROUTING_KEY = "delayed_routing_key";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /*
         Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;
                                              */
        // 創(chuàng)建一個(gè)支持延時(shí)隊(duì)列的Exchange
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
        // 創(chuàng)建一個(gè)延時(shí)隊(duì)列,設(shè)置x-dead-letter-exchange和x-dead-letter-routing-key參數(shù)
        Map<String, Object> queueArguments = new HashMap<>();
        queueArguments.put("x-dead-letter-exchange", "");
        queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
        queueArguments.put("x-message-ttl", 5000);
        channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // 發(fā)送消息到延時(shí)隊(duì)列中,設(shè)置expiration參數(shù)
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000")
                .build();
        String message = "Hello, delayed queue!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
        System.out.println("Sent message to delayed queue: " + message);
        channel.close();
        connection.close();
    }
}

在上面的代碼中,我們創(chuàng)建了一個(gè)支持延時(shí)隊(duì)列的Exchange,并創(chuàng)建了一個(gè)延時(shí)隊(duì)列,設(shè)置了x-dead-letter-exchange和x-dead-letter-routing-key參數(shù)。然后,我們發(fā)送了一條消息到延時(shí)隊(duì)列中,設(shè)置了expiration參數(shù),表示這條消息延時(shí)10秒后才能被消費(fèi)。

注意,如果我們想要消費(fèi)延時(shí)隊(duì)列中的消息,需要?jiǎng)?chuàng)建一個(gè)消費(fèi)者,并監(jiān)聽這個(gè)隊(duì)列。當(dāng)消息被消費(fèi)時(shí),需要發(fā)送ack確認(rèn)消息已經(jīng)被消費(fèi),否則消息會一直留在隊(duì)列中。

到此這篇關(guān)于使用Java實(shí)現(xiàn)RabbitMQ延時(shí)隊(duì)列的文章就介紹到這了,更多相關(guān)Java RabbitMQ延時(shí)隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • MyBatis-Plus不使用數(shù)據(jù)庫默認(rèn)值的問題及解決

    MyBatis-Plus不使用數(shù)據(jù)庫默認(rèn)值的問題及解決

    這篇文章主要介紹了MyBatis-Plus不使用數(shù)據(jù)庫默認(rèn)值的問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • Maven安裝和配置阿里云鏡像(解決加載慢)

    Maven安裝和配置阿里云鏡像(解決加載慢)

    本文介紹了Maven中配置阿里云鏡像以解決依賴加載慢的問題,包括安裝Maven、配置環(huán)境變量、在IDEA中配置Maven等內(nèi)容,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-11-11
  • Java將字符串String轉(zhuǎn)換為整型Int的兩種方式

    Java將字符串String轉(zhuǎn)換為整型Int的兩種方式

    這篇文章主要介紹了Java如何將字符串String轉(zhuǎn)換為整型Int,在 Java 中要將 String 類型轉(zhuǎn)化為 int 類型時(shí),需要使用 Integer 類中的 parseInt() 方法或者 valueOf() 方法進(jìn)行轉(zhuǎn)換,本文通過實(shí)例代碼給大家詳細(xì)講解,需要的朋友可以參考下
    2023-04-04
  • 一文梳理Java?8后的新功能

    一文梳理Java?8后的新功能

    Java 8是Java自Java 5(發(fā)布于2004年)之后的最重要的版本,下面這篇文章主要給大家介紹了關(guān)于Java8后新功能的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-02-02
  • java面試常見模式問題---代理模式

    java面試常見模式問題---代理模式

    代理模式是常用的java設(shè)計(jì)模式,他的特征是代理類與委托類有同樣的接口,代理類主要負(fù)責(zé)為委托類預(yù)處理消息、過濾消息、把消息轉(zhuǎn)發(fā)給委托類,以及事后處理消息
    2021-06-06
  • Maven分模塊開發(fā)執(zhí)行指令失敗的問題

    Maven分模塊開發(fā)執(zhí)行指令失敗的問題

    Maven分模塊開發(fā),行指令失敗,modules.module[3]‘ specifies duplicate child module maven_dao @ line 29, column 1的問題,本文給大家分享解決方法,感興趣的朋友跟隨小編一起看看吧
    2020-09-09
  • Spring HandlerInterceptor實(shí)現(xiàn)原理代碼解析

    Spring HandlerInterceptor實(shí)現(xiàn)原理代碼解析

    這篇文章主要介紹了Spring HandlerInterceptor實(shí)現(xiàn)原理代碼解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-10-10
  • RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解

    RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解

    很多場景下我們都需要延遲隊(duì)列。這篇文章主要以RabbitMQ為例來和大家聊一聊延遲隊(duì)列的玩法。文中的代碼具有一定的學(xué)習(xí)價(jià)值,感興趣的同學(xué)可以了解一下
    2021-12-12
  • Failed to execute goal org...的解決辦法

    Failed to execute goal org...的解決辦法

    這篇文章主要介紹了Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1的解決辦法的相關(guān)資料,需要的朋友可以參考下
    2017-06-06
  • java中InputStream獲取字節(jié)大小相關(guān)方法詳解

    java中InputStream獲取字節(jié)大小相關(guān)方法詳解

    這篇文章主要給大家介紹了關(guān)于java中InputStream獲取字節(jié)大小相關(guān)方法的相關(guān)資料,在Java中要實(shí)現(xiàn)讀取文件大小,可以使用InputStream來讀取文件的內(nèi)容,并通過獲取讀取的字節(jié)數(shù)來得到文件的大小,需要的朋友可以參考下
    2023-11-11

最新評論