亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Redis延遲隊列和分布式延遲隊列的簡答實現(xiàn)

 更新時間:2021年05月13日 11:37:44   作者:黃青石  
在我們的工作中,很多地方使用延遲隊列,比如訂單到期沒有付款取消訂單,制訂一個提醒的任務(wù)等都需要延遲隊列,那么我們需要實現(xiàn)延遲隊列,本文就來介紹一下如何實現(xiàn),感興趣的可以了解一下

        最近,又重新學(xué)習(xí)了下Redis,Redis不僅能快還能慢,簡直利器,今天就為大家介紹一下Redis延遲隊列和分布式延遲隊列的簡單實現(xiàn)。

  在我們的工作中,很多地方使用延遲隊列,比如訂單到期沒有付款取消訂單,制訂一個提醒的任務(wù)等都需要延遲隊列,那么我們需要實現(xiàn)延遲隊列。我們本文的梗概如下,同學(xué)們可以選擇性閱讀。

1. 實現(xiàn)一個簡單的延遲隊列。

  我們知道目前JAVA可以有DelayedQueue,我們首先開一個DelayQueue的結(jié)構(gòu)類圖。DelayQueue實現(xiàn)了Delay、BlockingQueue接口。也就是DelayQueue是一種阻塞隊列。

  我們在看一下Delay的類圖。Delayed接口也實現(xiàn)了Comparable接口,也就是我們使用Delayed的時候需要實現(xiàn)CompareTo方法。因為隊列中的數(shù)據(jù)需要排一下先后,根據(jù)我們自己的實現(xiàn)。Delayed接口里邊有一個方法就是getDelay方法,用于獲取延遲時間,判斷是否時間已經(jīng)到了延遲的時間,如果到了延遲的時間就可以從隊列里邊獲取了。

  我們創(chuàng)建一個Message類,實現(xiàn)了Delayed接口,我們主要把getDelay和compareTo進行實現(xiàn)。在Message的構(gòu)造方法的地方傳入延遲的時間,單位是毫秒,計算好觸發(fā)時間fireTime。同時按照延遲時間的升序進行排序。我重寫了里邊的toString方法,用于將Message按照我寫的方法進行輸出。

package com.hqs.delayQueue.bean;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
public class Message implements Delayed {

    private String body;
    private long fireTime;

    public String getBody() {
        return body;
    }

    public long getFireTime() {
        return fireTime;
    }

    public Message(String body, long delayTime) {
        this.body = body;
        this.fireTime = delayTime + System.currentTimeMillis();
    }

    public long getDelay(TimeUnit unit) {

        return unit.convert(this.fireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return System.currentTimeMillis() + ":" + body;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(System.currentTimeMillis() + ":start");
        BlockingQueue<Message> queue = new DelayQueue<>();
        Message message1 = new Message("hello", 1000 * 5L);
        Message message2 = new Message("world", 1000 * 7L);

        queue.put(message1);
        queue.put(message2);

        while (queue.size() > 0) {
            System.out.println(queue.take());
        }
    }
}

  里邊的main方法里邊聲明了兩個Message,一個延遲5秒,一個延遲7秒,時間到了之后會將接取出并且打印。輸出的結(jié)果如下,正是我們所期望的。

1587218430786:start
1587218435789:hello
1587218437793:world

  這個方法實現(xiàn)起來真的非常簡單。但是缺點也是很明顯的,就是數(shù)據(jù)在內(nèi)存里邊,數(shù)據(jù)比較容易丟失。那么我們需要采用Redis實現(xiàn)分布式的任務(wù)處理。

  2. 使用Redis的list實現(xiàn)分布式延遲隊列。

  本地需要安裝一個Redis,我自己是使用Docker構(gòu)建一個Redis,非??焖伲钜矝]多少。我們直接啟動Redis并且暴露6379端口。進入之后直接使用客戶端命令即可查看和調(diào)試數(shù)據(jù)。

