typescript?實現(xiàn)RabbitMQ死信隊列和延遲隊列(訂單10分鐘未付歸還庫存)的過程
Manjaro安裝RabbitMQ
安裝
sudo pacman -S rabbitmq rabbitmqadmin
啟動管理模塊
sudo rabbitmq-plugins enable rabbitmq_management sudo rabbitmq-server
管理界面
http://127.0.0.1:15672/
默認用戶名和密碼都是guest。
要使用 rabbitmqctl
命令添加用戶并分配權(quán)限,您可以按照以下步驟進行操作:
1.添加用戶:
rabbitmqctl add_user mingcai password
請將 password
替換為您想要設(shè)置的實際密碼。
2.分配權(quán)限:
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"
這個命令將用戶 mingcai
授予對所有虛擬主機的所有資源的讀、寫和管理權(quán)限。如果您只想給予特定權(quán)限,請適當(dāng)調(diào)整正則表達式 ".*"
,以授予適當(dāng)?shù)臋?quán)限。例如,如果您只想給予讀取權(quán)限,可以使用 "^amq\."
。
3.可選步驟:設(shè)置用戶角色:
您可以將用戶分配給不同的角色,以便更好地管理權(quán)限。例如,您可以將用戶添加到 administrator
角色以獲取管理員權(quán)限:
rabbitmqctl set_user_tags mingcai administrator
這樣,用戶 mingcai
就被賦予了管理員權(quán)限。
請確保您具有適當(dāng)?shù)臋?quán)限來執(zhí)行這些操作,并確保替換示例中的用戶名和密碼為您自己的實際值。
死信隊列
標(biāo)題:利用RabbitMQ死信隊列處理消息的三種情況
在消息隊列的應(yīng)用中,處理異常情況和消息的延遲成為了一項重要的任務(wù)。RabbitMQ作為一款流行的消息隊列服務(wù),提供了死信隊列(Dead Letter Exchange)功能,能夠有效地處理消息被拒絕、消息過期以及隊列達到最大長度等情況。本文將介紹如何利用RabbitMQ的死信隊列來處理這三種情況,并提供了TypeScript示例代碼。
1. 消息被拒絕
當(dāng)消費者無法處理某條消息時,可以選擇將其標(biāo)記為“被拒絕”。這種情況下,我們可以配置RabbitMQ,將被拒絕的消息發(fā)送到一個死信隊列,以后再處理。
// 引入amqplib庫 import * as amqp from 'amqplib'; // 連接到RabbitMQ服務(wù)器 const connection = await amqp.connect('amqp://localhost'); // 創(chuàng)建Channel const channel = await connection.createChannel(); // 定義隊列 const queueName = 'my_queue'; await channel.assertQueue(queueName, { // 設(shè)置死信交換機 deadLetterExchange: 'my_dead_letter_exchange' }); // 消費消息 channel.consume(queueName, (msg) => { // 處理消息 if (msg) { // 處理失敗,拒絕消息并將其重新放回隊列 // channel.reject(msg, true); // 第二個參數(shù)設(shè)為 true 表示將消息重新放回隊列 // 處理失敗,拒絕消息 channel.reject(msg, false); // 第二個參數(shù)設(shè)為 false 表示將消息投遞到死信隊列 // or 處理失敗,拒絕消息并將其重新放回死信隊列 channel.nack(msg, false, false); // 第二個參數(shù)設(shè)為 false 表示不將消息重新放回原隊列,第三個參數(shù)設(shè)為 false 表示不拒絕當(dāng)前和之前所有未確認的消息 } });
2. 消息過期
有時候我們希望消息在一定時間內(nèi)被處理,如果超過了這個時間,就認為消息已經(jīng)過期。RabbitMQ允許我們設(shè)置消息的過期時間,并在消息過期后將其發(fā)送到死信隊列。
// 發(fā)布消息 await channel.sendToQueue(queueName, Buffer.from('Hello'), { expiration: '60000' // 設(shè)置過期時間為60秒 });
3. 隊列達到最大長度
為了避免隊列過載,我們可以限制隊列的最大長度。當(dāng)隊列達到最大長度時,新的消息將被拒絕,并發(fā)送到死信隊列。
// 定義隊列 await channel.assertQueue(queueName, { maxLength: 100, // 設(shè)置最大隊列長度為100 deadLetterExchange: 'my_dead_letter_exchange' });
通過以上配置,我們可以利用RabbitMQ的死信隊列來處理消息被拒絕、消息過期以及隊列達到最大長度等情況,保證消息系統(tǒng)的穩(wěn)定性和可靠性。
以上是利用TypeScript示例代碼演示了如何在RabbitMQ中使用死信隊列。希望這篇文章對你有所幫助!
延時隊列
什么是延時隊列?顧名思義:首先它要具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。
延時隊列在項目中的應(yīng)用還是比較多的,尤其像電商類平臺:
1、訂單成功后,在30分鐘內(nèi)沒有支付,自動取消訂單
2、外賣平臺發(fā)送訂餐通知,下單成功后60s給用戶推送短信。
3、如果訂單一直處于某一個未完結(jié)狀態(tài)時,及時處理關(guān)單,并退還庫存
4、淘寶新建商戶一個月內(nèi)還沒上傳商品信息,將凍結(jié)商鋪等
npm install amqplib --save npm install @types/amqplib --save-dev
總結(jié)
rabbitmqadmin 使用入門
rabbitmqadmin
是 RabbitMQ 的命令行管理工具,可以用于執(zhí)行各種管理任務(wù),如創(chuàng)建隊列、交換機,查看隊列狀態(tài)等。以下是一些基本的用法示例:
export RABBITMQ_SERVER=127.0.0.1 export RABBITMQ_PORT=5672 export RABBITMQ_USER=mingcai export RABBITMQ_PASSWORD=password rabbitmqadmin list exchanges
查看 RabbitMQ 服務(wù)器信息:
rabbitmqadmin status
列出所有交換機:
rabbitmqadmin list exchanges
列出所有隊列:
rabbitmqadmin list queues
創(chuàng)建一個交換機:
rabbitmqadmin declare exchange name=my_exchange type=direct
創(chuàng)建一個隊列:
rabbitmqadmin declare queue name=my_queue
綁定隊列到交換機:
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
發(fā)送消息到指定交換機:
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
獲取隊列消息:
rabbitmqadmin get queue=my_queue
這些命令只是一些基本用法示例,rabbitmqadmin
工具支持更多功能和選項。你可以通過運行 rabbitmqadmin help
命令來獲取更詳細的幫助信息,或者查看官方文檔以了解更多選項和使用方法。
延時3秒和8秒全部代碼
// delayProducer.ts import * as amqp from 'amqplib'; async function setupRouting() { const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1'); const channel = await connection.createChannel(); const exchange = 'routing_exchange'; // 定義 dlx-exchange const dlxExchangeName = 'dlx-exchange'; // 聲明交換機 await channel.assertExchange(exchange, 'direct', {durable: true}); await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丟失 const dlxqueueBindings= [ { dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast', }, { dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow' } ]; for (const binding of dlxqueueBindings) { // 綁定延遲死信隊列 await channel.assertQueue(binding.dlxQueueName ); //死信交換機和死信隊列綁定 Routing key fast 的消息 await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 將 dlx-queue 綁定到死信交換機 } // 定義隊列和路由鍵的映射 const queueBindings = [ { queue: '3_second_queue', routingKey: 'fast', arguments: { 'x-message-ttl': 3000, // TTL 設(shè)置為 3 秒 消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱。 'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱 } }, { queue: '8_second_queue', routingKey: 'slow', arguments: { 'x-message-ttl': 8000, // TTL 設(shè)置為 8 秒 消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱。 'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過期時將重新發(fā)布到的交換器的可選名稱 } } ]; // 聲明隊列,并將隊列綁定到交換機上 for (const binding of queueBindings) { await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments}); await channel.bindQueue(binding.queue, exchange, binding.routingKey); } for (let i = 0; i < 10; i++) { await new Promise((resolve) => { setTimeout(() => { resolve(1) }, 1000) }) const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }); console.log('當(dāng)前中國時間:', chinaTime); // 發(fā)送消息到交換機,并設(shè)置不同的路由鍵 await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`); await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`); } // 關(guān)閉連接 setTimeout(async () => { await channel.close(); await connection.close(); }, 10000); // 在 10 秒后關(guān)閉連接 } async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) { channel.publish(exchange, routingKey, Buffer.from(message)); console.log(`Sent message '${message}' with routing key '${routingKey}'`); } setupRouting().catch(console.error);
//消費者 dlx-3_second_queue.ts import * as amqp from 'amqplib'; async function setupRouting() { const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1'); const channel = await connection.createChannel(); let queue = 'dlx-3_second_queue' // 定義隊列和路由鍵的映射 await channel.consume(queue, (msg) => { if (msg !== null) { const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }); console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`); channel.ack(msg); // 確認消息已被處理 } }); } setupRouting().catch(console.error); //dlx-8_second_queue.ts import * as amqp from 'amqplib'; async function setupRouting() { const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1'); const channel = await connection.createChannel(); let queue = 'dlx-8_second_queue' // 定義隊列和路由鍵的映射 await channel.consume(queue, (msg) => { if (msg !== null) { const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }); console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`); channel.ack(msg); // 確認消息已被處理 } }); } setupRouting().catch(console.error);
到此這篇關(guān)于typescript 實現(xiàn)RabbitMQ死信隊列和延遲隊列(訂單10分鐘未付歸還庫存)的文章就介紹到這了,更多相關(guān)RabbitMQ死信隊列和延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Keil?uVision5?5.38官方下載、安裝及注冊超詳細圖文教程
這篇文章主要介紹了Keil?uVision5?5.38官方下載、安裝及注冊教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-03-03JetBrains發(fā)布java代碼質(zhì)量檢測工具Qodana早期預(yù)覽版
這篇文章主要介紹了JetBrains發(fā)布java代碼質(zhì)量檢測工具Qodana早期預(yù)覽版,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01本地部署DeepSeek開源多模態(tài)大模型Janus-Pro-7B實操教程
文章介紹了Janus-Pro-7B,一個由DeepSeek開發(fā)的開源多模態(tài)AI模型,它在文本和圖像處理方面表現(xiàn)出色,并且具有強大的性能和靈活性,詳細介紹了如何在本地環(huán)境中部署Janus-Pro-7B,并展示了其在圖像理解和生成、文本生成、多模態(tài)推理等任務(wù)中的應(yīng)用效果,感興趣的朋友一起看看吧2025-02-02