亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

關(guān)于利用RabbitMQ實(shí)現(xiàn)延遲任務(wù)的方法詳解

 更新時(shí)間:2017年12月11日 10:21:02   作者:nick hao  
最近在使用RabbitMQ來(lái)實(shí)現(xiàn)延遲任務(wù)的時(shí)候發(fā)現(xiàn),這其中的知識(shí)點(diǎn)還是挺多的,所以下面這篇文章主要給大家介紹了關(guān)于利用RabbitMQ實(shí)現(xiàn)延遲任務(wù)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下。

開(kāi)發(fā)過(guò)程中通常會(huì)碰到這樣的需求:

  • 淘寶訂單業(yè)務(wù):下單后 30min 之內(nèi)沒(méi)有付款,就自動(dòng)取消訂單。
  • 餓了嗎訂餐通知:下單成功后 60s 之后給用戶(hù)發(fā)送短信通知。
  • 關(guān)閉空閑連接:服務(wù)器中有很多客戶(hù)端的連接,空閑一段時(shí)間之后需要關(guān)閉之。
  • 緩存:緩存中的對(duì)象,超過(guò)了空閑時(shí)間,從緩存中移出。
  • 任務(wù)超時(shí)處理:在網(wǎng)絡(luò)協(xié)議滑動(dòng)窗口請(qǐng)求應(yīng)答式交互時(shí),處理超時(shí)未響應(yīng)的請(qǐng)求。
  • 失敗重試機(jī)制:業(yè)務(wù)操作失敗后,間隔一定的時(shí)間進(jìn)行失敗重試。

這類(lèi)業(yè)務(wù)的特點(diǎn)就是:需要延遲工作,需要進(jìn)行失敗重試。一種比較笨的方式是使用一個(gè)后臺(tái)線(xiàn)程,遍歷所有對(duì)象,挨個(gè)檢查。這種方法簡(jiǎn)單好用,但是對(duì)象數(shù)量過(guò)多時(shí),可能存在性能問(wèn)題,檢查間隔時(shí)間不好設(shè)置,間隔時(shí)間過(guò)大,影響精確度,過(guò)小則存在效率問(wèn)題,而且做不到按超時(shí)的時(shí)間順序處理。

再比如常見(jiàn)的場(chǎng)景:

場(chǎng)景一:物聯(lián)網(wǎng)系統(tǒng)經(jīng)常會(huì)遇到向終端下發(fā)命令,如果命令一段時(shí)間沒(méi)有應(yīng)答,就需要設(shè)置成超時(shí)。

場(chǎng)景二:訂單下單之后30分鐘后,如果用戶(hù)沒(méi)有付錢(qián),則系統(tǒng)自動(dòng)取消訂單。

上述類(lèi)似的需求是我們經(jīng)常會(huì)遇見(jiàn)的問(wèn)題。最常用的方法是定期輪訓(xùn)數(shù)據(jù)庫(kù),設(shè)置狀態(tài)。在數(shù)據(jù)量小的時(shí)候并沒(méi)有什么大的問(wèn)題,但是數(shù)據(jù)量一大輪訓(xùn)數(shù)據(jù)庫(kù)的方式就會(huì)變得特別耗資源。當(dāng)面對(duì)千萬(wàn)級(jí)、上億級(jí)數(shù)據(jù)量時(shí),本身寫(xiě)入的IO就比較高,導(dǎo)致長(zhǎng)時(shí)間查詢(xún)或者根本就查不出來(lái),更別說(shuō)分庫(kù)分表以后了。除此之外,還有優(yōu)先級(jí)隊(duì)列,基于優(yōu)先級(jí)隊(duì)列的JDK延遲隊(duì)列,時(shí)間輪等方式。但如果系統(tǒng)的架構(gòu)中本身就有RabbitMQ的話(huà),那么選擇RabbitMQ來(lái)實(shí)現(xiàn)類(lèi)似的功能也是一種選擇。

使用RabbitMQ來(lái)實(shí)現(xiàn)延遲任務(wù)必須先了解RabbitMQ的兩個(gè)概念:消息的TTL和死信Exchange,通過(guò)這兩者的組合來(lái)實(shí)現(xiàn)上述需求。

消息的TTL(Time To Live)

消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱(chēng)之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。

可以通過(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫(xiě)個(gè)int類(lèi)型的字符串:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

當(dāng)上面的消息扔到隊(duì)列中后,過(guò)了60秒,如果沒(méi)有被消費(fèi),它就死了。不會(huì)被消費(fèi)者消費(fèi)到。這個(gè)消息后面的,沒(méi)有“死掉”的消息對(duì)頂上來(lái),被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會(huì)被刪除和釋放,它會(huì)被統(tǒng)計(jì)到隊(duì)列的消息數(shù)中去。單靠死信還不能實(shí)現(xiàn)延遲任務(wù),還要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在這里就不在贅述,一個(gè)消息在滿(mǎn)足如下條件下,會(huì)進(jìn)死信路由,記住這里是路由而不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)很多隊(duì)列。

