close

大家好,我是飛哥!

今天開篇先給大家講個飛哥自己的小故事。我在學校和剛畢業頭一年主要從事的客戶端開發,那時候對服務器端編程還不擅長。

有一次去面試服務器端崗位,面試官問我有一個連接過來,你該怎麼編程處理它。我答道:「主線程收到請求後,創建一個子線程處理。」 面試官接着問,那如果有一千個連接同時來呢?我說「那就多創建一點線程,搞個線程池」。面試官繼續追問如果一萬個呢?我答道:「......不會...」。

事實上,服務器端只需要單線程可以達到非常高的處理能力,Redis 就是一個非常好的例子。僅僅靠單線程就可以支撐起每秒數萬 QPS 的高處理能力。今天我們就來帶大家看看 Redis 核心網絡模塊的內部實現,學習下 Redis 是如何做到如此的高性能的!

一、理解多路復用原理

在開始介紹 Redis 之前,我想有必要先來簡單介紹下 epoll。

在傳統的同步阻塞網絡編程模型里(沒有協程以前),性能上不來的根本原因在於進程線程都是笨重的傢伙。讓一個進(線)程只處理一個用戶請求確確實實是有點浪費了。

先拋開高內存開銷不說,在海量的網絡請求到來的時候,光是頻繁的進程線程上下文就讓 CPU 疲於奔命了。

如果把進程比作牧羊人,一個進(線)程同時只能處理一個用戶請求,相當於一個人只能看一隻羊,放完這一隻才能放下一隻。如果同時來了 1000 只羊,那就得 1000 個人去放,這人力成本是非常高的。

性能提升思路很簡單,就是讓很多的用戶連接來復用同一個進(線)程,這就是多路復用。多路指的是許許多多個用戶的網絡連接。復用指的是對進(線)程的復用。換到牧羊人的例子裡,就是一群羊只要一個牧羊人來處理就行了。

不過復用實現起來是需要特殊的 socket 事件管理機制的,最典型和高效的方案就是 epoll。放到牧羊人的例子來,epoll 就相當於一隻牧羊犬。

在 epoll 的系列函數裡, epoll_create 用於創建一個 epoll 對象,epoll_ctl 用來給 epoll 對象添加或者刪除一個 socket。epoll_wait 就是查看它當前管理的這些 socket 上有沒有可讀可寫事件發生。

當網卡上收到數據包後,Linux 內核進行一系列的處理後把數據放到 socket 的接收隊列。然後會檢查是否有 epoll 在管理它,如果是則在 epoll 的就緒隊列中插入一個元素。epoll_wait 的操作就非常的簡單了,就是到 epoll 的就緒隊列上來查詢有沒有事件發生就行了。關於 epoll 這隻「牧羊犬」的工作原理參見深入揭秘 epoll 是如何實現 IO 多路復用的 (Javaer 習慣把基於 epoll 的網絡開發模型叫做 NIO)

在基於 epoll 的編程中,和傳統的函數調用思路不同的是,我們並不能主動調用某個 API 來處理。因為無法知道我們想要處理的事件啥時候發生。所以只好提前把想要處理的事件的處理函數註冊到一個事件分發器上去。當事件發生的時候,由這個事件分發器調用回調函數進行處理。這類基於實現註冊事件分發器的開發模式也叫 Reactor 模型。

二、Redis 服務啟動初始化

理解了 epoll 原理後,我們再來實際看 Redis 具體是如何使用 epoll 的。直接在 Github 上就可以非常方便地獲取 Redis 的源碼。我們切到 5.0.0 版本來看單線程版本的實現(多線程我們改天再講)。

#gitclonehttps://github.com/redis/redis#cdredis#gitcheckout-b5.0.05.0.0

其中整個 Redis 服務的代碼總入口在 src/server.c 文件中,我把入口函數的核心部分摘了出來,如下。

//file:src/server.cintmain(intargc,char**argv){......//啟動初始化initServer();//運行事件處理循環,一直到服務器關閉為止aeMain(server.el);}

其實整個 Redis 的工作過程,就只需要理解清楚 main 函數中調用的 initServer 和 aeMain 這兩個函數就足夠了。

本節中我們重點介紹 initServer,在下一節介紹事件處理循環 aeMain。在 initServer 這個函數內,Redis 做了這麼三件重要的事情。

