RabbitMQ中的死信隊列(Dead Letter Exchanges)詳解
RabbitMQ死信隊列
1. 介紹
當消息在一個隊列中變?yōu)樗佬藕?,它被重新發(fā)送到另一個Exchange。
2. 在什么情況下會出現(xiàn)死信
- 消息未被簽收,在消費端使用了 basic.reject 或 basic.nack ,并且requeue設(shè)置為false
- 消息過期(TTL)
- 消息隊列達到了最大長度
3. 實際應(yīng)用
當RabbitMQ出現(xiàn)死信,可能會導(dǎo)致業(yè)務(wù)邏輯錯誤,比如下訂單后修改庫存操作,在下單后因為某種原因,發(fā)送的消息未被簽收,這時庫存數(shù)據(jù)會出現(xiàn)不一致。
有死信隊列之后我們就可以監(jiān)聽死信隊列,來處理業(yè)務(wù)邏輯。
3.1 死信隊列設(shè)置
聲明隊列,添加參數(shù)x-dead-letter-exchange
Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //這個agruments屬性,要設(shè)置到聲明隊列上 channel.queueDeclare(queueName, true, false, false, agruments);
死信隊列,是一個普通的Exchange和queue,需要設(shè)置死信Exchange和queue,并進行綁定
/要進行死信隊列的聲明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); //可以匹配任意routeKey channel.queueBind("dlx.queue", "dlx.exchange", "#");
4 代碼實現(xiàn)
生產(chǎn)端
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.test"; String msg = "RabbitMQ DLX Message test"; for(int i =0; i<1; i ++){ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") //過期時間為1秒 .build(); channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } }
消費端
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 這就是一個普通的交換機 和 隊列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //這個agruments屬性,要設(shè)置到聲明隊列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要進行死信隊列的聲明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); //可以匹配任意routeKey channel.queueBind("dlx.queue", "dlx.exchange", "#"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息::"+new String(body)); } }); }
運行以上代碼,在交換機(Exchange)中會多出一個名為dlx.exchange 類型為topic的交換機,隊列中也有一個dlx.queue 隊列。
到此這篇關(guān)于RabbitMQ中的死信隊列(Dead Letter Exchanges)詳解的文章就介紹到這了,更多相關(guān)RabbitMQ死信隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中抽象類和接口的區(qū)別_動力節(jié)點Java學(xué)院整理
java抽象類和接口最本質(zhì)的區(qū)別是接口里不能實現(xiàn)方法--接口中的方法全是抽象方法。抽象類中可實現(xiàn)方法--抽象類中的方法可以不是抽象方法,下文給大家簡單介紹下,需要的的朋友參考下2017-04-04Java畢業(yè)設(shè)計實戰(zhàn)之二手書商城系統(tǒng)的實現(xiàn)
這是一個使用了java+JSP+Springboot+maven+mysql+ThymeLeaf+FTP開發(fā)的二手書商城系統(tǒng),是一個畢業(yè)設(shè)計的實戰(zhàn)練習,具有在線書城該有的所有功能,感興趣的朋友快來看看吧2022-01-01springboot整合ehcache和redis實現(xiàn)多級緩存實戰(zhàn)案例
這篇文章主要介紹了springboot整合ehcache和redis實現(xiàn)多級緩存實戰(zhàn)案例,從源碼角度分析下多級緩存實現(xiàn)原理,本文通過實例代碼給大家介紹的非常詳細,需要的朋友可以參考下2023-08-08mybatis動態(tài)生成sql語句的實現(xiàn)示例
在MyBatis中,動態(tài)SQL是一個非常重要的特性,它允許我們根據(jù)條件動態(tài)地生成SQL語句,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧2024-11-11