亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot實(shí)現(xiàn)redis延遲隊(duì)列的示例代碼

 更新時(shí)間:2024年02月28日 09:29:10   作者:RachelHwang  
延時(shí)隊(duì)列場景在我們?nèi)粘I(yè)務(wù)開發(fā)中經(jīng)常遇到,它是一種特殊類型的消息隊(duì)列,本文就來介紹一下SpringBoot實(shí)現(xiàn)redis延遲隊(duì)列的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下

1. 業(yè)務(wù)場景

延時(shí)隊(duì)列場景在我們?nèi)粘I(yè)務(wù)開發(fā)中經(jīng)常遇到,它是一種特殊類型的消息隊(duì)列,它允許把消息發(fā)送到隊(duì)列中,但不立即投遞給消費(fèi)者,而是在一定時(shí)間后再將消息投遞給消費(fèi)者。延遲隊(duì)列的常見使用場景有以下幾種:

  • 在各種購物平臺上下單,訂單超過30分鐘未支付,自動關(guān)閉。
  • 訂單完成后, 如果用戶一直未評價(jià), 5天后自動好評。
  • 會員到期前15天, 到期前3天分別發(fā)送短信提醒。
  • 當(dāng)訂單一直處于未支付狀態(tài)時(shí),如何及時(shí)的關(guān)閉訂單,并退還庫存?
  • 如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功?

2. Redis延遲隊(duì)列實(shí)現(xiàn)原理

目前延遲隊(duì)列的類型主要實(shí)現(xiàn)有:

  • 基于消息的延遲:指為每條消息設(shè)置不同的延遲時(shí)間,那么每當(dāng)隊(duì)列中有新消息進(jìn)入的時(shí)候就會重新根據(jù)延遲時(shí)間排序,或者定義時(shí)間輪,新消息落在指定位置;
  • 基于隊(duì)列的延遲: 設(shè)置不同延遲級別的隊(duì)列,比如5s、1min、30mins、1h等,每個(gè)隊(duì)列中消息的延遲時(shí)間都是相同的。

基于第一種不少組件都有實(shí)現(xiàn)方案,比如redis的sortset間接實(shí)現(xiàn),kafka內(nèi)部時(shí)間輪,rabbitMQ可安裝插件實(shí)現(xiàn)。第一種實(shí)時(shí)性高,不過主觀看會比較依賴組件本身,但自己實(shí)現(xiàn)就得考慮持久化、高可用等問題,建議直接使用組件本身;第二種方案可以基于組件去實(shí)現(xiàn),通用性會高點(diǎn),不過實(shí)時(shí)性不高,更適合用于重試業(yè)務(wù)場景。當(dāng)然Redis本身并不支持延遲隊(duì)列,所以我們只是實(shí)現(xiàn)一個(gè)比較簡單的延遲隊(duì)列,而且Redis不太適合大量消息堆積,所以只適合比較簡單的場景,然假如我們對消息的實(shí)時(shí)性以及可靠性要求非常高,可能就需要使用MQ或kafka來實(shí)現(xiàn)了。

消息延遲流程圖如下:

在這里插入圖片描述

Redis延遲隊(duì)列可以通過 zset 來實(shí)現(xiàn),因?yàn)?zset 中有一個(gè) score,我們可以把時(shí)間作為 score,將 value 存到 redis 中,然后通過輪詢的方式,去不斷的讀取消息出來,整體思路為:

  • 消息體設(shè)置有效期,設(shè)置好score,然后放入zset中
  • 通過排名拉取消息
  • 有效期到了,就把當(dāng)前消息從zset中移除

zadd命令

使用方式:ZADD key score member [[score member][score member] …]
將一個(gè)或多個(gè) member 元素及其 score 值加入到有序集 key 當(dāng)中。如果 key 不存在,則創(chuàng)建一個(gè)空的有序集并執(zhí)行 ZADD 操作。如果某個(gè) member 已經(jīng)是有序集的成員,那么更新這個(gè) member 的 score 值,并通過重新插入這個(gè) member 元素,來保證該 member 在正確的位置上。score 值可以是整數(shù)值或雙精度浮點(diǎn)數(shù)。