docker pull redis
docker run -itd --name redisLocal -p 6379:6379 redis
docker exec -it redisLocal /bin/bash
redis-cli

  我本地采用spring-boot的方式連接redis,pom文件列一下,供大家參考。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hqs</groupId>
    <artifactId>delayQueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>delayQueue</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  加上Redis的配置放到application.properties里邊即可實現(xiàn)Redis連接,非常的方便。

# redis
redis.host=127.0.0.1
redis.port=6379
redis.password=
redis.maxIdle=100
redis.maxTotal=300
redis.maxWait=10000
redis.testOnBorrow=true
redis.timeout=100000

  接下來實現(xiàn)一個基于Redis的list數(shù)據(jù)類型進行實現(xiàn)的一個類。我們使用RedisTemplate操作Redis,這個里邊封裝好我們所需要的Redis的一些方法,用起來非常方便。這個類允許延遲任務(wù)做多有10W個,也是避免數(shù)據(jù)量過大對Redis造成影響。如果在線上使用的時候也需要考慮延遲任務(wù)的多少。太多幾百萬幾千萬的時候可能數(shù)據(jù)量非常大,我們需要計算Redis的空間是否夠。這個代碼也是非常的簡單,一個用于存放需要延遲的消息,采用offer的方法。另外一個是啟動一個線程, 如果消息時間到了,那么就將數(shù)據(jù)lpush到Redis里邊。

package com.hqs.delayQueue.cache;

import com.hqs.delayQueue.bean.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.concurrent.BlockingQueue;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
@Slf4j
public class RedisListDelayedQueue{

    private static final int MAX_SIZE_OF_QUEUE = 100000;
    private RedisTemplate<String, String> redisTemplate;
    private String queueName;
    private BlockingQueue<Message> delayedQueue;

    public RedisListDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
        this.redisTemplate = redisTemplate;
        this.queueName = queueName;
        this.delayedQueue = delayedQueue;
        init();
    }

    public void offerMessage(Message message) {
        if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
            throw new IllegalStateException("超過隊列要求最大值,請檢查");
        }
        try {
            log.info("offerMessage:" + message);
            delayedQueue.offer(message);
        } catch (Exception e) {
            log.error("offMessage異常", e);
        }
    }

    public void init() {
        new Thread(() -> {
            while(true) {
                try {
                    Message message = delayedQueue.take();
                    redisTemplate.opsForList().leftPush(queueName, message.toString());
                } catch (InterruptedException e) {
                    log.error("取消息錯誤", e);
                }
            }
        }).start();
    }
}

  接下來我們看一下,我們寫一個測試的controller。大家看一下這個請求/redis/listDelayedQueue的代碼位置。我們也是生成了兩個消息,然后把消息放到隊列里邊,另外我們在啟動一個線程任務(wù),用于將數(shù)據(jù)從Redis的list中獲取。方法也非常簡單。

package com.hqs.delayQueue.controller;

import com.hqs.delayQueue.bean.Message;
import com.hqs.delayQueue.cache.RedisListDelayedQueue;
import com.hqs.delayQueue.cache.RedisZSetDelayedQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.Set;
import java.util.concurrent.*;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
@Slf4j
@Controller
public class DelayQueueController {

    private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors();

    //注意RedisTemplate用的String,String,后續(xù)所有用到的key和value都是String的
    @Autowired
    RedisTemplate<String, String> redisTemplate;