1. 一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。

2. 上面的消息的TTL到了,消息過(guò)期了。

3. 隊(duì)列的長(zhǎng)度限制滿(mǎn)了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。

Dead Letter Exchange其實(shí)就是一種普通的exchange,和創(chuàng)建其他exchange沒(méi)有兩樣。只是在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過(guò)期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去。

實(shí)現(xiàn)延遲隊(duì)列

延遲任務(wù)通過(guò)消息的TTL和Dead Letter Exchange來(lái)實(shí)現(xiàn)。我們需要建立2個(gè)隊(duì)列,一個(gè)用于發(fā)送消息,一個(gè)用于消息過(guò)期后的轉(zhuǎn)發(fā)目標(biāo)隊(duì)列。

生產(chǎn)者輸出消息到Queue1,并且這個(gè)消息是設(shè)置有有效時(shí)間的,比如60s。消息會(huì)在Queue1中等待60s,如果沒(méi)有消費(fèi)者收掉的話(huà),它就是被轉(zhuǎn)發(fā)到Queue2,Queue2有消費(fèi)者,收到,處理延遲任務(wù)。

具體實(shí)現(xiàn)步驟如下:

第一步, 首先需要?jiǎng)?chuàng)建2個(gè)隊(duì)列。Queue1和Queue2。Queue1是一個(gè)消息緩沖隊(duì)列,在這個(gè)隊(duì)列里面實(shí)現(xiàn)消息的過(guò)期轉(zhuǎn)發(fā)。如下圖,設(shè)置Dead letter exchange和Dead letter routing key。設(shè)置這兩個(gè)屬性就是當(dāng)消息在這個(gè)隊(duì)列中expire后,采用哪個(gè)路由發(fā)送。這個(gè)dlx的exchange需要事先創(chuàng)建好,就是一個(gè)普通的exchange。由于我們還需要向Queue1發(fā)送消息,那么還需要?jiǎng)?chuàng)建一個(gè)exchange,并且和Queue1綁定。例子中,exchange同樣取名:queue1。

我們還需要建一個(gè)Queue2,這個(gè)隊(duì)列用于消息在Queue1中過(guò)期后轉(zhuǎn)發(fā)的目標(biāo)隊(duì)列。所以這個(gè)Queue2隊(duì)列建好以后,需要綁定Queue1設(shè)置的死信路由:dlx。完成Queue2的綁定以后,環(huán)境就搭建完成了。

第二步,實(shí)現(xiàn)消息的Producer。由于我們的目的是讓進(jìn)入Queue1的消息過(guò)期,然后自動(dòng)轉(zhuǎn)送到Queue2中,所以發(fā)送的時(shí)候,需要設(shè)置過(guò)期時(shí)間。

ConnectionFactory factory = new ConnectionFactory();
   factory.setUsername("bsp");
   factory.setPassword("123456");
   factory.setVirtualHost("/");
   factory.setHost("10.23.22.42");
   factory.setPort(5672);
   conn = factory.newConnection();
   channel = conn.createChannel();
   byte[] messageBodyBytes = "Hello, world!".getBytes();
   byte i = 10;
   while (i-- > 0) {    
    channel.basicPublish("queue1", "queue1", new AMQP.BasicProperties.Builder().expiration(String.valueOf(i * 1000)).build(),
      new byte[] { i });
   }

上面的代碼我模擬了1-10號(hào)消息,消息的內(nèi)容里面是1-10。過(guò)期的時(shí)間是10-1秒。這里要注意,雖然10是第一個(gè)發(fā)送,但是它過(guò)期的時(shí)間最長(zhǎng)。

第三步,實(shí)現(xiàn)消息的Consumer。Consumer就是延遲任務(wù)的具體實(shí)施者。由于具體的任務(wù)往往是一個(gè)比較耗時(shí)的任務(wù),所以一般來(lái)說(shuō),任務(wù)一般在異步線(xiàn)程中執(zhí)行。

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("bsp");
factory.setPassword("123456");
factory.setVirtualHost("/");
factory.setHost("10.23.22.42");
factory.setPort(5672);
conn = factory.newConnection();
channel = conn.createChannel();
channel.basicConsume("queue2", true, "consumer", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();     
//do some work async
System.out.println(body[0]);
}
});

運(yùn)行后如上面的程序,過(guò)了10s以后,消費(fèi)者開(kāi)始收到數(shù)據(jù),但是它是一次性收到如下結(jié)果:

10、9 、8 、7 、6、5 、4 、3 、2 、1

Consumer第一個(gè)收到的還是10。雖然10是第一個(gè)放進(jìn)隊(duì)列,但是它的過(guò)期時(shí)間最長(zhǎng)。所以由此可見(jiàn),即使一個(gè)消息比在同一隊(duì)列中的其他消息提前過(guò)期,提前過(guò)期的也不會(huì)優(yōu)先進(jìn)入死信隊(duì)列,它們還是按照入庫(kù)的順序讓消費(fèi)者消費(fèi)。如果第一進(jìn)去的消息過(guò)期時(shí)間是1小時(shí),那么死信隊(duì)列的消費(fèi)者也許等1小時(shí)才能收到第一個(gè)消息。參考官方文檔發(fā)現(xiàn)“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當(dāng)過(guò)期的消息到了隊(duì)列的頂端(隊(duì)首),才會(huì)被真正的丟棄或者進(jìn)入死信隊(duì)列。

