springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
1. 死信隊(duì)列
死信隊(duì)列是一種特殊的消息隊(duì)列,用來存儲無法被正常消費(fèi)的消息,常被用來實(shí)現(xiàn)延遲處理,異常消息處理等,提高了系統(tǒng)的可伸縮性和容錯性,能夠應(yīng)對高并發(fā)和異常消息。
死信隊(duì)列中的消息被稱為死信消息,用來分發(fā)死信消息的交換機(jī)被稱為死信交換機(jī)(Dead Letter Exchange,DLX)。
死信隊(duì)列在實(shí)際項(xiàng)目中的應(yīng)用場景有很多如:
- 訂單超時未支付,將此消息放入死信隊(duì)列中,等待后續(xù)處理(延遲等待)
- 消息消費(fèi)失敗將消息放入死信隊(duì)列中進(jìn)行重試(消息重試機(jī)制)

2.正常消息成為死信消息的條件
- 消息到了過期時間仍然未被消費(fèi)者消費(fèi)
- 隊(duì)列已滿無法保存新消息
- 消息被拒絕消費(fèi)且未設(shè)置重新放入隊(duì)列

3.消費(fèi)者1
package com.hong.rabbitmq9;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: 死信隊(duì)列消費(fèi)者1
* @Author: hong
* @Date: 2024-01-17 21:04
* @Version: 1.0
**/
public class Consumer1 {
//正常交換機(jī)名稱
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機(jī)名稱
public static final String DEAD_EXCHANGE = "dead_exchange";
//正常隊(duì)列名稱
public static final String NORMAL_QUEUE = "normal_queue";
//死信隊(duì)列名稱
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
//正常隊(duì)列綁定死信隊(duì)列信息
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", "lisi");
//聲明正常隊(duì)列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
System.out.println("Consumer1等待接收消息:");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
System.out.println( "routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = var -> {
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}



4.生產(chǎn)者
package com.hong.rabbitmq9;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
/**
* @Description: 死信隊(duì)列消息生產(chǎn)者
* @Author: hong
* @Date: 2024-01-17 20:49
* @Version: 1.0
**/
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//設(shè)置消息的 TTL 時間 10s
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();
//該信息是用作演示隊(duì)列個數(shù)限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生產(chǎn)者發(fā)送消息:" + message);
}
}
}
啟動消費(fèi)者1后馬上關(guān)閉消費(fèi)者1,模擬消費(fèi)者1接收不到消息,再啟動生產(chǎn)者


生產(chǎn)者發(fā)送10條正常隊(duì)列中有10條消息

10s后正常隊(duì)列中的消息由于沒有消費(fèi)者消費(fèi)進(jìn)入死信隊(duì)列中

5.消費(fèi)者2
package com.hong.rabbitmq9;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: 死信隊(duì)列-死信消費(fèi)者
* @Author: hong
* @Date: 2024-01-17 21:31
* @Version: 1.0
**/
public class Consumer2 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("Consumer2等待接收死信消息:");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
System.out.println( "routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = var -> {
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
啟動消費(fèi)者2,使其消費(fèi)死信隊(duì)列中的消息

6.隊(duì)列達(dá)到最大長度
以上代碼是TTL,隊(duì)列達(dá)到最大長度只要將上述代碼稍微改動一下即可
6.1.注釋掉生產(chǎn)者代碼中的ttl部分
package com.hong.rabbitmq9;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
/**
* @Description: 死信隊(duì)列消息生產(chǎn)者
* @Author: hong
* @Date: 2024-01-17 20:49
* @Version: 1.0
**/
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//設(shè)置消息的 TTL 時間 10s
// AMQP.BasicProperties properties = new AMQP.BasicProperties()
// .builder().expiration("10000").build();
//該信息是用作演示隊(duì)列個數(shù)限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
System.out.println("生產(chǎn)者發(fā)送消息:" + message);
}
}
}
6.2.消費(fèi)者1代碼中加最大長度
package com.hong.rabbitmq9;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: 死信隊(duì)列消費(fèi)者1
* @Author: hong
* @Date: 2024-01-17 21:04
* @Version: 1.0
**/
public class Consumer1 {
//正常交換機(jī)名稱
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機(jī)名稱
public static final String DEAD_EXCHANGE = "dead_exchange";
//正常隊(duì)列名稱
public static final String NORMAL_QUEUE = "normal_queue";
//死信隊(duì)列名稱
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
//正常隊(duì)列綁定死信隊(duì)列信息
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", "lisi");
map.put("x-max-length",8);
//聲明正常隊(duì)列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
System.out.println("Consumer1等待接收消息:");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
System.out.println( "routingKey:" + message.getEnvelope().getRoutingKey() + ",消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = var -> {
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
啟動消費(fèi)者1后立馬關(guān)閉,模擬隊(duì)列已滿



到此這篇關(guān)于springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)springboot RabbitMQ死信隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- 關(guān)于Rabbitmq死信隊(duì)列及延時隊(duì)列的實(shí)現(xiàn)
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- RabbitMQ之死信隊(duì)列深入解析
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
Spring Cloud基于zuul實(shí)現(xiàn)網(wǎng)關(guān)過程解析
這篇文章主要介紹了Spring Cloud基于zuul實(shí)現(xiàn)網(wǎng)關(guān)過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12
SpringBoot 使用 @Value 注解讀取配置文件給靜態(tài)變量賦值
這篇文章主要介紹了SpringBoot 使用 @Value 注解讀取配置文件給靜態(tài)變量賦值,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
如何對Mysql數(shù)據(jù)表查詢出來的結(jié)果進(jìn)行排序
這篇文章主要介紹了如何對Mysql數(shù)據(jù)表查詢出來的結(jié)果進(jìn)行排序問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08
REST架構(gòu)及RESTful應(yīng)用程序簡介
這篇文章主要為大家介紹了REST架構(gòu)及RESTful的應(yīng)用程序簡介,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03
快速解決List集合add元素,添加多個對象出現(xiàn)重復(fù)的問題
這篇文章主要介紹了快速解決List集合add元素,添加多個對象出現(xiàn)重復(fù)的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08

