Java使用延時隊列搞定超時訂單處理的場景
作者:趕路人兒
這篇文章主要介紹了Java使用延時隊列搞定超時訂單處理,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
1、延時隊列使用場景:
那么什么時候需要用延時隊列呢?常見的延時任務場景 舉栗子:
- 訂單在30分鐘之內(nèi)未支付則自動取消。
- 重試機制實現(xiàn),把調(diào)用失敗的接口放入一個固定延時的隊列,到期后再重試。
- 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。
- 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運營人員。
- 預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議。
- 關(guān)閉空閑連接,服務器中,有很多客戶端的連接,空閑一段時間之后需要關(guān)閉之。
- 清理過期數(shù)據(jù)業(yè)務。比如緩存中的對象,超過了空閑時間,需要從緩存中移出。
解決方案也非常多:
- 定期輪詢(數(shù)據(jù)庫等)
- JDK DelayQueue
- JDK Timer
- ScheduledExecutorService 周期性線程池
- 時間輪(kafka)
- 時間輪(Netty的HashedWheelTimer)
- Redis有序集合(zset)
- zookeeper之curator
- RabbitMQ
- Quartz,xxljob等定時任務框架
- Koala(考拉)
- JCronTab(仿crontab的java調(diào)度器)
SchedulerX(阿里)
對于單機服務優(yōu)選DelayQueue,對于分布式環(huán)境,可以使用mq、zk、redis之類的。接下來,介紹DelayQueue的使用。
一句話介紹:DelayQueue = BlockingQueue + PriorityQueue + Delayed
2、示例:
實戰(zhàn)以訂單下單后三十分鐘內(nèi)未支付則自動取消 為業(yè)務場景,該場景的代碼邏輯分析如下:
- 下單后將訂單直接放入未支付的延時隊列中
- 如果超時未支付,則從隊列中取出,進行修改為取消狀態(tài)的訂單
- 如果支付了,則不去進行取消,或者取消的時候做個狀態(tài)篩選,即可避免更新
- 或者支付完成后,做個主動出隊
還有就是用戶主動取消訂單,也做個主動出隊
1)先來寫個通用的??Delayed?? :
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Setter
@Getter
public class ItemDelayed<T> implements Delayed {
/**默認延遲30分鐘*/
private final static long DELAY = 30 * 60 * 1000L;
/**數(shù)據(jù)id*/
private Long dataId;
/**開始時間*/
private long startTime;
/**到期時間*/
private long expire;
/**創(chuàng)建時間*/
private Date now;
/**泛型data*/
private T data;
public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + (secondsDelay * 1000);
this.now = new Date();
}
public ItemDelayed(Long dataId, long startTime) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + DELAY;
this.now = new Date();
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}2)再寫個通用的接口,用于規(guī)范和方便統(tǒng)一實現(xiàn) 這樣任何類型的訂單都可以實現(xiàn)這個接口 進行延時任務的處理:
public interface DelayOrder<T> {
/**
* 添加延遲對象到延時隊列
*
* @param itemDelayed 延遲對象
* @return boolean
*/
boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);
/**
* 根據(jù)對象添加到指定延時隊列
*
* @param data 數(shù)據(jù)對象
* @return boolean
*/
boolean addToDelayQueue(T data);
/**
* 移除指定的延遲對象從延時隊列中
*
* @param data
*/
void removeToOrderDelayQueue(T data);
}具體業(yè)務邏輯實現(xiàn):
@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {
@Autowired
private OrderService orderService;
@Autowired
private ExecutorService delayOrderExecutor;
private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();
/**
* 初始化時加載數(shù)據(jù)庫中需處理超時的訂單
* 系統(tǒng)啟動:掃描數(shù)據(jù)庫中未支付(要在更新時:加上已支付就不用更新了),未過期的的訂單
*/
@PostConstruct
public void init() {
log.info("系統(tǒng)啟動:掃描數(shù)據(jù)庫中未支付,未過期的的訂單");
List<Order> orderList = orderService.selectFutureOverTimeOrder();
for (Order order : orderList) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
this.addToOrderDelayQueue(orderDelayed);
}
log.info("系統(tǒng)啟動:掃描數(shù)據(jù)庫中未支付的訂單,總共掃描了" + orderList.size() + "個訂單,推入檢查隊列,準備到期檢查...");
/*啟動一個線程,去取延遲訂單*/
delayOrderExecutor.execute(() -> {
log.info("啟動處理的訂單線程:" + Thread.currentThread().getName());
ItemDelayed<Order> orderDelayed;
while (true) {
try {
orderDelayed = DELAY_QUEUE.take();
//處理超時訂單
orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
} catch (Exception e) {
log.error("執(zhí)行自營超時訂單的_延遲隊列_異常:" + e);
}
}
});
}
/**
* 加入延遲消息隊列
**/
@Override
public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
return DELAY_QUEUE.add(orderDelayed);
}
/**
* 加入延遲消息隊列
**/
@Override
public boolean addToDelayQueue(Order order) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
return DELAY_QUEUE.add(orderDelayed);
}
/**
* 從延遲隊列中移除 主動取消就主動從隊列中取出
**/
@Override
public void removeToOrderDelayQueue(Order order) {
if (order == null) {
return;
}
for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayed<Order> queue = iterator.next();
if (queue.getDataId().equals(order.getId())) {
DELAY_QUEUE.remove(queue);
}
}
}
}分析:
- delayOrderExecutor是注入的一個專門處理出隊的一個線程
- @PostConstruct是啥呢,是在容器啟動后只進行一次初始化動作的一個注解,相當實用
- 啟動后呢,我們?nèi)?shù)據(jù)庫掃描一遍,防止有漏網(wǎng)之魚,因為單機版嗎,隊列的數(shù)據(jù)是在內(nèi)存中的,重啟后肯定原先的數(shù)據(jù)會丟失,所以為保證服務質(zhì)量,我們可能會錄音.....所以為保證重啟后數(shù)據(jù)的恢復,我們需要重新掃描數(shù)據(jù)庫把未支付的數(shù)據(jù)重新裝載到內(nèi)存的隊列中
- 接下來就是用這個線程去一直不停的訪問隊列的take()方法,當隊列無數(shù)據(jù)就一直阻塞,或者數(shù)據(jù)沒到期繼續(xù)阻塞著,直到到期出隊,然后獲取訂單的信息,去處理訂單的更新操作
到此這篇關(guān)于Java使用延時隊列搞定超時訂單處理的文章就介紹到這了,更多相關(guān)java延時隊列超時訂單處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!