亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java實(shí)現(xiàn)異步延遲隊(duì)列的方法詳解

 更新時(shí)間:2023年03月22日 09:46:57   作者:京東云開發(fā)者  
目前系統(tǒng)中有很多需要用到延時(shí)處理的功能,本文就為大家介紹了Java實(shí)現(xiàn)異步延遲隊(duì)列的方法,文中的示例代碼講解詳細(xì),需要的可以參考一下

1.應(yīng)用場(chǎng)景

目前系統(tǒng)中有很多需要用到延時(shí)處理的功能:支付超時(shí)取消、排隊(duì)超時(shí)、短信、微信等提醒延遲發(fā)送、token刷新、會(huì)員卡過期等等。通過延時(shí)處理,極大的節(jié)省系統(tǒng)的資源,不必輪詢數(shù)據(jù)庫處理任務(wù)。

目前大部分功能通過定時(shí)任務(wù)完成,定時(shí)任務(wù)還分使用quartz及xxljob兩種類型輪詢時(shí)間短,每秒執(zhí)行一次,對(duì)數(shù)據(jù)庫造成一定的壓力,并且會(huì)有1秒的誤差。輪詢時(shí)間久,如30分鐘一次,03:01插入一條數(shù)據(jù),正常3:31執(zhí)行過期,但是3:30執(zhí)行輪詢時(shí),掃描3:00-3:30的數(shù)據(jù),是掃描不到3:31的數(shù)據(jù)的,需要4:00的時(shí)候才能掃描到,相當(dāng)于多延遲了29分鐘!

2.延時(shí)處理方式調(diào)研

1.DelayQueue

1.實(shí)現(xiàn)方式:

jvm提供的延遲阻塞隊(duì)列,通過優(yōu)先級(jí)隊(duì)列對(duì)不同延遲時(shí)間任務(wù)進(jìn)行排序,通過condition進(jìn)行阻塞、睡眠dealy時(shí)間 獲取延遲任務(wù)。

當(dāng)有新任務(wù)加入時(shí),會(huì)判斷新任務(wù)是否是第一個(gè)待執(zhí)行的任務(wù),若是,會(huì)解除隊(duì)列睡眠,防止新加入的元素時(shí)需要執(zhí)行的元素而不能正常被執(zhí)行線程獲取到。

2.存在的問題:

1.單機(jī)運(yùn)行,系統(tǒng)宕機(jī)后,無法進(jìn)行有效的重試

2.沒有執(zhí)行記錄和備份

3.沒有重試機(jī)制

4.系統(tǒng)重啟時(shí),會(huì)將任務(wù)清空!

5.不能分片消費(fèi)

3.優(yōu)勢(shì):實(shí)現(xiàn)簡(jiǎn)單,無任務(wù)時(shí)阻塞,節(jié)省資源,執(zhí)行時(shí)間準(zhǔn)確

2.延遲隊(duì)列mq

實(shí)現(xiàn)方式:依賴mq,通過設(shè)置延遲消費(fèi)時(shí)間,達(dá)到延遲消費(fèi)功能。像rabbitMq、jmq都可以設(shè)置延遲消費(fèi)時(shí)間。RabbitMq通過將消息設(shè)置過期時(shí)間,放入死信隊(duì)列進(jìn)行消費(fèi)實(shí)現(xiàn)。

存在的問題:

1.時(shí)間設(shè)置不靈活,每個(gè)queue是固定的到期時(shí)間,每次新創(chuàng)建延時(shí)隊(duì)列,需要?jiǎng)?chuàng)建新的消息隊(duì)列

優(yōu)點(diǎn):依靠jmq,可以有效的監(jiān)控、消費(fèi)記錄、重試,具備多機(jī)同時(shí)消費(fèi)能力,不懼怕宕機(jī)

3.定時(shí)任務(wù)

通過定時(shí)任務(wù)輪詢符合條件的數(shù)據(jù)

缺點(diǎn):