創建一個 epoll 對象
對配置的監聽端口進行 listen
把 listen socket 讓 epoll 給管理起來
//file:src/server.cvoidinitServer(){//2.1.1創建epollserver.el=aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);//2.1.2綁定監聽服務端口listenToPort(server.port,server.ipfd,&server.ipfd_count);//2.1.3註冊accept事件處理器for(j=0;j<server.ipfd_count;j++){aeCreateFileEvent(server.el,server.ipfd[j],AE_READABLE,acceptTcpHandler,NULL);}...}

接下來我們分別來看。

2.1 創建 epoll 對象

本小節的邏輯看起來貌似不短,但其實只是創建了一個 epoll 對象出來而已。

創建 epoll 對象的邏輯在 aeCreateEventLoop 中,在創建完後,Redis 將其保存在 redisServer 的 aeEventLoop 成員中,以備後續使用。

structredisServer{...aeEventLoop*el;}

我們來看 aeCreateEventLoop 詳細邏輯。Redis 在操作系統提供的 epoll 對象基礎上又封裝了一個 eventLoop 出來,所以創建的時候是先申請和創建 eventLoop。

//file:src/ae.caeEventLoop*aeCreateEventLoop(intsetsize){aeEventLoop*eventLoop;eventLoop=zmalloc(sizeof(*eventLoop);//將來的各種回調事件就都會存在這裡eventLoop->events=zmalloc(sizeof(aeFileEvent)*setsize);......aeApiCreate(eventLoop);returneventLoop;}

在 eventLoop 里,我們稍微注意一下 eventLoop->events,將來在各種事件註冊的時候都會保存到這個數組裡。

//file:src/ae.htypedefstructaeEventLoop{......aeFileEvent*events;/*Registeredevents*/}

具體創建 epoll 的過程在 ae_epoll.c 文件下的 aeApiCreate 中。在這裡,真正調用了 epoll_create

//file:src/ae_epoll.cstaticintaeApiCreate(aeEventLoop*eventLoop){aeApiState*state=zmalloc(sizeof(aeApiState));state->epfd=epoll_create(1024);eventLoop->apidata=state;return0;}2.2 綁定監聽服務端口

我們再來看 Redis 中的 listen 過程,它在 listenToPort 函數中。雖然調用鏈條很長,但其實主要就是執行了個簡單 listen 而已。

//file:src/redis.cintlistenToPort(intport,int*fds,int*count){for(j=0;j<server.bindaddr_count||j==0;j++){fds[*count]=anetTcpServer(server.neterr,port,NULL,server.tcp_backlog);}}

Redis 是支持開啟多個端口的,所以在 listenToPort 中我們看到是啟用一個循環來調用 anetTcpServer。在 anetTcpServer 中,逐步會展開調用,直到執行到 bind 和 listen 系統調用。

//file:src/anet.cintanetTcpServer(char*err,intport,char*bindaddr,intbacklog){return_anetTcpServer(err,port,bindaddr,AF_INET,backlog);}staticint_anetTcpServer(......){//設置端口重用anetSetReuseAddr(err,s)//監聽anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog)}staticintanetListen(......){bind(s,sa,len);listen(s,backlog);......}
2.3 註冊事件回調函數

我們回頭再看一下 initServer,它調用 aeCreateEventLoop 創建了 epoll,調用 listenToPort 進行了服務端口的 bind 和 listen。接着就開始調用 aeCreateFileEvent 來註冊一個 accept 事件處理器。

//file:src/server.cvoidinitServer(){//2.1.1創建epollserver.el=aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);//2.1.2監聽服務端口listenToPort(server.port,server.ipfd,&server.ipfd_count);//2.1.3註冊accept事件處理器for(j=0;j<server.ipfd_count;j++){aeCreateFileEvent(server.el,server.ipfd[j],AE_READABLE,acceptTcpHandler,NULL);}...}

我們來注意看調用 aeCreateFileEvent 時傳的重要參數是 acceptTcpHandler,它表示將來在 listen socket 上有新用戶連接到達的時候,該函數將被調用執行。我們來看 aeCreateFileEvent 具體代碼。

