Spring?Boot集成Redisson實現(xiàn)延遲隊列
項目場景:
在電商、支付等領域,往往會有這樣的場景,用戶下單后放棄支付了,那這筆訂單會在指定的時間段后進行關閉操作,細心的你一定發(fā)現(xiàn)了像某寶、某東都有這樣的邏輯,而且時間很準確,誤差在1s內(nèi);那他們是怎么實現(xiàn)的呢?
一般實現(xiàn)的方法有幾種:使用 redisson、rocketmq、rabbitmq等消息隊列的延時投遞功能。
解決方案:
一般項目集成redis的比較多,所以我這篇文章就說下redisson延遲隊列,如果使用rocketmq或rabbitmq需要額外集成中間件,比較麻煩一點。
1.集成redisson
maven依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.21.1</version> </dependency>
yml配置,單節(jié)點配置可以兼容redis的配置方式
# redis配置
spring:
redis:
database: 0
host: 127.0.0.1
password: redis@pass
port: 6001更詳細的配置參考:Spring Boot整合Redisson的兩種方式
2.配置多線程
因為延遲隊列可能會多個任務同時執(zhí)行,所以需要多線程處理。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ExecutorConfig {
/**
* 異步任務自定義線程池
*/
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(50);
//配置最大線程數(shù)
executor.setMaxPoolSize(500);
//配置隊列大小
executor.setQueueCapacity(300);
//允許線程空閑時間
executor.setKeepAliveSeconds(60);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("taskExecutor-");
// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//調(diào)用shutdown()方法時等待所有的任務完成后再關閉
executor.setWaitForTasksToCompleteOnShutdown(true);
//等待所有任務完成后的最大等待時間
executor.setAwaitTerminationSeconds(60);
return executor;
}
}3.具體業(yè)務
比如消息通知、關閉訂單等 ,這里加上了@Async注解,可以異步執(zhí)行
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class AsyncService {
@Async
public void executeQueue(Object value) {
System.out.println();
System.out.println("當前線程:"+Thread.currentThread().getName());
System.out.println("執(zhí)行任務:"+value);
//打印時間方便查看
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("執(zhí)行任務的時間:"+sdf.format(new Date()));
//自己的業(yè)務邏輯,可以根據(jù)id發(fā)送通知消息等
//......
}
}4.延遲隊列(關鍵代碼)
這里包括添加延遲隊列,和消費延遲隊列,@PostConstruct注解的意思是服務啟動加載一次,參考
Spring Boot項目啟動時執(zhí)行指定的方法
Spring Boot中多個PostConstruct注解執(zhí)行順序控制
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Service
public class TestService {
@Resource
private AsyncService asyncService;
@Resource
private ThreadPoolTaskExecutor executor;
@Autowired
private RedissonClient redissonClient;
/**
* 添加延遲任務
*/
public void addQueue() {
//獲取延遲隊列
RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("delayedQueue");
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
for (int i = 1; i <= 10; i++) {
long delayTime = 5+i; //延遲時間(秒)
// long delayTime = 5; //這里時間統(tǒng)一,可以測試并發(fā)執(zhí)行
delayedQueue.offer("延遲任務"+i, delayTime, TimeUnit.SECONDS);
}
//打印時間方便查看
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("添加任務的時間:"+sdf.format(new Date()));
}
/**
* 服務啟動時加載,開始消費延遲隊列
*/
@PostConstruct
public void consumer() {
System.out.println("服務啟動時加載>>>>>>");
//獲取延遲隊列
RBlockingQueue<Object> delayedQueue = redissonClient.getBlockingQueue("delayedQueue");
//啟用一個線程來消費這個延遲隊列
executor.execute(() ->{
while (true){
try {
// System.out.println("while中的線程:"+Thread.currentThread().getName());
//獲取延遲隊列中的任務
Object value = delayedQueue.poll();
if(value == null){
//如果沒有任務就休眠1秒,休眠時間根據(jù)業(yè)務自己定義
Thread.sleep(1000); //這里休眠時間越短,誤差就越小
continue;
}
//異步處理延遲隊列中的消息
asyncService.executeQueue(value);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}5.測試接口
import com.test.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private TestService testService;
/*
* 添加延遲任務
*/
@GetMapping(value = "/addQueue")
public String addQueue() {
testService.addQueue();
return "success";
}
}6.測試結果

總結:
- Redisson的的RDelayedQueue是基于Redis實現(xiàn)的,而Redis本身并不保證數(shù)據(jù)的持久性。如果Redis服務器宕機,那么所有在RDelayedQueue中的數(shù)據(jù)都會丟失。因此,我們需要在應用層面進行持久化設計,例如定期將RDelayedQueue中的數(shù)據(jù)持久化到數(shù)據(jù)庫。
- 在設計延遲任務時,我們應該根據(jù)實際需求來合理設置延遲時間,避免設置過長的延遲時間導致內(nèi)存占用過高。
到此這篇關于Spring Boot集成Redisson實現(xiàn)延遲隊列的文章就介紹到這了,更多相關SpringBoot Redisson延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java并發(fā)包之CopyOnWriteArrayList類的深入講解
這篇文章主要給大家介紹了關于Java并發(fā)包之CopyOnWriteArrayList類的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-12-12
如何在Java中創(chuàng)建線程通信的四種方式你知道嗎
開發(fā)中不免會遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場景?;蛘呤蔷€程 A 在執(zhí)行到某個條件通知線程 B 執(zhí)行某個操作。下面我們來一起學習如何解決吧2021-09-09
解決springboot 無法配置多個靜態(tài)路徑的問題
這篇文章主要介紹了解決springboot 無法配置多個靜態(tài)路徑的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
初探Spring Cloud Gateway實戰(zhàn)
這篇文章主要介紹了創(chuàng)建網(wǎng)關項目(Spring Cloud Gateway)過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2021-08-08