1.必須要讀業(yè)務(wù)數(shù)據(jù)庫,對(duì)數(shù)據(jù)庫造成一定的壓力,

2.存在延時(shí)

3.一次掃描數(shù)據(jù)量過大時(shí),占用過多的系統(tǒng)資源。

4. 無法分片消費(fèi)

優(yōu)點(diǎn):

1.消費(fèi)失敗后,下次還能繼續(xù)消費(fèi),具備重試能力,

2.消費(fèi)能力穩(wěn)定

4.redis

任務(wù)存儲(chǔ)在redis中,使用redis的 zset隊(duì)列根據(jù)score進(jìn)行排序,程序通過線程不斷獲取隊(duì)列數(shù)據(jù)消費(fèi),實(shí)現(xiàn)延時(shí)隊(duì)列

優(yōu)點(diǎn):

1、查詢r(jià)edis相比較數(shù)據(jù)庫快,set隊(duì)列長(zhǎng)度過大,會(huì)根據(jù)跳表結(jié)構(gòu)進(jìn)行查詢,效率高

2、redis可根據(jù)時(shí)間戳進(jìn)行排序,只需要查詢當(dāng)前時(shí)間戳內(nèi)的分?jǐn)?shù)的任務(wù)即可

3、無懼機(jī)器重啟

4、分布式消費(fèi)

缺點(diǎn):

1.受限于redis性能,并發(fā)10W

2.多個(gè)命令無法保證原子性,使用lua腳本會(huì)要求所有數(shù)據(jù)都在一個(gè)redis分片上。

5. 時(shí)間輪

通過時(shí)間輪實(shí)現(xiàn)的延遲任務(wù)執(zhí)行,也是基于jvm單機(jī)運(yùn)行,如kafka、netty都有實(shí)現(xiàn)時(shí)間輪,redisson的看門狗也是通過netty的時(shí)間輪實(shí)現(xiàn)的。

缺點(diǎn):不適合分布式服務(wù)的使用,宕機(jī)后,會(huì)丟失任務(wù)。

3.實(shí)現(xiàn)目標(biāo)

兼容目前在使用的異步事件組件,并提供更可靠,可重試、有記錄、可監(jiān)控報(bào)警、高性能的延遲組件。

•消息傳輸可靠性:消息進(jìn)入到延遲隊(duì)列后,保證至少被消費(fèi)一次。

•Client支持豐富:支持多重語言。

•高可用性:支持多實(shí)例部署。掛掉一個(gè)實(shí)例后,還有后備實(shí)例繼續(xù)提供服務(wù)。

•實(shí)時(shí)性:允許存在一定的時(shí)間誤差。

•支持消息刪除:業(yè)務(wù)使用方,可以隨時(shí)刪除指定消息。

•支持消費(fèi)查詢

•支持手動(dòng)重試

•對(duì)當(dāng)前異步事件的執(zhí)行增加監(jiān)控

4.架構(gòu)設(shè)計(jì)

5.延遲組件實(shí)現(xiàn)方式

1.實(shí)現(xiàn)原理

目前選擇使用jimdb通過zset實(shí)現(xiàn)延時(shí)功能,將任務(wù)id和對(duì)應(yīng)的執(zhí)行時(shí)間作為score存在在zset隊(duì)列中,默認(rèn)會(huì)按照score排序,每次取0-當(dāng)前時(shí)間內(nèi)的score的任務(wù)id,

發(fā)送延遲任務(wù)時(shí),會(huì)根據(jù)時(shí)間戳+機(jī)器ip+queueName+sequence 生成唯一的id,構(gòu)造消息體,加密后放入zset隊(duì)列中。

通過搬運(yùn)線程,將達(dá)到執(zhí)行時(shí)間的任務(wù)移動(dòng)到發(fā)布隊(duì)列中,等待消費(fèi)者獲取。

監(jiān)控方通過集成ump

消費(fèi)記錄通過redis備份+數(shù)據(jù)庫持久化完成。

