亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

typescript?實現(xiàn)RabbitMQ死信隊列和延遲隊列(訂單10分鐘未付歸還庫存)的過程

 更新時間:2024年03月29日 10:35:10   作者:熊明才  
RabbitMQ作為一款流行的消息隊列服務(wù),提供了死信隊列(Dead?Letter?Exchange)功能,能夠有效地處理消息被拒絕、消息過期以及隊列達到最大長度等情況,本文將介紹如何利用RabbitMQ的死信隊列來處理這三種情況,并提供了TypeScript示例代碼,需要的朋友可以參考下

Manjaro安裝RabbitMQ

安裝

sudo pacman -S rabbitmq rabbitmqadmin

啟動管理模塊

sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默認用戶名和密碼都是guest。
要使用 rabbitmqctl 命令添加用戶并分配權(quán)限,您可以按照以下步驟進行操作:

1.添加用戶

rabbitmqctl add_user mingcai password

請將 password 替換為您想要設(shè)置的實際密碼。

2.分配權(quán)限

rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

這個命令將用戶 mingcai 授予對所有虛擬主機的所有資源的讀、寫和管理權(quán)限。如果您只想給予特定權(quán)限,請適當(dāng)調(diào)整正則表達式 ".*",以授予適當(dāng)?shù)臋?quán)限。例如,如果您只想給予讀取權(quán)限,可以使用 "^amq\."。

3.可選步驟:設(shè)置用戶角色

您可以將用戶分配給不同的角色,以便更好地管理權(quán)限。例如,您可以將用戶添加到 administrator 角色以獲取管理員權(quán)限:

rabbitmqctl set_user_tags mingcai administrator

這樣,用戶 mingcai 就被賦予了管理員權(quán)限。

請確保您具有適當(dāng)?shù)臋?quán)限來執(zhí)行這些操作,并確保替換示例中的用戶名和密碼為您自己的實際值。

死信隊列

標(biāo)題:利用RabbitMQ死信隊列處理消息的三種情況

在消息隊列的應(yīng)用中,處理異常情況和消息的延遲成為了一項重要的任務(wù)。RabbitMQ作為一款流行的消息隊列服務(wù),提供了死信隊列(Dead Letter Exchange)功能,能夠有效地處理消息被拒絕、消息過期以及隊列達到最大長度等情況。本文將介紹如何利用RabbitMQ的死信隊列來處理這三種情況,并提供了TypeScript示例代碼。

1. 消息被拒絕

當(dāng)消費者無法處理某條消息時,可以選擇將其標(biāo)記為“被拒絕”。這種情況下,我們可以配置RabbitMQ,將被拒絕的消息發(fā)送到一個死信隊列,以后再處理。

// 引入amqplib庫
import * as amqp from 'amqplib';
// 連接到RabbitMQ服務(wù)器
const connection = await amqp.connect('amqp://localhost');
// 創(chuàng)建Channel
const channel = await connection.createChannel();
// 定義隊列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {
  // 設(shè)置死信交換機
  deadLetterExchange: 'my_dead_letter_exchange'
});
// 消費消息
channel.consume(queueName, (msg) => {
  // 處理消息
  if (msg) {
    // 處理失敗,拒絕消息并將其重新放回隊列
    // channel.reject(msg, true); // 第二個參數(shù)設(shè)為 true 表示將消息重新放回隊列
    // 處理失敗,拒絕消息
    channel.reject(msg, false); // 第二個參數(shù)設(shè)為 false 表示將消息投遞到死信隊列
     // or 處理失敗,拒絕消息并將其重新放回死信隊列
    channel.nack(msg, false, false); // 第二個參數(shù)設(shè)為 false 表示不將消息重新放回原隊列,第三個參數(shù)設(shè)為 false 表示不拒絕當(dāng)前和之前所有未確認的消息
  }
});

2. 消息過期

有時候我們希望消息在一定時間內(nèi)被處理,如果超過了這個時間,就認為消息已經(jīng)過期。RabbitMQ允許我們設(shè)置消息的過期時間,并在消息過期后將其發(fā)送到死信隊列。

// 發(fā)布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {
  expiration: '60000' // 設(shè)置過期時間為60秒
});