ZRANGEBYSCORE命令

使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

  • 返回有序集 key 中,所有 score 值介于 min 和 max 之間(包括等于 min 或 max )的成員。有序集成員按 score 值遞增(從小到大)次序排列。
  • 具有相同 score 值的成員按字典序來排列
  • 可選的 LIMIT 參數(shù)指定返回結(jié)果的數(shù)量及區(qū)間(就像SQL中的 SELECT LIMIT offset, count ),注意當(dāng) offset 很大時(shí),定位 offset 的操作可能需要遍歷整個(gè)有序集,此過程最壞復(fù)雜度為 O(N) 時(shí)間。
  • 可選的 WITHSCORES 參數(shù)決定結(jié)果集是單單返回有序集的成員,還是將有序集成員及其 score 值一起返回。

ZREM命令

使用方式:ZREM key member [member …]
移除有序集 key 中的一個(gè)或多個(gè)成員,不存在的成員將被忽略。
當(dāng) key 存在但不是有序集類型時(shí),返回一個(gè)錯(cuò)誤。

3. 基于springboot實(shí)現(xiàn)redis延遲隊(duì)列

3.1 引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>${version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>${version}</version>
</dependency>

3.2 redis基礎(chǔ)方法

定義RedisService基礎(chǔ)服務(wù)方法,本次案例只涉及到以下三個(gè)基礎(chǔ)方法:

    /**
     * 添加 ZSet 元素
     *
     * @param key
     * @param value
     * @param score
     */
    @Override
    public boolean add(String key, Object value, double score) {
        return redisTemplate.opsForZSet().add(key, value, score);
    }
    
    /**
     * 返回 分?jǐn)?shù)范圍內(nèi) 指定 count 數(shù)量的元素集合, 并且從 offset 下標(biāo)開始(從小到大,不帶分?jǐn)?shù)的集合)
     *
     * @param key
     * @param min
     * @param max
     * @param offset 從指定下標(biāo)開始
     * @param count  輸出指定元素?cái)?shù)量
     * @return
     */
    @Override
    public Set<Object> rangeByScore(String key, double min, double max, long offset, long count) {
        return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);
    }

    /**
     * Zset 刪除一個(gè)或多個(gè)元素
     *
     * @param key
     * @param values
     * @return
     */
    @Override
    public Long removeZset(String key, Object... values) {
        return redisTemplate.opsForZSet().remove(key, values);
    }

3.3 定義Spring消息事件推送

@Getter
@ToString
public class DelayMsg extends ApplicationEvent {
    private String msg;
    private String topic;

    public DelayMsg(Object source, String msg, String topic) {
        super(source);
        this.msg = msg;
        this.topic = topic;
    }
}

3.4 消息獲取

定義redis獲取延遲隊(duì)列消息方法:

/**
 * 從zset中取出score小于當(dāng)前時(shí)間戳的數(shù)據(jù)
 *
 * @param key
 * @return
 */
public String getDelayOne(String key) {
    //先查后刪,一次拿3個(gè)做備選,這樣搶占到的概率就會高一些
    Set<Object> sets = redisService.rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);
    if (CollectionUtils.isEmpty(sets)) {
        return null;
    }

    for (Object val : sets) {
        if (1L.equals(redisService.removeZset(key, val))) {
            // 刪除成功,表示搶占到
            return val.toString();
        }
    }
    return null;
}

這里每次查詢時(shí)取了三個(gè)數(shù)據(jù),然后遍歷獲取到的數(shù)據(jù),依次嘗試去刪除,若刪除成功,則表示當(dāng)前實(shí)例搶占到了這個(gè)消息

  • 為什么這樣設(shè)計(jì)? 這里有兩個(gè)點(diǎn),先解釋第一個(gè),為啥先查后刪

如果我們按照正常的實(shí)現(xiàn)流程,每次從zset中取一個(gè),但是無法保證這個(gè)時(shí)候就只有我一個(gè)人拿到了這個(gè)數(shù)據(jù),在多實(shí)例的場景下,可能存在多個(gè)實(shí)例同時(shí)拿到了它,那么如何才能表示只有一個(gè)實(shí)例搶占到呢?