通過緩存實(shí)現(xiàn)的方式,只是實(shí)現(xiàn)的一種,可以通過參數(shù)控制使用哪一種實(shí)現(xiàn)方式,并可通過spi自由擴(kuò)展。

2.消息結(jié)構(gòu)

每個(gè)Job必須包含一下幾個(gè)屬性:

•Topic:Job類型,即QueueName

•Id:Job的唯一標(biāo)識(shí)。用來檢索和刪除指定的Job信息。

•Delay:Job需要延遲的時(shí)間。單位:秒。(服務(wù)端會(huì)將其轉(zhuǎn)換為絕對(duì)時(shí)間)

•Body:Job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲(chǔ)。

•traceId:發(fā)送線程的traceId,待后續(xù)pfinder支持設(shè)置traceId后,可與發(fā)送線程公用同一個(gè)traceiD,便于日志追蹤

具體結(jié)構(gòu)如下圖表示:

TTR的設(shè)計(jì)目的是為了保證消息傳輸?shù)目煽啃浴?/p>

3.數(shù)據(jù)流轉(zhuǎn)及流程圖

基于redis-disruptor方式進(jìn)行發(fā)布、消費(fèi),可以作為消息來進(jìn)行使用,消費(fèi)者采用原有異步事件的disruptor無鎖隊(duì)列消費(fèi),不同應(yīng)用、不同queue之間無鎖

1.支持應(yīng)用只發(fā)布,不消費(fèi),達(dá)到消息隊(duì)列的功能。

2:支持分桶,針對(duì)大key問題,若事件多,可以設(shè)置延遲隊(duì)列和任務(wù)隊(duì)列桶的數(shù)量,減小因大key造成的redis阻塞問題。

3: 通過ducc配置,進(jìn)行性能的擴(kuò)展,目前只支持開啟消費(fèi)和關(guān)閉消費(fèi)。

4: 支持設(shè)置超時(shí)時(shí)間配置,防止消費(fèi)線程執(zhí)行過久

瓶頸: 消費(fèi)速度慢,生產(chǎn)速度過快,會(huì)導(dǎo)致ringbuffer隊(duì)列占滿,當(dāng)前應(yīng)用既是生產(chǎn)者也是消費(fèi)者時(shí),生產(chǎn)者會(huì)休眠,性能取決于消費(fèi)速度,可通過水平擴(kuò)展機(jī)器,直接提升性能。監(jiān)控redis隊(duì)列的長(zhǎng)度,若不斷增長(zhǎng),可考慮增加消費(fèi)者,直接提高性能。

可能出現(xiàn)的情況: 因一個(gè)應(yīng)用公用一個(gè)disruptor,擁有64個(gè)消費(fèi)者線程,如果某一個(gè)事件消費(fèi)過慢,導(dǎo)致64個(gè)線程都在消費(fèi)這個(gè)事件,會(huì)導(dǎo)致其他事件無消費(fèi)線程消費(fèi),生產(chǎn)者線程也被阻塞,導(dǎo)致所有事件的消費(fèi)都被阻塞。

后期觀察是否有這個(gè)性能瓶頸,可給每一個(gè)queue一個(gè)消費(fèi)者線程池。

6.demo示例

增加配置文件

判斷是否開啟jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId>
 <artifactId>senna-event</artifactId>
 <version>1.0-SNAPSHOT</version> </dependency>?

配置

jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle

消費(fèi)代碼

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費(fèi):{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費(fèi):{}", key);
}
}

注解形式

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費(fèi):{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費(fèi):{}", key);
}
}?

發(fā)送代碼

package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;


/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {

@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;

@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("發(fā)送無延遲消息");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("發(fā)送延遲5秒消息");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("發(fā)送延遲到2022-04-02 00:00:00執(zhí)行的消息");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 

}?

7.目前應(yīng)用

1.云修到店排隊(duì)24小時(shí)后自動(dòng)取消

2..美團(tuán)請(qǐng)求token定時(shí)刷新。

3.質(zhì)??ㄑ悠?4小時(shí)生成