所以在考慮使用RabbitMQ來(lái)實(shí)現(xiàn)延遲任務(wù)隊(duì)列的時(shí)候,需要確保業(yè)務(wù)上每個(gè)任務(wù)的延遲時(shí)間是一致的。如果遇到不同的任務(wù)類(lèi)型需要不同的延時(shí)的話(huà),需要為每一種不同延遲時(shí)間的消息建立單獨(dú)的消息隊(duì)列。

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • 為密碼文本框要求不可粘帖字符串只可手動(dòng)輸入(附演示動(dòng)畫(huà))

    為密碼文本框要求不可粘帖字符串只可手動(dòng)輸入(附演示動(dòng)畫(huà))

    以前開(kāi)發(fā)程序時(shí),用戶(hù)登錄的密碼文本框,是可以粘帖密碼字符串的,為了安全起見(jiàn)用戶(hù)要求不要粘帖,只能由手動(dòng)輸入,感興趣的朋友可以了解下
    2013-01-01
  • .NET6使用微信小程序授權(quán)登錄獲取手機(jī)號(hào)

    .NET6使用微信小程序授權(quán)登錄獲取手機(jī)號(hào)

    小程序手機(jī)號(hào)授權(quán)是在里打開(kāi)小程序時(shí)彈窗請(qǐng)求允許使用某些功能,比如授權(quán)獲取用戶(hù)信息、授權(quán)獲取手機(jī)號(hào)等,本文主要介紹了.NET6使用微信小程序授權(quán)登錄獲取手機(jī)號(hào),感興趣的可以了解一下
    2023-08-08
  • .net中as和is之間的區(qū)別分析

    .net中as和is之間的區(qū)別分析

    .net中as和is之間的區(qū)別分析,需要的朋友可以參考一下
    2013-05-05
  • 兩種獲取connectionString的方式案例詳解

    兩種獲取connectionString的方式案例詳解

    這篇文章主要介紹了兩種獲取connectionString的方式案例詳解,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • ASP.NET Core 3.x 并發(fā)限制的實(shí)現(xiàn)代碼

    ASP.NET Core 3.x 并發(fā)限制的實(shí)現(xiàn)代碼

    這篇文章主要介紹了ASP.NET Core 3.x 并發(fā)限制的實(shí)現(xiàn)代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • ASP.NET MVC中異常處理&自定義錯(cuò)誤頁(yè)詳析

    ASP.NET MVC中異常處理&自定義錯(cuò)誤頁(yè)詳析

    當(dāng)ASP.NET MVC程序出現(xiàn)了異常,怎么處理更加規(guī)范?下面這篇文章主要給大家介紹了關(guān)于ASP.NET MVC中異常處理&自定義錯(cuò)誤頁(yè)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。
    2018-04-04
  • asp.net Repeater 自增

    asp.net Repeater 自增

    asp.net Repeater 自增加實(shí)現(xiàn)代碼。
    2009-06-06
  • ASP.NET Core使用JWT自定義角色并實(shí)現(xiàn)策略授權(quán)需要的接口

    ASP.NET Core使用JWT自定義角色并實(shí)現(xiàn)策略授權(quán)需要的接口

    這篇文章介紹了ASP.NET Core使用JWT自定義角色并實(shí)現(xiàn)策略授權(quán)需要的接口,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-01-01
  • ASP.NET 定制簡(jiǎn)單的錯(cuò)誤處理頁(yè)面實(shí)現(xiàn)代碼

    ASP.NET 定制簡(jiǎn)單的錯(cuò)誤處理頁(yè)面實(shí)現(xiàn)代碼

    通常web應(yīng)用程序在發(fā)布后,為了給用戶(hù)一個(gè)友好界面和使用體驗(yàn),都會(huì)在錯(cuò)誤發(fā)生時(shí)跳轉(zhuǎn)至一個(gè)自定義的錯(cuò)誤頁(yè)面,而不是asp.net向用戶(hù)暴露出來(lái)的詳細(xì)的異常列表。
    2010-01-01
  • asp.net(c#)ref,out ,params的區(qū)別

    asp.net(c#)ref,out ,params的區(qū)別

    C#中有三個(gè)關(guān)鍵字-ref,out ,params,雖然本人不喜歡這三個(gè)關(guān)鍵字,因?yàn)樗鼈円伤破茐拿嫦驅(qū)ο筇匦?。但是既然m$把融入在c#體系中,那么我們就來(lái)認(rèn)識(shí)一下參數(shù)修飾符ref,out ,params吧,還有它們的區(qū)別。
    2009-12-12

最新評(píng)論