借助redis的單線程機(jī)制,只可能有一個(gè)實(shí)例會刪除成功,所以拿到并刪除成功的那個(gè)小伙伴,就是最終的幸運(yùn)兒;

因此實(shí)現(xiàn)細(xì)節(jié)就是先查,后刪,若刪除成功,表示獲取成功;否則表示被其他的實(shí)例捷足先登。

  • 接下來再看第二個(gè),為啥一次拿三個(gè)

從上面的分析可以看出,如果我一次只拿一個(gè),那么我搶占到的幾率并不太大,特別是當(dāng)實(shí)例比較多時(shí),可能會做多次的無效操作;為了減少這個(gè)可能性,所以我一次多拿幾個(gè)做備選,這樣搶占到的概率就會高一些,至于為什么是3,這個(gè)就看實(shí)際的實(shí)例與定時(shí)任務(wù)的執(zhí)行間隔了。

上面定義了如何獲取延遲隊(duì)列中已到期的消息,接下來需要定時(shí)輪訓(xùn)獲取消息:

/**
 * 每5s定時(shí)輪訓(xùn)消息
 */
@Scheduled(fixedRate = 5000)
public void schedule() {
   for (String specialTopic : topic) {
       String msg = redisDelayQueue.getDelayOne(specialTopic);
       logger.info("開始輪訓(xùn)獲取消息 {}", msg);
       if (StringUtil.isNotEmpty(msg)) {
       	   //使用Spring推送事件處理
           applicationContext.publishEvent(new DelayMsg(this, msg, specialTopic));
       }
   }
}

上面的定時(shí)任務(wù),直接借助Spring的@Schedule來實(shí)現(xiàn),遍歷所有的topic,撈出數(shù)據(jù)之后,通過spring的 event/listener事件機(jī)制來實(shí)現(xiàn)消息處理的解耦

3.5 定義消費(fèi)者注解和切面處理

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {
    String topic();
}

注意這個(gè)注解上面還有 @EventListener,表明它可以監(jiān)聽的spring的事件

3.6 定義延時(shí)業(yè)務(wù)的切面處理

@Aspect
@Component
public class ConsumerAspect {

    @Around("@annotation(consumer)")
    public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {
        Object[] args = joinPoint.getArgs();
        boolean check = false;
        for (Object obj : args) {
            if (obj instanceof DelayMsg) {
                check = consumer.topic().equals(((DelayMsg) obj).getTopic());
            }
        }
        if (!check) {
            // 不滿足條件,直接忽略
            return null;
        }
        // topic匹配成功,執(zhí)行
        return joinPoint.proceed();
    }
}

3.7 消息監(jiān)聽

	//使用自定義的consumer注解監(jiān)聽topic延遲隊(duì)列
    @Consumer(topic = RedisKeyConstant.DELAY_QUEUE)
    public void consumer(DelayMsg delayMsg) {
        logger.info("預(yù)約單延時(shí)確認(rèn): " + delayMsg.getMsg() + " at:" + System.currentTimeMillis());
        //延遲業(yè)務(wù)具體實(shí)現(xiàn)
        //...
        //...
    }

3.8 寫入隊(duì)列的包裝服務(wù)類

@Component
public class DelayListWrapper {
    private Logger logger = LoggerFactory.getLogger(RedisDelayQueue.class);
    @Autowired
    RedisService redisService;

    private Set&lt;String&gt; topic = new CopyOnWriteArraySet&lt;&gt;();

    /**
     *
     * @param key 隊(duì)列名稱
     * @param val 消息內(nèi)容
     * @param delayTime 過期時(shí)間
     */
    public void publish(String key, Object val, long delayTime) {
        topic.add(key);
        String strVal = val instanceof String ? (String) val : JSONObject.toJSONString(val);
        redisService.add(key, strVal, System.currentTimeMillis() + delayTime);
        logger.info("key為:{},time:{}", key,System.currentTimeMillis() + delayTime);
    }
}

3.9 業(yè)務(wù)facade層調(diào)用延遲處理

經(jīng)過以上的延遲隊(duì)列封裝處理,在facade層,也就是我們的業(yè)務(wù)中就可以直接調(diào)用:

