一文帶你深入了解Java中延時(shí)任務(wù)的實(shí)現(xiàn)
概述
延時(shí)任務(wù)相信大家都不陌生,在現(xiàn)實(shí)的業(yè)務(wù)中應(yīng)用場(chǎng)景可以說(shuō)是比比皆是。例如訂單下單15分鐘未支付直接取消,外賣超時(shí)自動(dòng)賠付等等。這些情況下,我們?cè)撛趺丛O(shè)計(jì)我們的服務(wù)的實(shí)現(xiàn)呢?
笨一點(diǎn)的方法自然是定時(shí)任務(wù)去數(shù)據(jù)庫(kù)進(jìn)行輪詢。但是當(dāng)業(yè)務(wù)量較大,事件處理比較費(fèi)時(shí)的時(shí)候,我們的系統(tǒng)和數(shù)據(jù)庫(kù)往往會(huì)面臨巨大的壓力,如果采用這種方式或許會(huì)導(dǎo)致數(shù)據(jù)庫(kù)和系統(tǒng)的崩潰。那么有什么好辦法嗎?今天我來(lái)為大家介紹幾種實(shí)現(xiàn)延時(shí)任務(wù)的辦法。
JAVA DelayQueue
你沒(méi)看錯(cuò),java內(nèi)部有內(nèi)置延時(shí)隊(duì)列,位于java concurrent包內(nèi)。
DelayQueue
是一個(gè)jdk中自帶的延時(shí)隊(duì)列實(shí)現(xiàn),他的實(shí)現(xiàn)依賴于可重入鎖ReentrantLock
以及條件鎖Condition
和優(yōu)先隊(duì)列PriorityQueue
。而且本質(zhì)上他也是一個(gè)阻塞隊(duì)列。那么他是如何實(shí)現(xiàn)延時(shí)效果的呢。
DelayQueue的實(shí)現(xiàn)原理
首先DelayQueue
隊(duì)列中的元素必須繼承一個(gè)接口叫做Delayed
,我們找到這個(gè)類
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
發(fā)現(xiàn)這個(gè)類內(nèi)部定義了一個(gè)返回值為long
的方法getDelay
,這個(gè)方法用來(lái)定義隊(duì)列中的元素的過(guò)期時(shí)間,所有需要放在隊(duì)列中的元素,必須實(shí)現(xiàn)這個(gè)方法。
然后我們來(lái)看看延遲隊(duì)列的隊(duì)列是如何操作的,我們就拿最典型的offer
和take
來(lái)看:
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
offer
操作平平無(wú)奇,甚至直接調(diào)用到了優(yōu)先隊(duì)列的offer來(lái)將隊(duì)列根據(jù)延時(shí)進(jìn)行排序,只不過(guò)加了個(gè)鎖,做了些數(shù)據(jù)的調(diào)整,沒(méi)有什么深入的地方,但是take
的實(shí)現(xiàn)看上去就很復(fù)雜了。(注意,Dalaye
d繼承了Comparable
方法,所以是可以直接用優(yōu)先隊(duì)列來(lái)排序的,只要你自己實(shí)現(xiàn)了compareTo
方法)我嘗試加了些注釋讓各位看得更明白些:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 自選操作 for (;;) { // 獲取隊(duì)列第一個(gè)元素,如果隊(duì)列為空 // 阻塞住直到有新元素加入隊(duì)列,offer等方法調(diào)用signal喚醒線程 E first = q.peek(); if (first == null) available.await(); else { // 如果隊(duì)列中有元素 long delay = first.getDelay(NANOSECONDS); // 判斷延時(shí)時(shí)間,如果到時(shí)間了,直接取出數(shù)據(jù)并return if (delay <= 0) return q.poll(); first = null; // 如果leader為空則阻塞 if (leader != null) available.await(); else { // 獲取當(dāng)前線程 Thread thisThread = Thread.currentThread(); // 設(shè)置leader為當(dāng)前線程 leader = thisThread; try { // 阻塞延時(shí)時(shí)間 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
我們可以看到take
的實(shí)現(xiàn)依靠了無(wú)限自旋,直到第一個(gè)隊(duì)列元素過(guò)了超時(shí)時(shí)間后才會(huì)返回,否則等待他的只有被阻塞。
DelayQueue實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)缺點(diǎn)
看了源碼后,我們應(yīng)該對(duì)DelayQueue
的實(shí)現(xiàn)有了一個(gè)大致的了解,也對(duì)他的優(yōu)缺點(diǎn)有了一定的理解。他的優(yōu)點(diǎn)很明顯:
- java原生支持,不需要引入第三方工具
- 線程安全,即插即用使用方便
但是他的缺點(diǎn)也是很明顯的:
- 不支持分布式,并且數(shù)據(jù)放在內(nèi)存中,沒(méi)有持久化的支持,服務(wù)宕機(jī)會(huì)丟失數(shù)據(jù)
- 插入時(shí)使用的是優(yōu)先隊(duì)列的排序,時(shí)間復(fù)雜度較高,并且對(duì)于隊(duì)列中的任務(wù)不能很好的管理
所以有沒(méi)有更好的延時(shí)隊(duì)列的實(shí)現(xiàn)呢,我們繼續(xù)看下去~
時(shí)間輪算法
時(shí)間輪算法是一個(gè)被設(shè)計(jì)出來(lái)處理延時(shí)任務(wù)的算法,現(xiàn)實(shí)中的應(yīng)用可以在kafka
以及netty
等項(xiàng)目中找到類似的實(shí)現(xiàn)。
時(shí)間輪的具體實(shí)現(xiàn)
所謂時(shí)間輪,顧名思義,他是一個(gè)類似于時(shí)鐘的結(jié)構(gòu),即他的主結(jié)構(gòu)是一個(gè)環(huán)形數(shù)組,如圖:
環(huán)形數(shù)組中存放的是一個(gè)一個(gè)的鏈表,鏈表中存放著需要執(zhí)行的任務(wù),我們?cè)O(shè)定好數(shù)組中執(zhí)行的間隔,假設(shè)我們的環(huán)形數(shù)組的長(zhǎng)度是60,每個(gè)數(shù)組的執(zhí)行間隔為1s,那么我們會(huì)在每過(guò)1s就會(huì)執(zhí)行數(shù)組下一個(gè)元素中的鏈表中的元素。如果只是這樣,那么我們將無(wú)法處理60秒之外的延時(shí)任務(wù),這顯然不合適,所以我們會(huì)在每個(gè)任務(wù)中加上一個(gè)參數(shù)圈數(shù),來(lái)表明任務(wù)會(huì)在幾圈后執(zhí)行。假如我們有一個(gè)任務(wù)是在150s后執(zhí)行,那么他應(yīng)該在30s的位置,同時(shí)圈數(shù)應(yīng)該為2。我們每次執(zhí)行一個(gè)鏈表中的任務(wù)的時(shí)候會(huì)把當(dāng)圈需要執(zhí)行的任務(wù)取出執(zhí)行,然后把他從鏈表中刪除,如果任務(wù)不是當(dāng)圈執(zhí)行,則修改他的圈數(shù),將圈數(shù)減1,于是一個(gè)簡(jiǎn)單的時(shí)間輪出爐了。
那么這樣的時(shí)間輪有什么優(yōu)缺點(diǎn)呢?
先來(lái)說(shuō)優(yōu)點(diǎn)吧:
- 相比
DelayQueue
來(lái)說(shuō),時(shí)間輪的插入更加的高效,時(shí)間復(fù)雜度為O(1) - 實(shí)現(xiàn)簡(jiǎn)單清晰,任務(wù)調(diào)度更加方便合理
當(dāng)然他的缺點(diǎn)也不少:
- 他和
DelayQueue
一樣不支持分布式,并且數(shù)據(jù)放在內(nèi)存中,沒(méi)有持久化的支持,服務(wù)宕機(jī)會(huì)丟失數(shù)據(jù) - 數(shù)組間的間隔設(shè)置會(huì)影響任務(wù)的精度
- 由于不同圈數(shù)的任務(wù)會(huì)在同一個(gè)鏈表中,執(zhí)行到每個(gè)數(shù)組元素時(shí)需要遍歷所有的鏈表數(shù)據(jù),效率會(huì)很低
進(jìn)階優(yōu)化版時(shí)間輪算法
剛才提到了一些時(shí)間輪算法的缺點(diǎn),那么是不是有一些方法來(lái)進(jìn)行下優(yōu)化?這里我來(lái)介紹一下時(shí)間輪的優(yōu)化版本。
之前我們提到不同圈數(shù)的任務(wù)會(huì)在同一個(gè)鏈表中被重復(fù)遍歷影響效率,這種情況下我們可以進(jìn)行如下優(yōu)化:將時(shí)間輪進(jìn)行分層
我們可以看到圖中,我們采用了多層級(jí)的設(shè)計(jì),上圖中分了三層,每層都是60格,第一個(gè)輪盤中的間隔為1小時(shí),我們的數(shù)據(jù)每一次都是插入到這個(gè)輪盤中,每當(dāng)這個(gè)輪盤經(jīng)過(guò)一個(gè)小時(shí)后來(lái)到下一個(gè)刻度,就會(huì)取出其中的所有元素,按照延遲時(shí)間放入到第二個(gè)象征著分鐘的輪盤中,以此類推。
這樣的實(shí)現(xiàn)好處可以說(shuō)是顯而易見的:
- 首先避免了當(dāng)時(shí)間跨度較大時(shí)空間的浪費(fèi)
- 每一次到達(dá)刻度的時(shí)候我們不用再像以前那樣遍歷鏈表取出需要的數(shù)據(jù),而是可以一次性全部拿出來(lái),大大節(jié)約了操作的時(shí)間
時(shí)間輪算法的應(yīng)用
時(shí)間輪算法可能在之前大家沒(méi)有聽說(shuō)過(guò),但是他在各個(gè)地方都有著不小的作用。linux的定時(shí)器的實(shí)現(xiàn)中就有時(shí)間輪的身影,同樣如果你是一個(gè)喜好看源碼的讀者,你也可能會(huì)在kafka
以及netty
中找到他的實(shí)現(xiàn)。
kafka
kafka中應(yīng)用了時(shí)間輪算法,他的實(shí)現(xiàn)和之前提到的進(jìn)階版時(shí)間輪沒(méi)有太大的區(qū)別,只有在一點(diǎn)上:kafka
內(nèi)部實(shí)現(xiàn)的時(shí)間輪應(yīng)用到了DelayQueue
。
@nonthreadsafe private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { private[this] val interval = tickMs * wheelSize private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } private[this] var currentTime = startMs - (startMs % tickMs) @volatile private[this] var overflowWheel: TimingWheel = null private[this] def addOverflowWheel(): Unit = { synchronized { if (overflowWheel == null) { overflowWheel = new TimingWheel( tickMs = interval, wheelSize = wheelSize, startMs = currentTime, taskCounter = taskCounter, queue ) } } } def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { false } else if (expiration < currentTime + tickMs) { false } else if (expiration < currentTime + interval) { val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) if (bucket.setExpiration(virtualId * tickMs)) { queue.offer(bucket) } true } else { if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) } } def advanceClock(timeMs: Long): Unit = { if (timeMs >= currentTime + tickMs) { currentTime = timeMs - (timeMs % tickMs) if (overflowWheel != null) overflowWheel.advanceClock(currentTime) } } }
上面是kafka內(nèi)部的實(shí)現(xiàn)(使用的語(yǔ)言是scala),我們可以看到實(shí)現(xiàn)非常的簡(jiǎn)潔,并且使用到了DelayQueue
。我們剛才已經(jīng)討論過(guò)了DelayQueue
的優(yōu)缺點(diǎn),查看源碼后我們已經(jīng)可以有一個(gè)大致的結(jié)論了:DelayQueue
在kafka的時(shí)間輪中的作用是負(fù)責(zé)推進(jìn)任務(wù)的,為的就是防止在時(shí)間輪中由于任務(wù)比較稀疏而造成的"空推進(jìn)"。DelayQueue
的觸發(fā)機(jī)制可以很好的避免這一點(diǎn),同時(shí)由于DelayQueue
的插入效率較低,所以僅用于底層的推進(jìn),任務(wù)的插入由時(shí)間輪來(lái)操作,兩者配置,可以實(shí)現(xiàn)效率和資源的平衡。
netty
netty
的內(nèi)部也有時(shí)間輪的實(shí)現(xiàn)HashedWheelTimer
HashedWheelTimer
的實(shí)現(xiàn)要比kafka內(nèi)部的實(shí)現(xiàn)復(fù)雜許多,和kafka不同的是,它的內(nèi)部推進(jìn)不是依靠的DelayQueue
而是自己實(shí)現(xiàn)了一套,源碼太長(zhǎng),有興趣的讀者可以自己去看一下。
小結(jié)
時(shí)間輪說(shuō)了這么多,我們可以看到他的效率是很出眾的,但是還是有這么一個(gè)問(wèn)題:他不支持分布式。當(dāng)我們的業(yè)務(wù)很復(fù)雜,需要分布式的時(shí)候,時(shí)間輪顯得力不從心,那么這個(gè)時(shí)候有什么好一點(diǎn)的延時(shí)隊(duì)列的選擇呢?我們或許可以嘗試使用第三方的工具
redis延時(shí)隊(duì)列
其實(shí)啊說(shuō)起延時(shí),我們?nèi)绻S?code>redis的話,就會(huì)想起redis
是存在過(guò)期機(jī)制的,那么我們是否可以利用這個(gè)機(jī)制來(lái)實(shí)現(xiàn)一個(gè)延時(shí)隊(duì)列呢?
redis
自帶key的過(guò)期機(jī)制,而且可以設(shè)置過(guò)期后的回調(diào)方法。基于此特性,我們可以非常容易就完成一個(gè)延時(shí)隊(duì)列,任務(wù)進(jìn)來(lái)時(shí),設(shè)定定時(shí)時(shí)間,并且配置好過(guò)期回調(diào)方法即可。
除了使用redis
的過(guò)期機(jī)制之外,我們也可以利用它自帶的zset
來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列。zset
支持高性能的排序,因此我們?nèi)蝿?wù)進(jìn)來(lái)時(shí)可以將時(shí)間戳作為排序的依據(jù),以此將任務(wù)的執(zhí)行先后進(jìn)行有序的排列,這樣也能實(shí)現(xiàn)延時(shí)隊(duì)列。
zset
實(shí)現(xiàn)延時(shí)隊(duì)列的好處:
- 支持高性能排序
redis
本身的高可用和高性能以及持久性
mq延時(shí)隊(duì)列
rocketmq延時(shí)消息
rocketmq
天然支持延時(shí)消息,他的延時(shí)消息分為18個(gè)等級(jí),每個(gè)等級(jí)對(duì)應(yīng)不同的延時(shí)時(shí)間。
那么他的原理是怎樣的呢?
rocketmq
的broker
收到消息后會(huì)將消息寫入commitlog
,并且判斷這個(gè)消息是否是延時(shí)消息(即delay屬性是否大于0),之后如果判斷確實(shí)是延時(shí)消息,那么他不會(huì)馬上寫入,而是通過(guò)轉(zhuǎn)發(fā)的方式將消息放入對(duì)應(yīng)的延時(shí)topic
(18個(gè)延時(shí)級(jí)別對(duì)應(yīng)18個(gè)topic
)
rocketmq
會(huì)有一個(gè)定時(shí)任務(wù)進(jìn)行輪詢,如果任務(wù)的延遲時(shí)間已經(jīng)到了就發(fā)往指定的topic
。
這個(gè)設(shè)計(jì)比較的簡(jiǎn)單粗暴,但是缺點(diǎn)也十分明顯:
- 延時(shí)是固定的,如果想要的延遲超出18個(gè)級(jí)別就沒(méi)辦法實(shí)現(xiàn)
- 無(wú)法實(shí)現(xiàn)精準(zhǔn)延時(shí),隊(duì)列的堆積等等情況也會(huì)導(dǎo)致執(zhí)行產(chǎn)生誤差
rocketmq的精準(zhǔn)延時(shí)消息
rocketmq
本身是不支持的精確延遲的,他的商業(yè)版本ons
倒是支持。不過(guò)rocketmq
的社區(qū)中有相應(yīng)的解決方案。方案是借助于時(shí)間輪算法來(lái)實(shí)現(xiàn)的,感興趣的朋友可以自行去社區(qū)查看。(社區(qū)中的一些未被合并的pr是不錯(cuò)的實(shí)現(xiàn)參考)
總結(jié)
延時(shí)隊(duì)列的實(shí)現(xiàn)千千萬(wàn),但是如果要在生產(chǎn)中大規(guī)模使用,那么大部分情況下其實(shí)都避不開時(shí)間輪算法。改進(jìn)過(guò)的時(shí)間輪算法可以做到精準(zhǔn)延時(shí),持久化,高性能,高可用性,可謂是完美。但是話又說(shuō)回來(lái),其他的延時(shí)方式就無(wú)用了嗎?其實(shí)不是的,所有的方式都是需要匹配自己的使用場(chǎng)景。如果你是極少量數(shù)據(jù)的輪詢,那么定時(shí)輪詢數(shù)據(jù)庫(kù)或許才是最佳的解決方案,而不是無(wú)腦的引入復(fù)雜的延時(shí)隊(duì)列。如果是單機(jī)的任務(wù),那么jdk的延時(shí)隊(duì)列也是不錯(cuò)的選擇。
本文介紹的這些延時(shí)隊(duì)列只是為了向大家展示他們的原理和優(yōu)缺點(diǎn),具體的使用還需要結(jié)合自己業(yè)務(wù)的場(chǎng)景。
以上就是一文帶你深入了解Java中延時(shí)任務(wù)的實(shí)現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于Java延時(shí)任務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java?延時(shí)隊(duì)列及簡(jiǎn)單使用方式詳解
- Java延時(shí)的3種實(shí)現(xiàn)方法舉例
- 盤點(diǎn)Java中延時(shí)任務(wù)的多種實(shí)現(xiàn)方式
- Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的場(chǎng)景
- Java處理延時(shí)任務(wù)的常用幾種解決方案
- 詳解Java中的延時(shí)隊(duì)列 DelayQueue
- 一口氣說(shuō)出Java 6種延時(shí)隊(duì)列的實(shí)現(xiàn)方法(面試官也得服)
- Java延時(shí)執(zhí)行的三種實(shí)現(xiàn)方式
相關(guān)文章
Java實(shí)戰(zhàn)項(xiàng)目 健身管理系統(tǒng)
本文是一個(gè)Java語(yǔ)言編寫的實(shí)戰(zhàn)項(xiàng)目,是一個(gè)健身管理系統(tǒng),主要用到了ssm+springboot等技術(shù),技術(shù)含量筆記高,感興趣的童鞋跟著小編往下看吧2021-09-09Java實(shí)現(xiàn)LeetCode(組合總和)
這篇文章主要介紹了Java實(shí)現(xiàn)LeetCode(組合總數(shù)),本文通過(guò)使用java實(shí)現(xiàn)leetcode的組合總數(shù)題目和實(shí)現(xiàn)思路分析,需要的朋友可以參考下2021-06-06java servlet手機(jī)app訪問(wèn)接口(三)高德地圖云存儲(chǔ)及檢索
這篇文章主要為大家詳細(xì)介紹了java servlet手機(jī)app訪問(wèn)接口(三),高德地圖云存儲(chǔ)及檢索,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-12-12Java 實(shí)戰(zhàn)范例之精美網(wǎng)上音樂(lè)平臺(tái)的實(shí)現(xiàn)
讀萬(wàn)卷書不如行萬(wàn)里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+vue+Springboot+ssm+mysql+maven+redis實(shí)現(xiàn)一個(gè)前后端分離的精美網(wǎng)上音樂(lè)平臺(tái),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11SpringBoot使用Feign調(diào)用其他服務(wù)接口
這篇文章主要介紹了SpringBoot使用Feign調(diào)用其他服務(wù)接口,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03