Netty源碼分析NioEventLoop執(zhí)行select操作入口
分析完了selector的創(chuàng)建和優(yōu)化的過程, 這一小節(jié)分析select相關(guān)操作
select操作的入口
NioEventLoop的run方法
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //輪詢io事件(1) select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默認是50 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //記錄下開始時間 final long ioStartTime = System.nanoTime(); try { //處理輪詢到的key(2) processSelectedKeys(); } finally { //計算耗時 final long ioTime = System.nanoTime() - ioStartTime; //執(zhí)行task(3) runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代碼省略 } }
代碼比較長, 其實主要分為三部分:
1. 輪詢io事件
2. 處理輪詢到的key
3. 執(zhí)行task
這一小節(jié), 主要剖析第一部分
輪詢io事件
首先switch塊中默認會走到SelectStrategy.SELECT中, 執(zhí)行select(wakenUp.getAndSet(false))方法
參數(shù)wakenUp.getAndSet(false)代表當前select操作是未喚醒狀態(tài)
進入到select(wakenUp.getAndSet(false))方法中
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; //當前系統(tǒng)的納秒數(shù) long currentTimeNanos = System.nanoTime(); //截止時間=當前時間+隊列第一個任務剩余執(zhí)行時間 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //阻塞時間(毫秒)=(截止時間-當前時間+0.5毫秒) long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //進行阻塞式的select操作 int selectedKeys = selector.select(timeoutMillis); //輪詢次數(shù) selectCnt ++; //如果輪詢到一個事件(selectedKeys != 0), 或者當前select操作需要喚醒(oldWakenUp), //或者在執(zhí)行select操作時已經(jīng)被外部線程喚醒(wakenUp.get()), //或者任務隊列已經(jīng)有任務(hasTask), 或者定時任務隊列中有任務(hasScheduledTasks()) if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } //省略 //記錄下當前時間 long time = System.nanoTime(); //當前時間-開始時間>=超時時間(條件成立, 執(zhí)行過一次select操作, 條件不成立, 有可能發(fā)生空輪詢) if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //代表已經(jīng)進行了一次阻塞式select操作, 操作次數(shù)重置為1 selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //省略日志代碼 //如果空輪詢的次數(shù)大于一個閾值(512), 解決空輪詢的bug rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //代碼省略 } catch (CancelledKeyException e) { //省略 } }
首先通過 long currentTimeNanos = System.nanoTime() 獲取系統(tǒng)的納秒數(shù)
繼續(xù)往下看:
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
delayNanos(currentTimeNanos)代表距定時任務中第一個任務剩余多長時間, 這個時間+當前時間代表這次操作不能超過的時間, 因為超過之后定時任務不能嚴格按照預定時間執(zhí)行, 其中定時任務隊列是已經(jīng)按照執(zhí)行時間有小到大排列好的隊列, 所以第一個任務則是最近需要執(zhí)行的任務, selectDeadLineNanos就代表了當前操作不能超過的時間
然后就進入到了無限for循環(huán)
for循環(huán)中我們關(guān)注:
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L
selectDeadLineNanos - currentTimeNanos+500000L 代表截止時間-當前時間+0.5毫秒的調(diào)整時間, 除以1000000表示將計算的時間轉(zhuǎn)化為毫秒數(shù)
最后算出的時間就是selector操作的阻塞時間, 并賦值到局部變量的timeoutMillis中
后面有個判斷 if(imeoutMillis<0) , 代表當前時間已經(jīng)超過了最后截止時間+0.5毫秒, selectCnt == 0 代表沒有進行select操作, 滿足這兩個條件, 則執(zhí)行selectNow()之后, 將selectCnt賦值為1之后跳出循環(huán)
如果沒超過截止時間, 就進行了 if(hasTasks() && wakenUp.compareAndSet(false, true)) 判斷
這里我們關(guān)注hasTasks()方法, 這里是判斷當前NioEventLoop所綁定的taskQueue是否有任務, 如果有任務, 則執(zhí)行selectNow()之后, 將selectCnt賦值為1之后跳出循環(huán)(跳出循環(huán)之后去執(zhí)行任務隊列中的任務)
hasTasks()方法可以自己跟一下, 非常簡單
如果沒有滿足上述條件, 就會執(zhí)行 int selectedKeys = selector.select(timeoutMillis) 進行阻塞式輪詢, 并且自增輪詢次數(shù), 而后會進行如下判斷:
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; }
selectedKeys != 0代表已經(jīng)有輪詢到的事件, oldWakenUp代表當前select操作是否需要喚醒, wakenUp.get()說明已經(jīng)被外部線程喚醒, hasTasks()代表任務隊列是否有任務, hasScheduledTasks()代表定時任務隊列是否任務, 滿足條件之一, 就跳出循環(huán)
long time = System.nanoTime() 記錄了當前的時間, 之后有個判斷:
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)
這里的意思是當前時間-阻塞時間>方法開始執(zhí)行的時間, 這里說明已經(jīng)完整的執(zhí)行完成了一個阻塞的select()操作, 將selectCnt設(shè)置成1
如果此條件不成立, 說明沒有完整執(zhí)行select()操作, 可能觸發(fā)了一次空輪詢, 根據(jù)前一個selectCnt++這步我們知道, 每觸發(fā)一次空輪詢selectCnt都會自增
之后會進入第二個判斷
SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD
其中SELECTOR_AUTO_REBUILD_THRESHOLD默認是512, 這個判斷意思就是空輪詢的次數(shù)如果超過512次, 則會認為是發(fā)生了epoll bug, 這樣會通過rebuildSelector()方法重新構(gòu)建selector, 然后將重新構(gòu)建的selector賦值到局部變量selector, 執(zhí)行一次selectNow(), 將selectCnt初始化1, 跳出循環(huán)
rebuildSelector()方法
rebuildSelector()方法中, 看netty是如何解決epoll bug的
public void rebuildSelector() { //是否是由其他線程發(fā)起的 if (!inEventLoop()) { //如果是其他線程發(fā)起的, 將rebuildSelector()封裝成任務隊列, 由NioEventLoop進行調(diào)用 execute(new Runnable() { @Override public void run() { rebuildSelector(); } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { //重新創(chuàng)建一個select newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } int nChannels = 0; for (;;) { try { //拿到舊select中所有的key for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { Object a = key.attachment(); //代碼省略 //獲取key注冊的事件 int interestOps = key.interestOps(); //將key注冊的事件取消 key.cancel(); //注冊到重新創(chuàng)建的新的selector中 SelectionKey newKey = key.channel().register(newSelector, interestOps, a); //如果channel是NioChannel if (a instanceof AbstractNioChannel) { //重新賦值 ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { //代碼省略 } } } catch (ConcurrentModificationException e) { continue; } break; } selector = newSelector; //代碼省略 }
首先會判斷是不是當前NioEventLoop線程執(zhí)行的, 如果不是, 則將構(gòu)建方法封裝成task由當前NioEventLoop執(zhí)行
final Selector oldSelector = selector 表示拿到舊的selector
然后通過 newSelector = openSelector() 創(chuàng)建新的selector
通過for循環(huán)遍歷所有注冊在selector中的key
Object a = key.attachment() 是獲取channel, 第一章講過, 在注冊時, 將自身作為屬性綁定在key上
for循環(huán)體中, 通過 int interestOps = key.interestOps() 獲取其注冊的事件
key.cancel()將注冊的事件進行取消
SelectionKey newKey = key.channel().register(newSelector, interestOps, a)
將channel以及注冊的事件注冊在新的selector中
if (a instanceof AbstractNioChannel) 判斷是不是NioChannel
如果是NioChannel, 則通過 ((AbstractNioChannel) a).selectionKey = newKey 將自身的屬性selectionKey賦值為新返回的key
selector = newSelector 將自身NioEventLoop屬性selector賦值為新創(chuàng)建的newSelector
至此, 就是netty解決epoll bug的步驟, 其實就是創(chuàng)建一個新的selector, 將舊selector中注冊的channel和事件重新注冊到新的selector中, 然后將自身selector屬性替換成新創(chuàng)建的selector
以上就是Netty源碼分析NioEventLoop執(zhí)行select操作入口的詳細內(nèi)容,更多關(guān)于Netty分布式NioEventLoop selector操作的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot框架中如何整合mybatis框架思路詳解
這篇文章主要介紹了springboot框架中如何整合mybatis框架,本文通過示例圖文相結(jié)合給大家介紹的非常詳細,需要的朋友可以參考下2022-12-12mybatis 通過攔截器打印完整的sql語句以及執(zhí)行結(jié)果操作
這篇文章主要介紹了mybatis 通過攔截器打印完整的sql語句以及執(zhí)行結(jié)果操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10Java代理模式實例詳解【靜態(tài)代理與動態(tài)代理】
這篇文章主要介紹了Java代理模式,結(jié)合實例形式詳細分析了java靜態(tài)代理與動態(tài)代理模式相關(guān)概念、原理、操作技巧與注意事項,需要的朋友可以參考下2019-09-09基于CyclicBarrier和CountDownLatch的使用區(qū)別說明
這篇文章主要介紹了基于CyclicBarrier和CountDownLatch的使用區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09Spring Boot整合FTPClient線程池的實現(xiàn)示例
這篇文章主要介紹了Spring Boot整合FTPClient線程池的實現(xiàn)示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-12-12Java數(shù)據(jù)庫連接PreparedStatement的使用詳解
這篇文章主要介紹了Java數(shù)據(jù)庫連接PreparedStatement的使用詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-08-08