@Autowired
private DelayListWrapper delayListWrapper;
...
delayListWrapper.publish(RedisKeyConstant.DELAY_QUEUE, xxxId, xxx);

4 總結(jié)

本文以redis的zset來實(shí)現(xiàn)延時(shí)隊(duì)列,并基于SpringBoot實(shí)現(xiàn)了延遲隊(duì)列的推送和消費(fèi)。更多相關(guān)SpringBoot redis延遲隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • IDEA未配置SQL方言:無法使用SQL提示解決方法

    IDEA未配置SQL方言:無法使用SQL提示解決方法

    在使用IDEA進(jìn)行SQL開發(fā)時(shí),如果未配置SQL方言可能會導(dǎo)致一些問題,如無法正確識別數(shù)據(jù)庫中的關(guān)鍵字、數(shù)據(jù)類型等,這篇文章主要給大家介紹了關(guān)于IDEA未配置SQL方言,無法使用SQL提示解決方法的相關(guān)資料,需要的朋友可以參考下
    2024-07-07
  • 淺談Java泛型讓聲明方法返回子類型的方法

    淺談Java泛型讓聲明方法返回子類型的方法

    下面小編就為大家?guī)硪黄獪\談Java泛型讓聲明方法返回子類型的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-02-02
  • Java注解使用及原理解析

    Java注解使用及原理解析

    這篇文章主要介紹了Java注解使用及原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • 基于Ajax用戶名驗(yàn)證、服務(wù)條款加載、驗(yàn)證碼生成的實(shí)現(xiàn)方法

    基于Ajax用戶名驗(yàn)證、服務(wù)條款加載、驗(yàn)證碼生成的實(shí)現(xiàn)方法

    本篇文章對Ajax用戶名驗(yàn)證、服務(wù)條款加載、驗(yàn)證碼生成的實(shí)現(xiàn)方法,進(jìn)行了詳細(xì)的分析介紹。需要的朋友參考下
    2013-05-05
  • SpringCloud筆記(Hoxton)Netflix之Ribbon負(fù)載均衡示例代碼

    SpringCloud筆記(Hoxton)Netflix之Ribbon負(fù)載均衡示例代碼

    這篇文章主要介紹了SpringCloud筆記HoxtonNetflix之Ribbon負(fù)載均衡,Ribbon是管理HTTP和TCP服務(wù)客戶端的負(fù)載均衡器,Ribbon具有一系列帶有名稱的客戶端(Named?Client),對SpringCloud?Ribbon負(fù)載均衡相關(guān)知識感興趣的朋友一起看看吧
    2022-06-06
  • java中volatile關(guān)鍵字的作用詳解

    java中volatile關(guān)鍵字的作用詳解

    這篇文章主要介紹了java中volatile關(guān)鍵字的作用詳解,volatile可以保證,若一個(gè)線程改變了某塊內(nèi)存的值,其他線程是可見的,以至于其他線程能及時(shí)更新這塊內(nèi)存,需要的朋友可以參考下
    2023-09-09
  • 利用ScriptEngineManager實(shí)現(xiàn)字符串公式靈活計(jì)算的方法

    利用ScriptEngineManager實(shí)現(xiàn)字符串公式靈活計(jì)算的方法

    今天小編就為大家分享一篇利用ScriptEngineManager實(shí)現(xiàn)字符串公式靈活計(jì)算的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-07-07
  • Java獲取客戶端真實(shí)IP地址過程解析

    Java獲取客戶端真實(shí)IP地址過程解析

    這篇文章主要介紹了Java獲取客戶端真實(shí)IP地址過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-01-01
  • springmvc json類型轉(zhuǎn)換錯(cuò)誤解決方案

    springmvc json類型轉(zhuǎn)換錯(cuò)誤解決方案

    這篇文章主要介紹了springmvc json類型轉(zhuǎn)換錯(cuò)誤解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • Spring Boot實(shí)現(xiàn)郵件發(fā)送功能

    Spring Boot實(shí)現(xiàn)郵件發(fā)送功能

    這篇文章主要為大家詳細(xì)介紹了Spring Boot實(shí)現(xiàn)郵件發(fā)送功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-06-06

最新評論