    private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>());

    @GetMapping("/redisTest")
    @ResponseBody
    public String redisTest() {
        redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS);
        System.out.println(redisTemplate.opsForValue().get("a"));
        return "s";
    }

    @GetMapping("/redis/listDelayedQueue")
    @ResponseBody
    public String listDelayedQueue() {

        Message message1 = new Message("hello", 1000 * 5L);
        Message message2 = new Message("world", 1000 * 7L);

        String queueName = "list_queue";

        BlockingQueue<Message> delayedQueue = new DelayQueue<>();

        RedisListDelayedQueue redisListDelayedQueue = new RedisListDelayedQueue(redisTemplate, queueName, delayedQueue);

        redisListDelayedQueue.offerMessage(message1);
        redisListDelayedQueue.offerMessage(message2);
        asyncListTask(queueName);

        return "success";
    }

    @GetMapping("/redis/zSetDelayedQueue")
    @ResponseBody
    public String zSetDelayedQueue() {

        Message message1 = new Message("hello", 1000 * 5L);
        Message message2 = new Message("world", 1000 * 7L);

        String queueName = "zset_queue";

        BlockingQueue<Message> delayedQueue = new DelayQueue<>();

        RedisZSetDelayedQueue redisZSetDelayedQueue = new RedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue);

        redisZSetDelayedQueue.offerMessage(message1);
        redisZSetDelayedQueue.offerMessage(message2);
        asyncZSetTask(queueName);

        return "success";
    }

    public void asyncListTask(String queueName) {
        taskExecPool.execute(() -> {
            for(;;) {
                String message = redisTemplate.opsForList().rightPop(queueName);
                if(message != null) {
                    log.info(message);
                }
            }
        });
    }

    public void asyncZSetTask(String queueName) {
        taskExecPool.execute(() -> {
            for(;;) {
                Long nowTimeInMs = System.currentTimeMillis();
                System.out.println("nowTimeInMs:" + nowTimeInMs);
                Set<String> messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs);
                if(messages != null && messages.size() != 0) {
                    redisTemplate.opsForZSet().removeRangeByScore(queueName, 0, nowTimeInMs);
                    for (String message : messages) {
                        log.info("asyncZSetTask:" + message + " " + nowTimeInMs);
                    }
                    log.info(redisTemplate.opsForZSet().zCard(queueName).toString());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}

  我就不把運行結(jié)果寫出來了,感興趣的同學(xué)自己自行試驗。當然這個方法也是從內(nèi)存中拿出數(shù)據(jù),到時間之后放到Redis里邊,還是會存在程序啟動的時候,任務(wù)進行丟失。我們繼續(xù)看另外一種方法更好的進行這個問題的處理。

3.使用Redis的zSet實現(xiàn)分布式延遲隊列。

  我們需要再寫一個ZSet的隊列處理。下邊的offerMessage主要是把消息直接放入緩存中。采用Redis的ZSET的zadd方法。zadd(key, value, score) 即將key=value的數(shù)據(jù)賦予一個score, 放入緩存中。score就是計算出來延遲的毫秒數(shù)。

package com.hqs.delayQueue.cache;

import com.hqs.delayQueue.bean.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.concurrent.BlockingQueue;

/**
 * @author huangqingshi
 * @Date 2020-04-18
 */
@Slf4j
public class RedisZSetDelayedQueue {

    private static final int MAX_SIZE_OF_QUEUE = 100000;
    private RedisTemplate<String, String> redisTemplate;
    private String queueName;
    private BlockingQueue<Message> delayedQueue;

    public RedisZSetDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
        this.redisTemplate = redisTemplate;
        this.queueName = queueName;
        this.delayedQueue = delayedQueue;
    }

    public void offerMessage(Message message) {
        if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
            throw new IllegalStateException("超過隊列要求最大值,請檢查");
        }
        long delayTime = message.getFireTime() - System.currentTimeMillis();
        log.info("zset offerMessage" + message + delayTime);
        redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime());
    }

}

  上邊的Controller方法已經(jīng)寫好了測試的方法。/redis/zSetDelayedQueue,里邊主要使用ZSet的zRangeByScore(key, min, max)。主要是從score從0,當前時間的毫秒數(shù)獲取。取出數(shù)據(jù)后再采用removeRangeByScore,將數(shù)據(jù)刪除。這樣數(shù)據(jù)可以直接寫到Redis里邊,然后取出數(shù)據(jù)后直接處理。這種方法比前邊的方法稍微好一些,但是實際上還存在一些問題,因為依賴Redis,如果Redis內(nèi)存不足或者連不上的時候,系統(tǒng)將變得不可用。