//file:src/ae.cintaeCreateFileEvent(aeEventLoop*eventLoop,intfd,intmask,aeFileProc*proc,void*clientData){//取出一個文件事件結構aeFileEvent*fe=&eventLoop->events[fd];//監聽指定fd的指定事件aeApiAddEvent(eventLoop,fd,mask);//設置文件事件類型,以及事件的處理器fe->mask|=mask;if(mask&AE_READABLE)fe->rfileProc=proc;if(mask&AE_WRITABLE)fe->wfileProc=proc;//私有數據fe->clientData=clientData;}

函數 aeCreateFileEvent 一開始,從 eventLoop->events 獲取了一個 aeFileEvent 對象。在 2.1 中我們介紹過 eventLoop->events 數組,註冊的各種事件處理器會保存在這個地方。

接下來調用 aeApiAddEvent。這個函數其實就是對 epoll_ctl 的一個封裝。主要就是實際執行 epoll_ctl EPOLL_CTL_ADD。

//file:src/ae_epoll.cstaticintaeApiAddEvent(aeEventLoop*eventLoop,intfd,intmask){//addormodintop=eventLoop->events[fd].mask==AE_NONE?EPOLL_CTL_ADD:EPOLL_CTL_MOD;......//epoll_ctl添加事件epoll_ctl(state->epfd,op,fd,&ee);return0;}

每一個 eventLoop->events 元素都指向一個 aeFileEvent 對象。在這個對象上,設置了三個關鍵東西

rfileProc:讀事件回調
wfileProc:寫事件回調
clientData:一些額外的擴展數據

將來 當 epoll_wait 發現某個 fd 上有事件發生的時候,這樣 redis 首先根據 fd 到 eventLoop->events 中查找 aeFileEvent 對象,然後再看 rfileProc、wfileProc 就可以找到讀、寫回調處理函數。

回頭看 initServer 調用 aeCreateFileEvent 時傳參來看。

//file:src/server.cvoidinitServer(){......//2.1.3註冊accept事件處理器for(j=0;j<server.ipfd_count;j++){aeCreateFileEvent(server.el,server.ipfd[j],AE_READABLE,acceptTcpHandler,NULL);}}

listen fd 對應的讀回調函數 rfileProc 事實上就被設置成了 acceptTcpHandler,寫回調沒有設置,私有數據 client_data 也為 null。

三、Redis 事件處理循環

在上一節介紹完了 Redis 的啟動初始化過程,創建了 epoll,也進行了綁定監聽,也註冊了 accept 事件處理函數為 acceptTcpHandler。

//file:src/server.cintmain(intargc,char**argv){......//啟動初始化initServer();//運行事件處理循環,一直到服務器關閉為止aeMain(server.el);}

接下來,Redis 就會進入 aeMain 開始進行真正的用戶請求處理了。在 aeMain 函數中,是一個無休止的循環。在每一次的循環中,要做如下幾件事情。

通過 epoll_wait 發現 listen socket 以及其它連接上的可讀、可寫事件
若發現 listen socket 上有新連接到達,則接收新連接,並追加到 epoll 中進行管理
若發現其它 socket 上有命令請求到達,則讀取和處理命令,把命令結果寫到緩存中,加入寫任務隊列
每一次進入 epoll_wait 前都調用 beforesleep 來將寫任務隊列中的數據實際進行發送
如若有首次未發送完畢的,當寫事件發生時繼續發送
//file:src/ae.cvoidaeMain(aeEventLoop*eventLoop){eventLoop->stop=0;while(!eventLoop->stop){//如果有需要在事件處理前執行的函數,那麼運行它//3.4beforesleep處理寫任務隊列並實際發送之if(eventLoop->beforesleep!=NULL)eventLoop->beforesleep(eventLoop);//開始等待事件並處理//3.1epoll_wait發現事件//3.2處理新連接請求//3.3處理客戶連接上的可讀事件aeProcessEvents(eventLoop,AE_ALL_EVENTS);}}

以上就是 aeMain 函數的核心邏輯所在,接下來我們分別對如上提到的四件事情進行詳細的闡述。

3.1 epoll_wait 發現事件

Redis 不管有多少個用戶連接,都是通過 epoll_wait 來統一發現和管理其上的可讀(包括 liisten socket 上的 accept事件)、可寫事件的。甚至連 timer,也都是交給 epoll_wait 來統一管理的。

