亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RocketMQ延遲消息簡明介紹

 更新時(shí)間:2022年08月22日 17:11:22   作者:π大星的日常  
這篇文章主要介紹了RocketMQ延遲消息,延遲消息是個(gè)啥?顧名思義,就是等一段時(shí)間再消費(fèi)的消息。文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

前言

場景可以是這樣的,雙11搶手機(jī),一個(gè)新手機(jī)4000-5000,到0點(diǎn)的時(shí)候,沖著興奮勁,搶到了。但是摸了摸錢包,又冷靜下來了,好像不是很必要換手機(jī)。就放在那里沒有支付,過了30分鐘,自動(dòng)取消了。這里就是使用延遲消息的場景,當(dāng)下單之后,向消息隊(duì)列發(fā)送一條延遲30分鐘消費(fèi)的消息。等到30分鐘過了,然后消費(fèi)消息,執(zhí)行檢查任務(wù),要是對應(yīng)的訂單支付了,就什么都不做,要是沒支付,就取消訂單。

RocketMQ的延遲消息是org.apache.rocketmq.broker.schedule.ScheduleMessageService類實(shí)現(xiàn)的

核心屬性

RMQ_SYS_SCHEDULE_TOPIC

在之前的版本中叫SCHEDULE_TOPIC,是系統(tǒng)內(nèi)置的Topic,用來保存所有的定時(shí)消息。沒有執(zhí)行的定時(shí)消息都會(huì)被保存在這個(gè)topic中。

FIRST_DELAY_TIME

第一次執(zhí)行定時(shí)任務(wù)的延遲時(shí)間,默認(rèn)是1秒。

private static final long FIRST_DELAY_TIME = 1000L;

DELAY_FOR_A_WHILE

第二次以及之后每次定時(shí)任務(wù)執(zhí)行的間隔時(shí)間,默認(rèn)100ms。

private static final long DELAY_FOR_A_WHILE = 100L;

DELAY_FOR_A_PERIOD

若是延遲消息投遞失敗,則在這個(gè)時(shí)間過后繼續(xù)投遞,默認(rèn)10秒。

private static final long DELAY_FOR_A_PERIOD = 10000L;

delayLevelTable

這是保存延遲級別和延遲時(shí)間映射關(guān)系的地方

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
    new ConcurrentHashMap<Integer, Long>(32);

offsetTable

保存延遲級別和對應(yīng)的消費(fèi)位點(diǎn)

private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
    new ConcurrentHashMap<Integer, Long>(32);

核心方法

queueId2DelayLevel

將queueId轉(zhuǎn)換為延遲級別

public static int queueId2DelayLevel(final int queueId) {
    return queueId + 1;
}

delayLevel2QueueId

將延遲級別轉(zhuǎn)換為queueId

public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}

updateOffset

更新延遲消息topic的消費(fèi)位點(diǎn)

private void updateOffset(int delayLevel, long offset) {
    this.offsetTable.put(delayLevel, offset);
    if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) {
        long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
        dataVersion.nextVersion(stateMachineVersion);
    }
}

computeDeliverTimestamp

根據(jù)延遲消息級別和消息的存儲(chǔ)時(shí)間計(jì)算該延遲消息的投遞時(shí)間

public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
    Long time = this.delayLevelTable.get(delayLevel);
    if (time != null) {
        return time + storeTimestamp;
    }
    return storeTimestamp + 1000;
}

start()

啟動(dòng)延遲消息服務(wù)

shutdown()

關(guān)閉start方法中啟動(dòng)的額timer任務(wù)

load()

加載消息的消費(fèi)位點(diǎn)信息和全部的延遲級別信息。延遲級別信息默認(rèn)如下。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

parseDelayLevel

格式化所有的延遲級別信息,保存到內(nèi)存中。

到此這篇關(guān)于RocketMQ延遲消息簡明介紹的文章就介紹到這了,更多相關(guān)RocketMQ延遲消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論