4. 總結(jié)一下,另外還有哪些可以延遲隊列。

  上面的方法其實還是存在問題的,比如系統(tǒng)重啟的時候還是會造成任務(wù)的丟失。所以我們在生產(chǎn)上使用的時候,我們還需要將任務(wù)保存起來,比如放到數(shù)據(jù)庫和文件存儲系統(tǒng)將數(shù)據(jù)存儲起來,這樣做到double-check,雙重檢查,最終達到任務(wù)的99.999%能夠處理。

  其實還有很多東西可以實現(xiàn)延遲隊列。

  1) RabbitMQ就可以實現(xiàn)此功能。這個消息隊列可以把數(shù)據(jù)保存起來并且進行處理。

  2)Kafka也可以實現(xiàn)這個功能。

  3)Netty的HashedWheelTimer也可以實現(xiàn)這個功能。

最后放上我的代碼: https://github.com/stonehqs/delayQueue

到此這篇關(guān)于Redis延遲隊列和分布式延遲隊列的簡答實現(xiàn)的文章就介紹到這了,更多相關(guān)Redis延遲隊列和分布式延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis底層數(shù)據(jù)結(jié)構(gòu)之dict、ziplist、quicklist詳解

    Redis底層數(shù)據(jù)結(jié)構(gòu)之dict、ziplist、quicklist詳解

    本文給大家詳細介紹了Redis的底層數(shù)據(jù)結(jié)構(gòu):dict、ziplist、quicklist的相關(guān)知識,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2021-09-09
  • Redis服務(wù)之高可用組件sentinel詳解

    Redis服務(wù)之高可用組件sentinel詳解

    這篇文章主要介紹了Redis服務(wù)之高可用組件sentinel,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-08-08
  • Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法

    Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法

    這篇文章主要介紹了Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn),本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-02-02
  • Redis底層類型之json命令使用

    Redis底層類型之json命令使用

    這篇文章主要為大家介紹了Redis底層類型之json命令使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-09-09
  • Redis的主從同步解析

    Redis的主從同步解析

    這篇文章主要介紹了Redis的主從同步解析,見識淺薄,僅供參考。
    2017-10-10
  • Redis基本數(shù)據(jù)類型Zset有序集合常用操作

    Redis基本數(shù)據(jù)類型Zset有序集合常用操作

    這篇文章主要為大家介紹了redis基本數(shù)據(jù)類型Zset有序集合常用操作,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-05-05
  • 淺談Redis緩存有哪些淘汰策略

    淺談Redis緩存有哪些淘汰策略

    redis用做緩存是一種非常常見的手段,然而由于內(nèi)存大小的限制,會導(dǎo)致redis在內(nèi)存空間滿了以后需要處理繼續(xù)存入的數(shù)據(jù),所以就需要淘汰策略,本文就詳細的介紹一下
    2021-08-08
  • Redis主從復(fù)制的原理分析

    Redis主從復(fù)制的原理分析

    Redis主從復(fù)制通過將數(shù)據(jù)鏡像到多個從節(jié)點,實現(xiàn)高可用性和擴展性,主從復(fù)制包括初次全量同步和增量同步兩個階段,為優(yōu)化復(fù)制性能,可以采用AOF持久化、調(diào)整復(fù)制超時時間、優(yōu)化網(wǎng)絡(luò)帶寬等措施,故障轉(zhuǎn)移機制依賴于Sentinel或Cluster組件
    2025-01-01
  • Redis集群利用Redisson實現(xiàn)分布式鎖方式

    Redis集群利用Redisson實現(xiàn)分布式鎖方式

    這篇文章主要介紹了Redis集群利用Redisson實現(xiàn)分布式鎖方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Redis SETNX的實現(xiàn)示例

    Redis SETNX的實現(xiàn)示例

    SETNX是Redis提供的原子操作,用于在指定鍵不存在時設(shè)置鍵值,并返回操作結(jié)果,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-12-12

最新評論