每當 epoll_wait 發現特定的事件發生的時候,就會調用相應的事先註冊好的事件處理函數進行處理。我們來詳細看 aeProcessEvents 對 epoll_wait 的封裝。

//file:src/ae.cintaeProcessEvents(aeEventLoop*eventLoop,intflags){//獲取最近的時間事件tvp=xxx//處理文件事件,阻塞時間由tvp決定numevents=aeApiPoll(eventLoop,tvp);for(j=0;j<numevents;j++){//從已就緒數組中獲取事件aeFileEvent*fe=&eventLoop->events[eventLoop->fired[j].fd];//如果是讀事件,並且有讀回調函數fe->rfileProc()//如果是寫事件,並且有寫回調函數fe->wfileProc()}}//file:src/ae_epoll.cstaticintaeApiPoll(aeEventLoop*eventLoop,structtimeval*tvp){//等待事件aeApiState*state=eventLoop->apidata;epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp?(tvp->tv_sec*1000+tvp->tv_usec/1000):-1);...}

aeProcessEvents 就是調用 epoll_wait 來發現事件。當發現有某個 fd 上事件發生以後,則調為其事先註冊的事件處理器函數 rfileProc 和 wfileProc。

3.2 處理新連接請求

我們假設現在有新用戶連接到達了。前面在我們看到 listen socket 上的 rfileProc 註冊的是 acceptTcpHandler。也就是說,如果有連接到達的時候,會回調到 acceptTcpHandler。

在 acceptTcpHandler 中,主要做了幾件事情

調用 accept 系統調用把用戶連接給接收回來
為這個新連接創建一個唯一 redisClient 對象
將這個新連接添加到 epoll,並註冊一個讀事件處理函數

接下來讓我們看上面這三件事情都分別是如何被處理的。

//file:src/networking.cvoidacceptTcpHandler(aeEventLoop*el,intfd,...){cfd=anetTcpAccept(server.neterr,fd,cip,...);acceptCommonHandler(cfd,0);}

在 anetTcpAccept 中執行非常的簡單,就是調用 accept 把連接接收回來。

//file:src/anet.cintanetTcpAccept(......){anetGenericAccept(err,s,(structsockaddr*)&sa,&salen)}staticintanetGenericAccept(......){fd=accept(s,sa,len)}

接下來在 acceptCommonHandler 為這個新的客戶端連接 socket,創建一個 redisClient 對象。

//file:src/networking.cstaticvoidacceptCommonHandler(intfd,intflags){//創建redisClient對象redisClient*c;c=createClient(fd);......}

在 createClient 中,創建 client 對象,並且為該用戶連接註冊了讀事件處理器。

//file:src/networking.credisClient*createClient(intfd){//為用戶連接創建client對象redisClient*c=zmalloc(sizeof(redisClient));if(fd!=-1){...//為用戶連接註冊讀事件處理器aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient,c)}...}

關於 aeCreateFileEvent 的處理過程這裡就不贅述了,詳情參見 2.3 節。其效果就是將該用戶連接 socket fd 對應的讀處理函數設置為 readQueryFromClient, 並且設置私有數據為 redisClient c。

3.3 處理客戶連接上的可讀事件

現在假設該用戶連接有命令到達了,就假設用戶發送了GET XXXXXX_KEY 命令。那麼在 Redis 的時間循環中調用 epoll_wait 發現該連接上有讀時間後,會調用在上一節中討論的為其註冊的讀處理函數 readQueryFromClient。

在讀處理函數 readQueryFromClient 中主要做了這麼幾件事情。

解析並查找命令
調用命令處理
添加寫任務到隊列
將輸出寫到緩存等待發送

我們來詳細地看 readQueryFromClient 的代碼。在 readQueryFromClient 中會調用 processInputBuffer,然後進入 processCommand 對命令進行處理。其調用鏈如下:

//file:src/networking.cvoidreadQueryFromClient(aeEventLoop*el,intfd,void*privdata,...){redisClient*c=(redisClient*)privdata;processInputBufferAndReplicate(c);}voidprocessInputBufferAndReplicate(client*c){...processInputBuffer(c);}//處理客戶端輸入的命令內容voidprocessInputBuffer(redisClient*c){//執行命令,processCommand(c);}

