springboot內(nèi)置tomcat之NIO處理流程一覽
前言
springboot內(nèi)置的tomcat目前默認(rèn)是基于NIO來(lái)實(shí)現(xiàn)的,本文介紹下tomcat接受請(qǐng)求的一些組件及組件之間的關(guān)聯(lián)
tomcat組件
本文只介紹NIO中tomcat的組件
我們直接看NIO的核心類(lèi)NioEndpoint的startInternal方法
Acceptor組件
public void startInternal() throws Exception { if (!running) { running = true; paused = false; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create worker collection if ( getExecutor() == null ) { createExecutor(); } initializeConnectionLatch(); // Start poller threads // 核心代碼1 pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } // 核心代碼2 startAcceptorThreads(); } }
看核心代碼1的位置構(gòu)造了一個(gè)Poller數(shù)組,Poller是一個(gè)實(shí)現(xiàn)了Runnable的類(lèi),并且啟動(dòng)了該線程類(lèi),
getPollerThreadCount()方法返回了2和當(dāng)前物理機(jī)CPU內(nèi)核數(shù)的最小值,即創(chuàng)建的數(shù)組最大值為2
接下來(lái)看核心代碼2startAcceptorThreads()
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor<U> acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
創(chuàng)建了多個(gè)Acceptor類(lèi),Acceptor也是實(shí)現(xiàn)了Runnable的線程類(lèi),創(chuàng)建個(gè)數(shù)默認(rèn)是1
然后我們看下Acceptor啟動(dòng)后做了什么,我們直接看run方法
public void run() { ...... U socket = null; try { // Accept the next incoming connection from the server // socket // 核心代碼1 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { ...... } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // 核心代碼2 if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } ...... }
核心代碼1很明顯是一個(gè)阻塞模型,即接受客戶端連接的,當(dāng)沒(méi)有客戶端連接時(shí)會(huì)處于阻塞,這里可以看到默認(rèn)情況下tomcat在nio模式下只有一個(gè)Acceptor線程類(lèi)來(lái)接受連接
然后看核心代碼2
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } // 核心代碼 getPoller0().register(channel); } catch (Throwable t) { ...... } return true; }
我們看核心代碼getPoller0().register(channel)
public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // 核心代碼 addEvent(r); }
看addEvent
private void addEvent(PollerEvent event) { events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); }
events的定義
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
這里可以看到封裝了一個(gè)PollerEvent 并且扔到了一個(gè)隊(duì)列里面,然后當(dāng)前類(lèi)就結(jié)束了
由此可得Acceptor的作用就是接受客戶端連接,并且把連接封裝起來(lái)扔到了一個(gè)隊(duì)列中
Poller
我們前面已經(jīng)創(chuàng)建并且啟動(dòng)了多個(gè)Poller線程類(lèi),默認(rèn)的數(shù)量是小于等于2的。
然后我們看下Poller類(lèi)做了什么,同樣我們看run方法
@Override public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { // 核心代碼1 hasEvents = events(); ....... Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); // 核心代碼2 processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
先看核心代碼1 hasEvents = events()
public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; try { // 核心代碼 pe.run(); pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }
核心代碼run
@Override public void run() { if (interestOps == OP_REGISTER) { try { // 核心代碼,注冊(cè)到selector輪訓(xùn)器 socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { ...... } }
可以看出大概的意思就是從剛才我們放進(jìn)去的隊(duì)列events里面取數(shù)據(jù)放到了eventCache里面,eventCache的定義SynchronizedStack eventCache,當(dāng)取到數(shù)據(jù)后返回true,這個(gè)時(shí)候就會(huì)進(jìn)入核心代碼2處的processKey(sk, attachment),也就是開(kāi)始處理請(qǐng)求了
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { // 核心代碼 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } ...... }
這里就可以看到我們的NIO的模型了,也就是多路復(fù)用的模型,輪詢來(lái)判斷key的狀態(tài),當(dāng)key是可讀或者可寫(xiě)時(shí)執(zhí)行processSocket
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } // 核心代碼 Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
這里就是核心代碼了,可以看到getExecutor()方法,獲取線程池,這個(gè)線程池是在初始化tomcat時(shí)提前初始化好的,默認(rèn)情況下核心線程是10,最大線程是200。線程池的配置可以根據(jù)我們自己配置來(lái)設(shè)置大小。
這里拿到線程池然后包裝了一個(gè)SocketProcessorBase線程類(lèi)扔到線程池里面取執(zhí)行
從這里可以看到Poller的功能就是從前面的隊(duì)列里面獲取連接然后包裝成SocketProcessorBase之后扔到線程池里面去執(zhí)行,SocketProcessorBase才是最終真正處理請(qǐng)求的
總結(jié)
根據(jù)上面的分析我們已經(jīng)可以看到tomcat的執(zhí)行流程了,這里盜用網(wǎng)上的一張比較好的圖
大致流程為
1、創(chuàng)建一個(gè)Acceptor線程來(lái)接收用戶連接,接收到之后扔到events queue隊(duì)列里面,默認(rèn)情況下只有一個(gè)線程來(lái)接收
2、創(chuàng)建Poller線程,數(shù)量小于等于2,Poller對(duì)象是NIO的核心,在Poller中,維護(hù)了一個(gè)Selector對(duì)象;當(dāng)Poller從隊(duì)列中取出socket后,注冊(cè)到該Selector中;然后通過(guò)遍歷Selector,找出其中可讀的socket,然后扔到線程池中處理相應(yīng)請(qǐng)求,這就是典型的NIO多路復(fù)用模型。
3、扔到線程池中的SocketProcessorBase處理請(qǐng)求
相較于BIO模型的tomcat,NIO的優(yōu)勢(shì)分析
1、BIO中的流程應(yīng)該是接收到請(qǐng)求之后直接把請(qǐng)求扔給線程池去做處理,在這個(gè)情況下一個(gè)連接即需要一個(gè)線程來(lái)處理,線程既需要讀取數(shù)據(jù)還需要處理請(qǐng)求,線程占用時(shí)間長(zhǎng),很容易達(dá)到最大線程
2、NIO的流程的不同點(diǎn)在于Poller類(lèi)采用了多路復(fù)用模型,即Poller類(lèi)只有檢查到可讀或者可寫(xiě)的連接時(shí)才把當(dāng)前連接扔給線程池來(lái)處理,這樣的好處是大大節(jié)省了連接還不能讀寫(xiě)時(shí)的處理時(shí)間(如讀取請(qǐng)求數(shù)據(jù)),也就是說(shuō)NIO“讀取socket并交給Worker中的線程”這個(gè)過(guò)程是非阻塞的,當(dāng)socket在等待下一個(gè)請(qǐng)求或等待釋放時(shí),并不會(huì)占用工作線程,因此Tomcat可以同時(shí)處理的socket數(shù)目遠(yuǎn)大于最大線程數(shù),并發(fā)性能大大提高。
以上就是我對(duì)于tomcat中nio處理模型的一些理解。希望能給大家一個(gè)參考,也希望大家多多支持腳本之家
- springboot內(nèi)置的tomcat支持最大的并發(fā)量問(wèn)題
- springboot內(nèi)置tomcat調(diào)優(yōu)并發(fā)線程數(shù)解析
- SpringBoot應(yīng)用啟動(dòng)內(nèi)置Tomcat的過(guò)程源碼分析
- Springboot 使用內(nèi)置tomcat禁止不安全HTTP的方法
- 傳統(tǒng)tomcat啟動(dòng)服務(wù)與springboot啟動(dòng)內(nèi)置tomcat服務(wù)的區(qū)別(推薦)
- Springboot內(nèi)置Tomcat配置參數(shù)調(diào)優(yōu)方式
相關(guān)文章
Java動(dòng)態(tài)代理機(jī)制的實(shí)例詳解
這篇文章主要介紹了 Java動(dòng)態(tài)代理機(jī)制的實(shí)例詳解的相關(guān)資料,希望通過(guò)本文大家能夠掌握動(dòng)態(tài)代理機(jī)制,需要的朋友可以參考下2017-09-09Java Fluent Mybatis實(shí)戰(zhàn)之構(gòu)建項(xiàng)目與代碼生成篇上
Java中常用的ORM框架主要是mybatis, hibernate, JPA等框架。國(guó)內(nèi)又以Mybatis用的多,基于mybatis上的增強(qiáng)框架,又有mybatis plus和TK mybatis等。今天我們介紹一個(gè)新的mybatis增強(qiáng)框架 fluent mybatis2021-10-10Java從控制臺(tái)讀入數(shù)據(jù)的幾種方法總結(jié)
下面小編就為大家?guī)?lái)一篇Java從控制臺(tái)讀入數(shù)據(jù)的幾種方法總結(jié)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-10-10java 輸入一個(gè)數(shù)字,反轉(zhuǎn)輸出這個(gè)數(shù)字的值(實(shí)現(xiàn)方法)
下面小編就為大家?guī)?lái)一篇java 輸入一個(gè)數(shù)字,反轉(zhuǎn)輸出這個(gè)數(shù)字的值(實(shí)現(xiàn)方法)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-10-10