Java利用Netty時(shí)間輪實(shí)現(xiàn)延時(shí)任務(wù)
一、時(shí)間輪算法簡(jiǎn)介
為了大家能夠理解下文中的代碼,我們先來簡(jiǎn)單了解一下netty時(shí)間輪算法的核心原理
時(shí)間輪算法名副其實(shí),時(shí)間輪就是一個(gè)環(huán)形的數(shù)據(jù)結(jié)構(gòu),類似于表盤,將時(shí)間輪分成多個(gè)bucket(比如:0-8)。假設(shè)每個(gè)時(shí)間輪輪片的分隔時(shí)間段tickDuration=1s(即:指針經(jīng)過每個(gè)格子花費(fèi)時(shí)間是 1 s),當(dāng)前的時(shí)間bucket=3,那么在18秒后需要被執(zhí)行的任務(wù)需要落到((3+18)%8=5取余運(yùn)算)的5號(hào)bucket上。假如有多個(gè)需要在該時(shí)間段內(nèi)執(zhí)行的任務(wù),就會(huì)組成一個(gè)雙向鏈表。另外針對(duì)時(shí)間輪我們要有下面的幾個(gè)認(rèn)知:
時(shí)間輪指針是一個(gè)Worker線程,在時(shí)間輪整點(diǎn)的時(shí)候執(zhí)行雙向鏈表中的任務(wù)。
時(shí)間輪算法的并不是精準(zhǔn)的延時(shí),它的執(zhí)行精度取決于每個(gè)時(shí)間輪輪片的分隔時(shí)間段tickDuration
Worker線程是單線程,一個(gè)bucket、一個(gè)bucket的順序處理任務(wù)。「所以我們的延時(shí)任務(wù)一定要做成異步任務(wù),否則會(huì)影響時(shí)間輪后續(xù)任務(wù)的執(zhí)行時(shí)間?!?/strong>
二、時(shí)間輪hello-world
實(shí)現(xiàn)一個(gè)延時(shí)任務(wù)的例子,需求仍然十分的簡(jiǎn)單:你買了一張火車票,必須在30分鐘之內(nèi)付款,否則該訂單被自動(dòng)取消。「訂單30分鐘不付款自動(dòng)取消,這個(gè)任務(wù)就是一個(gè)延時(shí)任務(wù)。」 我們的火車票訂單取消任務(wù),從需求上看并不需要非常精準(zhǔn)的延時(shí),所以是可以使用時(shí)間輪算法來完成這個(gè)任務(wù)的。
首先通過maven坐標(biāo)引入netty
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.45.Final</version> </dependency>
然后我們創(chuàng)建一個(gè)時(shí)間輪,如果是Spring的開發(fā)環(huán)境,我們可以這么做。下文中我們new了一個(gè)包含512個(gè)bucket的時(shí)間輪,每個(gè)時(shí)間輪的輪片時(shí)間間隔是100毫秒。
@Bean("hashedWheelTimer") public HashedWheelTimer hashedWheelTimer(){ return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); }
舉例:當(dāng)用戶買火車票下單的時(shí)候,向時(shí)間輪中添加一個(gè)30分鐘的延時(shí)任務(wù)。延時(shí)任務(wù)將在30分鐘之后被執(zhí)行,下文的lambda表達(dá)式部分實(shí)現(xiàn)了一個(gè)TimerTask(task)延時(shí)任務(wù)。這個(gè)延時(shí)任務(wù)的函數(shù)體內(nèi),請(qǐng)一定使用異步任務(wù),即:?jiǎn)为?dú)起一個(gè)線程或者使用SpringBoot異步任務(wù)線程池。因?yàn)閃orker線程是單線程的,你的任務(wù)處理時(shí)間長(zhǎng)于tickDuration會(huì)妨礙后續(xù)時(shí)間輪輪片上的任務(wù)的執(zhí)行。
//訂單下單操作 void order(String orderInfo) { //下單的時(shí)候,向時(shí)間輪中添加一個(gè)30分鐘的延時(shí)任務(wù) hashedWheelTimer.newTimeout(task -> { //注意這里使用異步任務(wù)線程池或者開啟線程進(jìn)行訂單取消任務(wù)的處理 cancelOrder(orderInfo); }, 30, TimeUnit.MINUTES); }
三、異步任務(wù)線程池
我們?cè)谏衔闹幸呀?jīng)多次強(qiáng)調(diào),時(shí)間輪的任務(wù)TimerTask的執(zhí)行內(nèi)容要做成異步的。最簡(jiǎn)單的做法就是接到一個(gè)任務(wù)之后啟動(dòng)一個(gè)線程處理該任務(wù)。在Spring環(huán)境下其實(shí)我們有更好的選擇,就是使用Spring的線程池,這個(gè)線程池是可以自定義的。比如:下文中的用法是我事先定義了一個(gè)名字為test的線程池,然后通過@Async使用即可。
@Async("test") public void cancelOrder(String orderInfo){ //查詢訂單支付信息,如果用戶未支付,關(guān)閉訂單 }
可能有的朋友,還不知道該如何自定義一個(gè)Spring線程池,可以參考:我之前寫過一個(gè)SpringBoot的**「可觀測(cè)、易配置」**的線程池開源項(xiàng)目,源代碼地址:https://gitee.com/hanxt/zimug-monitor-threadpool。我的這個(gè)zimug-monitor-threadpool開源項(xiàng)目,可以做到對(duì)線程池使用情況的監(jiān)控,我自己平時(shí)用的效果還不錯(cuò),向大家推薦一下!
四、時(shí)間輪優(yōu)缺點(diǎn)
時(shí)間輪算法實(shí)現(xiàn)延時(shí)任務(wù)的優(yōu)點(diǎn)就是,相對(duì)于使用JDK的DelayQueue,其算法上具有優(yōu)勢(shì),執(zhí)行性能相對(duì)好一些。其缺點(diǎn)就是所有的延時(shí)任務(wù)以及延時(shí)觸發(fā)的管理,都是在單個(gè)應(yīng)用服務(wù)的內(nèi)存中進(jìn)行的,一旦該應(yīng)用服務(wù)發(fā)生故障重啟服務(wù),時(shí)間輪任務(wù)數(shù)據(jù)將全部丟失。這一缺點(diǎn)和DelayQueue是一樣的。為了解決這個(gè)問題,我們可以使用redis、RocketMQ等分布式中間件來管理延時(shí)任務(wù)消息的方式來實(shí)現(xiàn)延時(shí)任務(wù),這個(gè)我會(huì)在后續(xù)的文章中為大家介紹。
知識(shí)點(diǎn)補(bǔ)充
下面主要和大家一起來分析下Netty時(shí)間輪調(diào)度算法的原理
時(shí)間輪狀態(tài)
時(shí)間輪有以下三種狀態(tài):
- WORKER_STATE_INIT:初始化狀態(tài),此時(shí)時(shí)間輪內(nèi)的工作線程還沒有開啟
- WORKER_STATE_STARTED:運(yùn)行狀態(tài),時(shí)間輪內(nèi)的工作線程已經(jīng)開啟
- WORKER_STATE_SHUTDOWN:終止?fàn)顟B(tài),時(shí)間輪停止工作
狀態(tài)轉(zhuǎn)換如下:
構(gòu)造函數(shù)
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // 初始化時(shí)間輪數(shù)組,時(shí)間輪大小為大于等于 ticksPerWheel 的第一個(gè) 2 的冪,和 HashMap 類似 wheel = createWheel(ticksPerWheel); // 取模用,用來定位數(shù)組中的槽 mask = wheel.length - 1; // 為了保證精度,時(shí)間輪內(nèi)的時(shí)間單位為納秒 long duration = unit.toNanos(tickDuration); // 時(shí)間輪內(nèi)的時(shí)鐘撥動(dòng)頻率不宜太大也不宜太小 if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } // 創(chuàng)建工作線程 workerThread = threadFactory.newThread(worker); // 非守護(hù)線程且 leakDetection 為 true 時(shí)檢測(cè)內(nèi)存是否泄漏 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // 初始化最大等待任務(wù)數(shù) this.maxPendingTimeouts = maxPendingTimeouts; // 如果創(chuàng)建的時(shí)間輪實(shí)例大于 64,打印日志,并且這個(gè)日志只會(huì)打印一次 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
構(gòu)造函數(shù)中的參數(shù)相當(dāng)重要,當(dāng)自定義時(shí)間輪時(shí),我們應(yīng)該根據(jù)業(yè)務(wù)的范圍設(shè)置合理的參數(shù):
- threadFactory:創(chuàng)建時(shí)間輪任務(wù)線程的工廠,通過這個(gè)工廠可以給我們的線程自定義一些屬性(線程名、異常處理等)
- tickDuration:時(shí)鐘多長(zhǎng)時(shí)間撥動(dòng)一次,值越小,時(shí)間輪精度越高
- unit:
tickDuration
的單位 - ticksPerWheel:時(shí)間輪數(shù)組大小
- leakDetection:是否檢測(cè)內(nèi)存泄漏
- maxPendingTimeouts:時(shí)間輪內(nèi)最大等待的任務(wù)數(shù)
時(shí)間輪的時(shí)鐘撥動(dòng)時(shí)長(zhǎng)應(yīng)該根據(jù)業(yè)務(wù)設(shè)置恰當(dāng)?shù)闹?,如果設(shè)置的過大,可能導(dǎo)致任務(wù)觸發(fā)時(shí)間不準(zhǔn)確。如果設(shè)置的過小,時(shí)間輪轉(zhuǎn)動(dòng)頻繁,任務(wù)少的情況下加載不到任務(wù),屬于一直空轉(zhuǎn)的狀態(tài),會(huì)占用 CPU 線程資源。
為了防止時(shí)間輪占用過多的 CPU 資源,當(dāng)創(chuàng)建的時(shí)間輪對(duì)象大于 64 時(shí)會(huì)以日志的方式提示。
構(gòu)造函數(shù)中只是初始化了輪線程,并沒有開啟,當(dāng)?shù)谝淮瓮鶗r(shí)間輪內(nèi)添加任務(wù)時(shí),線程才會(huì)開啟。
到此這篇關(guān)于Java利用Netty時(shí)間輪實(shí)現(xiàn)延時(shí)任務(wù)的文章就介紹到這了,更多相關(guān)Java Netty時(shí)間輪內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot集成Mybatis-plus并實(shí)現(xiàn)自動(dòng)生成相關(guān)文件的示例代碼
Mybatis-Plus是一個(gè)優(yōu)秀的Mybatis增強(qiáng)工具,目前更新到3.1.1,本文通過示例代碼給大家介紹SpringBoot集成Mybatis-plus并實(shí)現(xiàn)自動(dòng)生成相關(guān)文件的問題,感興趣的朋友跟隨小編一起看看吧2021-12-12java?Web實(shí)現(xiàn)用戶登錄功能圖文教程
這篇文章主要給大家介紹了關(guān)于java?Web實(shí)現(xiàn)用戶登錄功能的相關(guān)資料,在開發(fā)Web應(yīng)用程序中,用戶登錄是一個(gè)常見的功能,文中通過代碼以及圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-10-10Maven項(xiàng)目分析剔除無用jar引用的方法步驟
這篇文章主要介紹了Maven項(xiàng)目分析剔除無用jar引用的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10java加載properties文件的六種方法總結(jié)
這篇文章主要介紹了java加載properties文件的六種方法總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-05-05深入淺出的學(xué)習(xí)Java ThreadLocal
本文會(huì)基于實(shí)際場(chǎng)景介紹ThreadLocal如何使用以及內(nèi)部實(shí)現(xiàn)機(jī)制。 具有很好的參考價(jià)值,下面跟著小編一起來看下吧2017-02-02