我們再來詳細看 processCommand 。

//file:intprocessCommand(redisClient*c){//查找命令,並進行命令合法性檢查,以及命令參數個數檢查c->cmd=c->lastcmd=lookupCommand(c->argv[0]->ptr);......//處理命令//如果是MULTI事務,則入隊,否則調用call直接處理if(c->flags&CLIENT_MULTI&&...){queueMultiCommand(c);}else{call(c,CMD_CALL_FULL);...}returnC_OK;}

我們先忽略 queueMultiCommand,直接看核心命令處理方法 call。

//file:src/server.cvoidcall(client*c,intflags){//查找處理命令,structredisCommand*real_cmd=c->cmd;//調用命令處理函數c->cmd->proc(c);......}

在 server.c 中定義了每一個命令對應的處理函數

//file:src/server.cstructredisCommandredisCommandTable[]={{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},......{"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},{"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},......}

對於 get 命令來說,其對應的命令處理函數就是 getCommand。也就是說當處理 GET 命令執行到 c->cmd->proc 的時候會進入到 getCommand 函數中來。

//file:src/t_string.cvoidgetCommand(client*c){getGenericCommand(c);}intgetGenericCommand(client*c){robj*o;if((o=lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))==NULL)returnC_OK;...addReplyBulk(c,o);returnC_OK;}

getGenericCommand 方法會調用 lookupKeyReadOrReply 來從內存中查找對應的 key值。如果找不到,則直接返回 C_OK;如果找到了,調用 addReplyBulk 方法將值添加到輸出緩衝區中。

//file:src/networking.cvoidaddReplyBulk(client*c,robj*obj){addReplyBulkLen(c,obj);addReply(c,obj);addReply(c,shared.crlf);}

其主題是調用 addReply 來設置回複數據。在 addReply 方法中做了兩件事情:

prepareClientToWrite 判斷是否需要返回數據,並且將當前 client 添加到等待寫返回數據隊列中。
調用 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩衝區中,等待寫入 socekt
//file:src/networking.cvoidaddReply(client*c,robj*obj){if(prepareClientToWrite(c)!=C_OK)return;if(sdsEncodedObject(obj)){if(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))!=C_OK)_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));}else{......}}

先來看 prepareClientToWrite 的詳細實現,

//file:src/networking.cintprepareClientToWrite(client*c){......if(!clientHasPendingReplies(c)&&!(c->flags&CLIENT_PENDING_READ))clientInstallWriteHandler(c);}//file:src/networking.cvoidclientInstallWriteHandler(client*c){c->flags|=CLIENT_PENDING_WRITE;listAddNodeHead(server.clients_pending_write,c);}

其中 server.clients_pending_write 就是我們說的任務隊列,隊列中的每一個元素都是有待寫返回數據的 client 對象。在 prepareClientToWrite 函數中,把 client 添加到任務隊列 server.clients_pending_write 里就算完事。

接下再來 _addReplyToBuffer,該方法是向固定緩存中寫,如果寫不下的話就繼續調用 _addReplyStringToList 往鍊表里寫。簡單起見,我們只看 _addReplyToBuffer 的代碼。

//file:src/networking.cint_addReplyToBuffer(client*c,constchar*s,size_tlen){......//拷貝到client對象的Responsebuffer中memcpy(c->buf+c->bufpos,s,len);c->bufpos+=len;returnC_OK;}3.4 beforesleep 處理寫任務隊列

回想在 aeMain 函數中,每次在進入 aeProcessEvents 前都需要先進行 beforesleep 處理。這個函數名字起的怪怪的,但實際上大有用處。

//file:src/ae.cvoidaeMain(aeEventLoop*eventLoop){eventLoop->stop=0;while(!eventLoop->stop){// beforesleep 處理寫任務隊列並實際發送之if(eventLoop->beforesleep!=NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop,AE_ALL_EVENTS);}}

該函數處理了許多工作,其中一項便是遍歷發送任務隊列,並將 client 發送緩存區中的處理結果通過 write 發送到客戶端手中。

我們來看下 beforeSleep 的實際源碼。