3. 隊列達到最大長度

為了避免隊列過載,我們可以限制隊列的最大長度。當(dāng)隊列達到最大長度時,新的消息將被拒絕,并發(fā)送到死信隊列。

// 定義隊列
await channel.assertQueue(queueName, {
  maxLength: 100, // 設(shè)置最大隊列長度為100
  deadLetterExchange: 'my_dead_letter_exchange'
});

通過以上配置,我們可以利用RabbitMQ的死信隊列來處理消息被拒絕、消息過期以及隊列達到最大長度等情況,保證消息系統(tǒng)的穩(wěn)定性和可靠性。

以上是利用TypeScript示例代碼演示了如何在RabbitMQ中使用死信隊列。希望這篇文章對你有所幫助!

延時隊列

什么是延時隊列?顧名思義:首先它要具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。
延時隊列在項目中的應(yīng)用還是比較多的,尤其像電商類平臺:
1、訂單成功后,在30分鐘內(nèi)沒有支付,自動取消訂單
2、外賣平臺發(fā)送訂餐通知,下單成功后60s給用戶推送短信。
3、如果訂單一直處于某一個未完結(jié)狀態(tài)時,及時處理關(guān)單,并退還庫存
4、淘寶新建商戶一個月內(nèi)還沒上傳商品信息,將凍結(jié)商鋪等

npm install amqplib --save
npm install @types/amqplib --save-dev

總結(jié)

rabbitmqadmin 使用入門

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于執(zhí)行各種管理任務(wù),如創(chuàng)建隊列、交換機,查看隊列狀態(tài)等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=password
rabbitmqadmin list exchanges

查看 RabbitMQ 服務(wù)器信息

rabbitmqadmin status

列出所有交換機

rabbitmqadmin list exchanges

列出所有隊列

rabbitmqadmin list queues

創(chuàng)建一個交換機

rabbitmqadmin declare exchange name=my_exchange type=direct

創(chuàng)建一個隊列

rabbitmqadmin declare queue name=my_queue

綁定隊列到交換機

rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key

發(fā)送消息到指定交換機

rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"

獲取隊列消息

rabbitmqadmin get queue=my_queue

這些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和選項。你可以通過運行 rabbitmqadmin help 命令來獲取更詳細的幫助信息,或者查看官方文檔以了解更多選項和使用方法。

延時3秒和8秒全部代碼

// delayProducer.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    const exchange = 'routing_exchange';
    // 定義 dlx-exchange
    const dlxExchangeName = 'dlx-exchange';
    // 聲明交換機
    await channel.assertExchange(exchange, 'direct', {durable: true});
    await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丟失
    const dlxqueueBindings= [
        {
            dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',
        },
        {
            dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'
        }
    ];
    for (const binding of dlxqueueBindings) {
        // 綁定延遲死信隊列
        await channel.assertQueue(binding.dlxQueueName );
        //死信交換機和死信隊列綁定 Routing key fast 的消息
        await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 將 dlx-queue 綁定到死信交換機
    }
    // 定義隊列和路由鍵的映射
    const queueBindings = [
        {
            queue: '3_second_queue', routingKey: 'fast', arguments: {
                'x-message-ttl': 3000, // TTL 設(shè)置為 3 秒 消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱
            }
        },
        {
            queue: '8_second_queue', routingKey: 'slow', arguments: {
                'x-message-ttl': 8000, // TTL 設(shè)置為 8 秒 消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱
            }
        }
    ];
    // 聲明隊列,并將隊列綁定到交換機上
    for (const binding of queueBindings) {
        await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});
        await channel.bindQueue(binding.queue, exchange, binding.routingKey);
    }
    for (let i = 0; i < 10; i++) {
        await new Promise((resolve) => {
            setTimeout(() => {
                resolve(1)
            }, 1000)
        })
        const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
        console.log('當(dāng)前中國時間:', chinaTime);
        // 發(fā)送消息到交換機,并設(shè)置不同的路由鍵
        await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);
        await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);
    }
    // 關(guān)閉連接
    setTimeout(async () => {
        await channel.close();
        await connection.close();
    }, 10000); // 在 10 秒后關(guān)閉連接
}
async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {
    channel.publish(exchange, routingKey, Buffer.from(message));
    console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}
