Redis所實現(xiàn)的Reactor模型設(shè)計方案
寫在文章開頭
我們都知道解決C10k
問題的最好方案就是通過在IO多路復(fù)用
的基礎(chǔ)上通過reactor
模型實現(xiàn)高性能的網(wǎng)絡(luò)并發(fā)程序,借助這個設(shè)計,redis的主線程也是基于IO多路復(fù)用
以reactor
模型的思路實現(xiàn)了一個高性能的單線程內(nèi)存數(shù)據(jù),本文將帶領(lǐng)讀者從源碼的角度來查看redis
關(guān)于reactor
模型的設(shè)計。
詳解Redis中的Reactor模型
Reactor模型掃盲
在此之前我們先來了解一下Reactor
模型,在高性能網(wǎng)絡(luò)并發(fā)程序的設(shè)計中,Reactor
模型通過reactor
接收用戶連接事件、讀事件、寫事件這些網(wǎng)絡(luò)事件,得到連接事件之后通過acceptor
為其分配handler
,后續(xù)的這些客戶端的讀寫事件都會交由handler
完成讀寫事件的處理,由此實現(xiàn)盡可能少的線程處理盡可能多的連接。
詳解reactor的實現(xiàn)
上文我們簡單的對Reactor
模型進行了簡單的掃盲,接下來我們將從redis
的源碼來了解redis
對于Reactor
模型的實現(xiàn),我們都知道Reactor
模型是通過reactor接收連接、讀、寫三種事件的,這一點我們可以直接在main
方法看到aeMain
的調(diào)用,該方法內(nèi)部本質(zhì)就是通過epoll模型進行非阻塞獲取就的網(wǎng)絡(luò)事件:
int main(int argc, char **argv) { //前置初始化步驟 //...... //事件循環(huán)輪詢前置操作 aeSetBeforeSleepProc(server.el,beforeSleep); //執(zhí)行事件驅(qū)動框架,循環(huán)處理各種觸發(fā)的事件 aeMain(server.el); //事件循環(huán)后置操作 aeDeleteEventLoop(server.el); return 0; }
我們步入aeMain
方法,可以看到只要eventLoop
沒有停止就會無限循環(huán)調(diào)用aeProcessEvents
獲取并處理就緒的事件:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { //...... //輪詢并處理就緒的事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
步入aeProcessEvents
方法,我們就可以看到redis
通過對于epoll
的封裝函數(shù)aeApiPoll
非阻塞獲取就緒的IO事件
,注意筆者所強調(diào)的非阻塞獲取,這也就是為什么redis僅僅用一個主線程即可實現(xiàn)Reactor模型的原因所在。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { //...... //非阻塞獲取就緒事件 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { //...... //處理事件 processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
對此我們再次步入aeApiPoll
實現(xiàn)可以看到redis
對于epoll
的調(diào)用epoll_wait
,得到事件數(shù)retval
之后,直接基于retval
遍歷eventLoop
的events
這里面存儲的就是所有收到的事件aeFiredEvent
,redis
會根據(jù)其事件類型累加對應(yīng)的事件mask
值,例如如果是得到的事件類型是EPOLLIN
則mask值會加上AE_READABLE
(1),若是標(biāo)準(zhǔn)輸出事件EPOLLOUT
則累加AE_WRITABLE
即2:
對應(yīng)的我們給出這段基于epoll
實現(xiàn)reacor
的實現(xiàn),可以看到其reactor
通過事件輪詢獲取對應(yīng)的事件類型再將其封裝為aeFileEvent
存到事件數(shù)組eventLoop->fired
中:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; //遍歷事件 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; //根據(jù)事件類型累加讀寫的mask值 if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; //將該事件存到fired數(shù)組中 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } //返回事件數(shù) return numevents; }
詳解事件的封裝
上文我們提到一個aeFileEvent
事件的概念,該個事件結(jié)構(gòu)如下圖所示,它通過mask
標(biāo)記當(dāng)前IO事件類型,在epoll
輪詢到事件時,它并通過rfileProc
讀事件處理指針和wfileProc
寫文件處理保存針對網(wǎng)絡(luò)IO事件
的處理函數(shù),注意這個處理函數(shù)我們完全可以直接理解為reactor
模型中的handler
,最后用clientData
記錄客戶端私有數(shù)據(jù)的指針:
typedef struct aeFileEvent { //記錄事件讀寫類型,如果是讀事件READABLE則mask+1,若是寫事件WRITABLE則加2 int mask; /* one of AE_(READABLE|WRITABLE) */ //讀事件處理器指針指向讀事件處理函數(shù)handler aeFileProc *rfileProc; //寫事件處理器指針指向讀事件處理函數(shù)handler aeFileProc *wfileProc; //記錄客戶端私有數(shù)據(jù)指針 void *clientData; } aeFileEvent;
這里我們以服務(wù)端socket
初始化階段為例展示一下aeFileEvent
對應(yīng)處理器的初始化過程,我們在redis
服務(wù)端啟動的main
函數(shù)可以看到initServer
的調(diào)用,該方法會為當(dāng)前服務(wù)端socket套接字的文件描述符綁定讀事件的處理器acceptTcpHandler
:
對應(yīng)的我們給出這一段事件綁定handler
的邏輯的核心代碼段:
int main(int argc, char **argv) { //...... //server初始化,其內(nèi)部會完成數(shù)據(jù)結(jié)構(gòu)、鍵值對數(shù)據(jù)庫初始化、網(wǎng)絡(luò)框架初始化工作 initServer(); } void initServer(void) { //...... for (j = 0; j < server.ipfd_count; j++) { //為每一個監(jiān)聽服務(wù)端socket的讀事件綁定對應(yīng)的TCP處理器acceptTcpHandler,并將其注冊到eventLoop中 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } //...... }
輪詢并分發(fā)到handler
上述步驟完成redis server
的事件注冊之后,main
方法的aeMain
函數(shù)就會通過epoll
輪詢eventLoop
中是否有就緒的IO事件,如果redis server
的fd
的讀事件就緒就會交給當(dāng)前對應(yīng)的讀處理器完成redis
客戶端初始化工作,后續(xù)redis
客戶端套接字的fd
也會將讀寫事件注冊到eventLoop
中,如此一來所有的服務(wù)端和客戶端socket
的讀寫事件都會注冊到epoll
上,讓epoll
作為reactor
進行輪詢,然后根據(jù)讀寫事件分配到各自的handler
即rfileProc/wfileProc
指針?biāo)赶虻暮瘮?shù)上。
這里我們補充的一下rfileProc/wfileProc
指針指向的函數(shù)列表:
rfileProc
:如果是redis
服務(wù)端則該指針指向acceptTcpHandler
處理新連接,如果是客戶端則指向readQueryFromClient
處理客戶端的命令。wfileProc
:該指針服務(wù)端和客戶端都一樣,指向sendReplyToClient
用于將響應(yīng)結(jié)果發(fā)送給客戶端。
對應(yīng)的我們給出上述描述的核心代碼段,可以看到main
方法會調(diào)用aeMain
開始事件輪詢:
int main(int argc, char **argv) { //前置初始化步驟 //...... //事件循環(huán)輪詢前置操作 aeSetBeforeSleepProc(server.el,beforeSleep); //執(zhí)行事件驅(qū)動框架,循環(huán)處理各種觸發(fā)的事件 aeMain(server.el); //事件循環(huán)后置操作 aeDeleteEventLoop(server.el); return 0; }
步入aeMain
即可看到無限循環(huán)傳入eventLoop
查看是否有就緒的事件:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { //...... //傳入eventLoop查看是否有socket的事件就緒 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
繼續(xù)步入aeProcessEvents
即看到輪詢就緒事件、acceptor
調(diào)用acceptTcpHandler
分發(fā)到讀寫的處理器handler
上、后續(xù)客戶端都會基于讀寫handler
完成事件處理這樣一套核心的reactor
模型設(shè)計:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { //...... //調(diào)用epoll獲取所有就緒的socket的讀寫事件 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { //獲取當(dāng)前事件的讀寫類型為mask賦值 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; //如果是讀事件則交給rfileProc指向的函數(shù),可以是服務(wù)端socket的連接處理器acceptTcpHandler,也可能是客戶端的命令處理器readQueryFromClient if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } //如果是寫事件則調(diào)用wfileProc指向的sendReplyToClient將結(jié)果發(fā)送給客戶端 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } //...... }
小結(jié)
自此我們將redis單線程的reactor模型設(shè)計都分析完成了,希望對你有幫助。
到此這篇關(guān)于Redis所實現(xiàn)的Reactor模型的文章就介紹到這了,更多相關(guān)Redis Reactor模型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis連接池監(jiān)控(連接池是否已滿)與優(yōu)化方法
本文詳細講解了如何在Linux系統(tǒng)中監(jiān)控Redis連接池的使用情況,以及如何通過連接池參數(shù)配置、系統(tǒng)資源使用情況、Redis命令監(jiān)控、外部監(jiān)控工具等多種方法進行檢測和優(yōu)化,以確保系統(tǒng)在高并發(fā)場景下的性能和穩(wěn)定性,討論了連接池的概念、工作原理、參數(shù)配置,以及優(yōu)化策略等內(nèi)容2024-09-09Redis從單點到集群部署模式(單機模式?主從模式?哨兵模式)
這篇文章主要為大家介紹了Redis從單點集群部署模式(單機模式?主從模式?哨兵模式)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-11-11Redis高并發(fā)防止秒殺超賣實戰(zhàn)源碼解決方案
本文主要介紹了Redis高并發(fā)防止秒殺超賣實戰(zhàn)源碼解決方案,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-10-10