5. 結(jié)算單延期生成

6.短信延遲發(fā)送

到此這篇關(guān)于Java實(shí)現(xiàn)異步延遲隊(duì)列的方法詳解的文章就介紹到這了,更多相關(guān)Java異步延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java服務(wù)假死之生產(chǎn)事故的排查與優(yōu)化問題

    Java服務(wù)假死之生產(chǎn)事故的排查與優(yōu)化問題

    在服務(wù)器上通過curl命令調(diào)用一個(gè)Java服務(wù)的查詢接口,半天沒有任何響應(yīng),怎么進(jìn)行這一現(xiàn)象排查呢,下面小編給大家記一次生產(chǎn)事故的排查與優(yōu)化——Java服務(wù)假死問題,感興趣的朋友一起看看吧
    2022-07-07
  • Mybatis返回Map數(shù)據(jù)方式示例

    Mybatis返回Map數(shù)據(jù)方式示例

    這篇文章主要為大家介紹了Mybatis返回Map數(shù)據(jù)方式示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06
  • Spring中的ApplicationContext與BeanFactory詳解

    Spring中的ApplicationContext與BeanFactory詳解

    這篇文章主要介紹了Spring中的ApplicationContext與BeanFactory詳解,Spring的IoC容器就是一個(gè)實(shí)現(xiàn)了BeanFactory接口的可實(shí)例化類,事實(shí)上, Spring提供了兩種不同的容器,一種是最基本的BeanFactory,另一種是擴(kuò)展的ApplicationContext,需要的朋友可以參考下
    2024-01-01
  • JavaWeb實(shí)現(xiàn)文件上傳與下載的方法

    JavaWeb實(shí)現(xiàn)文件上傳與下載的方法

    這篇文章主要介紹了JavaWeb實(shí)現(xiàn)文件上傳與下載的方法的相關(guān)資料,需要的朋友可以參考下
    2016-01-01
  • Sentinel流控規(guī)則實(shí)現(xiàn)限流保護(hù)詳解

    Sentinel流控規(guī)則實(shí)現(xiàn)限流保護(hù)詳解

    這篇文章主要介紹了Sentinel流控規(guī)則實(shí)現(xiàn)限流保護(hù),Sentinel是一個(gè)分布式系統(tǒng)的流量控制組件,它可以實(shí)現(xiàn)限流,流控,降級(jí)等功能,提高系統(tǒng)的穩(wěn)定性和可靠性,感興趣想要詳細(xì)了解可以參考下文
    2023-05-05
  • java 四舍五入使java保留2位小數(shù)示例講解

    java 四舍五入使java保留2位小數(shù)示例講解

    這篇文章主要介紹了java四舍五入使java保留2位小數(shù)示例,大家參考使用
    2013-12-12
  • 一文詳解Java中枚舉類的使用

    一文詳解Java中枚舉類的使用

    這篇文章主要介紹了深入淺出講解Java中的枚舉類,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,感興趣的朋友可以了解下
    2022-11-11
  • IntelliJ IDEA JRebel 安裝使用圖文教程(熱部署插件)

    IntelliJ IDEA JRebel 安裝使用圖文教程(熱部署插件)

    IDEA 全稱 IntelliJ IDEA,是java語言開發(fā)的集成環(huán)境,IntelliJ在業(yè)界被公認(rèn)為最好的java開發(fā)工具之一。這篇文章主要介紹了IntelliJ IDEA 熱部署插件JRebel 安裝使用圖文教程,需要的朋友可以參考下
    2018-03-03
  • Springboot返回的json屏蔽某些屬性的操作

    Springboot返回的json屏蔽某些屬性的操作

    這篇文章主要介紹了Springboot返回的json屏蔽某些屬性的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • springboot脫敏控件升級(jí)支持深度脫敏

    springboot脫敏控件升級(jí)支持深度脫敏

    這篇文章主要為大家介紹了springboot脫敏控件升級(jí)支持深度脫敏,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-07-07

最新評(píng)論