setupRouting().catch(console.error);
//消費者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    let queue = 'dlx-3_second_queue'
    // 定義隊列和路由鍵的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 確認消息已被處理
        }
    });
}
setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    let queue = 'dlx-8_second_queue'
    // 定義隊列和路由鍵的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 確認消息已被處理
        }
    });
}
setupRouting().catch(console.error);

到此這篇關(guān)于typescript 實現(xiàn)RabbitMQ死信隊列和延遲隊列(訂單10分鐘未付歸還庫存)的文章就介紹到這了,更多相關(guān)RabbitMQ死信隊列和延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 軟件測試面試如何測試一個杯子

    軟件測試面試如何測試一個杯子

    本文主要介紹軟件測試面試如何測試一個杯子,這里幫大家整理了詳細的面試資料,和面試需要準(zhǔn)備的知識點,有興趣的小伙伴可以參考下
    2016-08-08
  • 2020史上最全IDEA插件總結(jié)(推薦收藏)

    2020史上最全IDEA插件總結(jié)(推薦收藏)

    這篇文章主要介紹了2020史上最全IDEA插件總結(jié),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2020-06-06
  • Keil?uVision5?5.38官方下載、安裝及注冊超詳細圖文教程

    Keil?uVision5?5.38官方下載、安裝及注冊超詳細圖文教程

    這篇文章主要介紹了Keil?uVision5?5.38官方下載、安裝及注冊教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-03-03
  • JetBrains發(fā)布java代碼質(zhì)量檢測工具Qodana早期預(yù)覽版

    JetBrains發(fā)布java代碼質(zhì)量檢測工具Qodana早期預(yù)覽版

    這篇文章主要介紹了JetBrains發(fā)布java代碼質(zhì)量檢測工具Qodana早期預(yù)覽版,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-01-01
  • XMind?2021激活碼及安裝步驟

    XMind?2021激活碼及安裝步驟

    XMind?是一款非常實用的商業(yè)思維導(dǎo)圖,應(yīng)用Eclipse?RCP?開發(fā)架構(gòu),打造易用、高效的可視化思維,強調(diào)該功能的可擴展、跨平臺、穩(wěn)定性和性能,致力于幫助用戶提高生產(chǎn)率。本文給大家?guī)砹薠Mind?2021激活碼,需要的朋友可以參考下
    2021-12-12
  • Git配置用戶簽名方式及原因說明

    Git配置用戶簽名方式及原因說明

    這篇文章主要為大家介紹了Git配置用戶簽名方式及原因說明,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-04-04
  • 互聯(lián)網(wǎng)科技大佬推薦的12本必讀書籍

    互聯(lián)網(wǎng)科技大佬推薦的12本必讀書籍

    12本互聯(lián)網(wǎng)科技大佬推薦的必讀書籍,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-03-03
  • 谷歌師兄的算法刷題筆記

    谷歌師兄的算法刷題筆記

    這篇文章主要介紹了谷歌師兄的算法刷題筆記的相關(guān)資料,需要的朋友可以參考下
    2021-02-02
  • Git配置用戶簽名方式的拓展示例實現(xiàn)全面講解

    Git配置用戶簽名方式的拓展示例實現(xiàn)全面講解

    這篇文章主要為大家介紹了Git配置用戶簽名方式的拓展示例實現(xiàn)全面講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-04-04
  • 本地部署DeepSeek開源多模態(tài)大模型Janus-Pro-7B實操教程

    本地部署DeepSeek開源多模態(tài)大模型Janus-Pro-7B實操教程

    文章介紹了Janus-Pro-7B,一個由DeepSeek開發(fā)的開源多模態(tài)AI模型,它在文本和圖像處理方面表現(xiàn)出色,并且具有強大的性能和靈活性,詳細介紹了如何在本地環(huán)境中部署Janus-Pro-7B,并展示了其在圖像理解和生成、文本生成、多模態(tài)推理等任務(wù)中的應(yīng)用效果,感興趣的朋友一起看看吧
    2025-02-02

最新評論