//file:src/server.cvoidbeforeSleep(structaeEventLoop*eventLoop){......handleClientsWithPendingWrites();}//file:src/networking.cinthandleClientsWithPendingWrites(void){listIterli;listNode*ln;intprocessed=listLength(server.clients_pending_write);//遍歷寫任務隊列server.clients_pending_writelistRewind(server.clients_pending_write,&li);while((ln=listNext(&li))){client*c=listNodeValue(ln);c->flags&=~CLIENT_PENDING_WRITE;listDelNode(server.clients_pending_write,ln);//實際將client中的結果數據發送出去writeToClient(c->fd,c,0)//如果一次發送不完則準備下一次發送if(clientHasPendingReplies(c)){//註冊一個寫事件處理器,等待epoll_wait發現可寫後再處理aeCreateFileEvent(server.el,c->fd,ae_flags,sendReplyToClient,c);}......}}

在 handleClientsWithPendingWrites 中,遍歷了發送任務隊列 server.clients_pending_write,並調用 writeToClient 進行實際的發送處理。

值得注意的是,發送 write 並不總是能一次性發送完的。假如要發送的結果太大,而系統為每個 socket 設置的發送緩存區又是有限的。

在這種情況下,clientHasPendingReplies 判斷仍然有未發送完的數據的話,就需要註冊一個寫事件處理函數到 epoll 上。等待 epoll 發現該 socket 可寫的時候再次調用 sendReplyToClient進行發送。

//file:src/networking.cintwriteToClient(intfd,client*c,inthandler_installed){while(clientHasPendingReplies(c)){//先發送固定緩衝區if(c->bufpos>0){nwritten=write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);if(nwritten<=0)break;......//再發送回復鍊表中數據}else{o=listNodeValue(listFirst(c->reply));nwritten=write(fd,o->buf+c->sentlen,objlen-c->sentlen);......}}}

writeToClient 中的主要邏輯就是調用 write 系統調用讓內核幫其把數據發送出去即可。由於每個命令的處理結果大小是不固定的。所以 Redis 採用的做法用固定的 buf + 可變鍊表來儲存結果字符串。這裡自然發送的時候就需要分別對固定緩存區和鍊表來進行發送了。

四、高性能 Redis 網絡原理總結

Redis 服務器端只需要單線程可以達到非常高的處理能力,每秒可以達到數萬 QPS 的高處理能力。如此高性能的程序其實就是對 Linux 提供的多路復用機制 epoll 的一個較為完美的運用而已。

在 Redis 源碼中,核心邏輯其實就是兩個,一個是 initServer 啟動服務,另外一個就是 aeMain 事件循環。把這兩個函數弄懂了,Redis 就吃透一大半了。

//file:src/server.cintmain(intargc,char**argv){......//啟動初始化initServer();//運行事件處理循環,一直到服務器關閉為止aeMain(server.el);}

在 initServer 這個函數內,Redis 做了這麼三件重要的事情。

創建一個 epoll 對象
對配置的監聽端口進行 listen
把 listen socket 讓 epoll 給管理起來

在 aeMain 函數中,是一個無休止的循環,它是 Redis 中最重要的部分。在每一次的循環中,要做的事情可以總結為如下圖。

通過 epoll_wait 發現 listen socket 以及其它連接上的可讀、可寫事件
若發現 listen socket 上有新連接到達,則接收新連接,並追加到 epoll 中進行管理
若發現其它 socket 上有命令請求到達,則讀取和處理命令,把命令結果寫到緩存中,加入寫任務隊列
每一次進入 epoll_wait 前都調用 beforesleep 來將寫任務隊列中的數據實際進行發送

其實事件分發器還處理了一個不明顯的邏輯,那就是如果 beforesleep 在將結果寫回給客戶端的時候,如果由於內核 socket 發送緩存區過小而導致不能一次發送完畢的時候,也會註冊一個寫事件處理器。等到 epoll_wait 發現對應的 socket 可寫的時候,再執行 write 寫處理。

整個 Redis 的網絡核心模塊就在咱們這一篇文章中都敘述透了(剩下的 Redis 就是對各種數據結構的建立和處理了)。相信吃透這一篇對於你對網絡編程的理解會有極大的幫助!

還等什麼,快把這篇文章也分享給你身邊和你一樣愛好深度技術的好友吧!

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()