RabbitMQ消息隊列之持久化機制詳解
一、RabbitMQ 持久化機制
1、RabbitMQ 持久化概述
持久化,即將原本存在于內(nèi)存中的數(shù)據(jù)寫入到磁盤上永久保存數(shù)據(jù),防止服務(wù)宕機時內(nèi)存數(shù)據(jù)的丟失。
Rabbitmq 的持久化分為隊列持久化、消息持久化和交換器持久化。
對于消息來說,不管是持久化的消息還是非持久化的消息都可以被寫入到磁盤。
持久化的消息會同時寫入磁盤和內(nèi)存(加快讀取速度),非持久化消息會在內(nèi)存不夠用時,將消息寫入磁盤(一般重啟之后就沒有了)。
2、隊列持久化
隊列的持久化是在定義隊列時的通過 durable 參數(shù)來決定的,當 durable 為 true 時,才代表隊列會持久化。例如:
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二個餐胡設(shè)置為true,代表隊列持久化
channel.queueDeclare("queue.persistent.name", true, false, false, null);關(guān)鍵的是第二個參數(shù)設(shè)置為 true,即 durable = true。Channel 類中 queueDeclare 的完整定義如下:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
參數(shù)說明:
- queue:queue 的名稱
- exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,并在連接斷開時自動刪除。這里需要注意三點:
- 排他隊列是基于連接可見的,同一連接的不同信道是可以同時訪問同一連接創(chuàng)建的排他隊列;
- “首次”,如果一個連接已經(jīng)聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
- 即使該隊列是持久化的,一旦連接關(guān)閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用于一個客戶端發(fā)送讀取消息的應(yīng)用場景。
- autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用于臨時隊列。
總結(jié):如果將 queue 的持久化標識 durable 設(shè)置為 true,則代表是一個持久的隊列。當服務(wù)重啟之后,隊列仍然會存在,這是因為服務(wù)會把持久化的 queue 存放在硬盤上,當服務(wù)重啟的時候,會重新加載這些被持久化的 queue。
3、消息持久化
隊列是可以被持久化,但是里面的消息是否為持久化那還要看消息的持久化設(shè)置。
也就是說,重啟之前 queue 里面如果還有未發(fā)出去的消息的話,重啟之后,消息是否還存在隊列里面就要取決于在發(fā)送消息時對消息的設(shè)置。
消息持久化的實現(xiàn)需要在發(fā)送消息時設(shè)置消息的持久化標識,例如:
channel.basicPublish("exchange01", "routing_key01", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_message".getBytes());方法原型是:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
這里關(guān)鍵的是 BasicProperties props 這個參數(shù),它的定義如下:
public BasicProperties(
String contentType,//消息類型如:text/plain
String contentEncoding,//編碼
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//優(yōu)先級
String correlationId,
String replyTo,//反饋隊列
String expiration,//expiration到期時間
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId
)其中 deliveryMode=1 代表不持久化, deliveryMode=2 代表持久化。而代碼實現(xiàn)中的 MessageProperties.PERSISTENT_PLAIN 值是官方提供的一個將 deliveryMode 設(shè)置為 2 的 BasicProperties 的對象:
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties(
"text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null
);除此之外,我們也可以使用另一種方式來設(shè)置消息持久化標志位:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); // 將 deliveryMode 值設(shè)置為 2 表示消息持久化
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange01", "routing_key01", properties, "persistent_message".getBytes());至此,我們可以知道:當 broker 服務(wù)其重啟后,想要消息不丟失,既需要設(shè)置隊列持久化,也需要設(shè)置消息持久化。
擴展知識:
basicPublish 方法還有另外兩個重載方法:
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;這里有兩個關(guān)鍵的參數(shù): mandatory 和 immediate 。這兩個標識位都有當消息傳遞過程中不可達目的地時將消息返回給生產(chǎn)者的功能。下面簡單講解下這兩個標識位:
1)mandatory
- 當 mandatory 標志位設(shè)置為 true 時:如果 exchange 根據(jù)自身類型和消息 routeKey 無法找到一個符合條件的 queue,那么會調(diào)用
basic.return方法將消息返回給生產(chǎn)者(Basic.Return + Content-Header + Content-Body); - 當 mandatory 設(shè)置為 false 時,如果出現(xiàn)上述情形的話,broker 會直接將消息扔掉。
2)immediate
- 當 immediate 標志位設(shè)置為 true 時:如果 exchange 在將消息路由到 queue(s) 時發(fā)現(xiàn)對于的 queue 上么有消費者,那么這條消息不會放入隊列中。當與消息 routeKey 關(guān)聯(lián)的所有 queue(一個或者多個)都沒有消費者時,該消息會通過
basic.return方法返還給生產(chǎn)者。
概括來說就是:
- mandatory 標志告訴服務(wù)器至少將該消息 route 到一個隊列中,否則將消息返還給生產(chǎn)者;
- immediate 標志告訴服務(wù)器如果該消息關(guān)聯(lián)的 queue上有消費者,則馬上將消息投遞給它,如果所有 queue 都沒有消費者,直接把消息返還給生產(chǎn)者,不用將消息入隊列等待消費者了。
4、交換器持久化
對于消息的可靠性來說,只需要設(shè)置隊列的持久化和消息的持久化即可。exchange 的持久化并沒有什么影響,但是,如果 exchange 不設(shè)置持久化的話,當 broker 服務(wù)重啟之后,exchange 將不復(fù)存在,這樣會導致消息發(fā)送者 producer 無法正常發(fā)送消息。
所以,建議同樣設(shè)置 exchange 的持久化。而 exchange 的持久化設(shè)置也特別簡單,設(shè)置方法原型如下:
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable)throws IOException;
- exchange:交換器的名稱;
- type:交換器的類型,常見的如
fanout direct topic; - durable:持久話標志位, durable 設(shè)置為
true表示持久化, 反之為非持久 。
所以,只需要在聲明的時候?qū)?durable 字段設(shè)置為 true 即可:
channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
二、RabbitMQ 知識擴展
1、內(nèi)存告警與內(nèi)存換頁
1-1、內(nèi)存告警
當內(nèi)存使用超過配置的閾值時,RabbitMQ 會暫時阻塞客戶端的連接,并停止接收從客戶端發(fā)來的消息,以避免服務(wù)崩潰,客戶端與服務(wù)端的心跳檢測也會失敗。
當出現(xiàn)內(nèi)存告警時,可以通過管理命令臨時調(diào)整內(nèi)存大小:
RabbitMQctl set_vm_memory_high_watermark <fraction>
fraction為內(nèi)存閾值,RabbitMQ 默認是 0.4,表示當 RabbitMQ 使用的內(nèi)存超過總內(nèi)存的 40% 時,就會產(chǎn)生告警并阻塞所有生產(chǎn)則連接。
通過此命令修改的閾值在 RabbitMQ 重啟之后將會失效,如果想要設(shè)置的閾值永久有效需要修改配置文件:
# 相對值,也就是前面的fraction,建議設(shè)置在0.4~0.66之間,不要超過0.7 vm_memory_high_watermark.relative=0.4 # 絕對值,單位為KB,MB,GB,對應(yīng)的臨時命令是:RabbitMQctl set_vm_memory_high_watermark absolute <value> #vm_memory_high_watermark.absolute=1GB
修改完配置文件后,需要重啟服務(wù)才會生效。
1-2、內(nèi)存換頁
在某個 broker 節(jié)點觸及內(nèi)存閾值并阻塞生產(chǎn)者之前,它會嘗試將隊列內(nèi)存中的消息換頁存儲到磁盤以釋放內(nèi)存空間。持久化和非持久化的消息都會被轉(zhuǎn)儲到磁盤中,其中持久化的消息本身就在磁盤中有一個備份,所以這里會將持久化的消息直接從內(nèi)存中清除掉。
默認情況下,在內(nèi)存使用達到設(shè)置的閾值的 50% 時會進行換頁操作。也就是說,在默認的內(nèi)存閾值 40% 的情況下,當內(nèi)存超過 40% * 50% = 20% 時會經(jīng)行換頁動作。
內(nèi)存換頁閾值可以通過在配置文件中設(shè)置來進行調(diào)整:
vm_memory_high_watermark_paging_ratio=0.75
上面的配置將會在 RabbitMQ 內(nèi)存使用率達到 30%(假設(shè)內(nèi)存閾值是 0.4)時進行換頁動作,并在 40% 時阻塞生產(chǎn)者(當 vm_memory_high_watermark_paging_ratio 的值大于 1 時,相當于禁用了換頁功能)。
2、磁盤告警與配置
2-1、磁盤告警
當磁盤剩余空間低于設(shè)置的閾值時,RabbitMQ 同樣會阻塞生產(chǎn)者,這樣可以避免因非持久化的消息持續(xù)換頁而耗盡磁盤空間導致服務(wù)崩潰。
默認情況下,磁盤的閾值是50M,表示當磁盤剩余空間低于50M時,會阻塞生產(chǎn)者并停止內(nèi)存中消息的換頁動作。
這個閾值的設(shè)置可以減小,但不能完全消除因磁盤耗盡而導致崩潰的可能性。比如在兩次磁盤空間檢測期間內(nèi),磁盤空間從大于50M被耗盡到0M。
2-2、修改磁盤告警閾值
可以通過以下命令臨時調(diào)整磁盤閾值:
#設(shè)置具體大小,單位為KB/MB/GB RabbitMQctl set_disk_free_limit <disk_limit> #設(shè)置相對值,建議取值為1.0~2.0(相對于內(nèi)存的倍數(shù),如內(nèi)存大小是8G,若為1.0,則表示磁盤剩余8G時,阻塞) RabbitMQctl set_disk_free_limit mem_relative <fraction>
如果要永久生效需要對應(yīng)的配置文件,配置如下(需要重啟生效):
disk_free_limit.relative=2.0 #disk_free_limit_absolute=50MB
這里有個建議:一個相對謹慎的做法是將磁盤閾值設(shè)置為與操作系統(tǒng)所顯示的內(nèi)存大小一致。
3、數(shù)據(jù)寫入磁盤時機
- 消息的正常寫入磁盤流程為:消息數(shù)據(jù)寫入到緩存 Buffer 中(大小為 1 M),Buffer 數(shù)據(jù)滿了之后會寫入內(nèi)存文件中,最后再刷新到磁盤文件中;
- 存在個固定的刷盤時間:25ms,也就是不管 Buffe r滿不滿,每隔 25ms,Buffer 里的數(shù)據(jù)及未刷新到磁盤的文件內(nèi)容必定會刷到磁盤;
- 每次消息寫入后,如果沒有后續(xù)寫入請求,則會直接將已寫入的消息刷到磁盤:使用 Erlang 的 receive x after 0 來實現(xiàn)。只要進程的信箱里沒有消息,則產(chǎn)生一個 timeout 消息,而 timeout 會觸發(fā)刷盤操作。
4、磁盤消息格式
消息保存于 $MNESIA/msg_store_persistent/x.rdq 文件中,其中 x 為數(shù)字編號,從 1 開始,每個文件最大為 16M(16777216),超過這個大小會生成新的文件,文件編號加 1。
文件中的消息格式如下:
<<Size:64, MsgId:16/binary, MsgBody>>
- MsgId 為 RabbitMQ 通過
rabbit_guid:gen()每一個消息生成的 GUID; - MsgBody 會包含消息對應(yīng)的 exchange,routing_keys,消息的內(nèi)容,消息對應(yīng)的協(xié)議版本,消息內(nèi)容格式(二進制還是其它)等等。
5、磁盤文件刪除機制
- 當所有磁盤文件中的垃圾消息(已經(jīng)被刪除的消息)比例大于閾值(GARBAGE_FRACTION = 0.5)時,會觸發(fā)文件合并操作(至少有三個文件存在的情況下),以提高磁盤利用率。
- publish 消息時寫入內(nèi)容,ack 消息時刪除內(nèi)容(更新該文件的有用數(shù)據(jù)大小),當一個文件的有用數(shù)據(jù)等于 0 時,刪除該文件。
到此這篇關(guān)于RabbitMQ消息隊列之持久化機制詳解的文章就介紹到這了,更多相關(guān)RabbitMQ持久化機制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot結(jié)合mysql主從來實現(xiàn)讀寫分離的方法示例
這篇文章主要介紹了springboot結(jié)合mysql主從來實現(xiàn)讀寫分離的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
SpringBoot配置MyBatis-Plus實現(xiàn)增刪查改
本文主要介紹了SpringBoot配置MyBatis-Plus實現(xiàn)增刪查改,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08
淺談SpringBoot項目打成war和jar的區(qū)別
這篇文章主要介紹了淺談SpringBoot項目打成war和jar的區(qū)別,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-11-11
Java實現(xiàn)猜數(shù)字小游戲(有次數(shù)限制)
這篇文章主要為大家詳細介紹了Java實現(xiàn)猜數(shù)字小游戲,有次數(shù)限制,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2020-05-05
spring boot使用logback實現(xiàn)多環(huán)境日志配置詳解
這篇文章主要介紹了spring boot使用logback實現(xiàn)多環(huán)境日志配置詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08

