GoLang協(xié)程庫libtask學習筆記
協(xié)程解決了什么問題
我們先從一次網(wǎng)絡IO請求過程中的read操作為例,請求數(shù)據(jù)會先拷貝到系統(tǒng)內(nèi)核空間中,再從操作系統(tǒng)的內(nèi)核空間拷貝到應用程序的用戶空間中。從內(nèi)核空間將數(shù)據(jù)拷貝到用戶空間過程中,會經(jīng)歷兩個階段:
- 等待數(shù)據(jù)準備
- 拷貝數(shù)據(jù)
因為有這兩個階段,所以就有了各種網(wǎng)絡IO的模型:
同步編程:應用程序等待IO結(jié)果(比如等待打開一個大的文件,或者等待遠端服務器的響應),阻塞當前線程。
- 優(yōu)點:邏輯簡單。
- 缺點:效率太低,其他與IO無關(guān)的業(yè)務也要等待IO的響應。
異步多線程/進程:將IO操作頻繁的邏輯、或者單純的IO操作獨立到一/多個線程中,業(yè)務線程與IO線程間靠通信/全局變量來共享數(shù)據(jù)。
- 優(yōu)點:充分利用CPU資源,防止阻塞資源。
- 缺點:線程切換代價相對較高,異步邏輯代碼復雜。
異步消息+回調(diào)函數(shù):設(shè)計一個消息循環(huán)處理器,接收外部消息(包括系統(tǒng)通知和網(wǎng)絡報文等),收到消息時調(diào)用注冊的回調(diào)函數(shù)。
- 優(yōu)點:充分利用CPU資源,防止阻塞資源。
- 缺點:代碼邏輯復雜。
而協(xié)程,就是用同步的語義去解決異步問題,即業(yè)務邏輯看起來是同步的,但實際上并不阻塞當前線程(一般是靠事件循環(huán)處理來分發(fā)消息)。所以協(xié)程實際上是在單線程的環(huán)境下實現(xiàn)的應用程序級別的并發(fā),就是把本來由操作系統(tǒng)控制的切換+保存狀態(tài)在應用程序里面實現(xiàn)了。
由于協(xié)程在應用程序級別來處理任務,所以協(xié)程更像是一個函數(shù),只是比普通的函數(shù)多了兩個動作:yield()
和resume()
,即讓出和恢復。讓出的時候我們需要將寄存器的協(xié)程上下文保存起來,恢復的時候再將上下文重新壓入寄存器,繼續(xù)執(zhí)行。
簡介
Libtask 是一個簡單的協(xié)程庫,它展示了最簡單的一種協(xié)程實現(xiàn)方式。操作系統(tǒng)只能看見一個內(nèi)核線程,無法感知到客戶端協(xié)程的存在。
Libtask中的協(xié)程是協(xié)作式的,也就是說,使用的不是時間片輪轉(zhuǎn)算法,調(diào)度器根據(jù)先來先服務的策略來執(zhí)行就緒隊列中的協(xié)程,只有當每個協(xié)程主動退出時調(diào)度器才會把CPU分配給下一個協(xié)程。
對協(xié)程的抽象
在libtask中,協(xié)程被抽象成一個Task結(jié)構(gòu)體,結(jié)構(gòu)體中的字段用于描述協(xié)程的相關(guān)信息:
// 一個Task可以看成是一個需要異步執(zhí)行的任務,coroutine的抽象描述 struct Task { char name[256]; char state[256]; // 前后指針 Task *next; Task *prev; Task *allnext; Task *allprev; // 執(zhí)行上下文 Context context; // 睡眠時間 uvlong alarmtime; uint id; // 協(xié)程棧指針 uchar *stk; // 協(xié)程棧大小 uint stksize; // 協(xié)程是否退出了 int exiting; // 在在alltask的中的索引下標 int alltaskslot; // 是否是系統(tǒng)協(xié)程 int system; // 是否在就緒狀態(tài) int ready; // Task需要執(zhí)行的函數(shù) void (*startfn)(void*); // startfn的參數(shù) void *startarg; // 自定義數(shù)據(jù) void *udata; };
創(chuàng)建協(xié)程
int taskcreate(void (*fn)(void*), void *arg, uint stack) { int id; Task *t; // 分配task和stack的空間 t = taskalloc(fn, arg, stack); // 協(xié)程的數(shù)量+1 taskcount++; id = t->id; if(nalltask%64 == 0){ alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0])); if(alltask == nil){ fprint(2, "out of memory\n"); abort(); } } // 記錄位置 t->alltaskslot = nalltask; // 保存到alltask中 alltask[nalltask++] = t; // 修改狀態(tài)為就緒,可以被調(diào)度,并且加入到就緒隊列 taskready(t); return id; }
我們可以使用taskcreate
函數(shù)來創(chuàng)建協(xié)程,在taskcreate
函數(shù)中,首先會調(diào)用taskalloc
函數(shù)為Task和執(zhí)行棧分配內(nèi)存,然后初始化協(xié)程的上下文信息,在此之后,一個協(xié)程就被創(chuàng)建成功了。
/* * taskalloc函數(shù)的主要邏輯是申請Task結(jié)構(gòu)體所需的內(nèi)存和執(zhí)行時棧的內(nèi)存, * 然后初始化各個字段。在此之后,一個協(xié)程就被創(chuàng)建成功了,接著執(zhí)行taskready * 把協(xié)程加入就緒隊列中。 * */ static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) { Task *t; sigset_t zero; uint x, y; ulong z; /* allocate the task and stack together */ // 結(jié)構(gòu)體本身的大小和棧大小 // 協(xié)程棧大小是256*1024 t = malloc(sizeof *t+stack); if(t == nil){ fprint(2, "taskalloc malloc: %r\n"); abort(); } memset(t, 0, sizeof *t); // 棧的內(nèi)存位置 t->stk = (uchar*)(t+1); // 棧大小 t->stksize = stack; // 協(xié)程id t->id = ++taskidgen; // 協(xié)程工作函數(shù)和參數(shù) t->startfn = fn; t->startarg = arg; /* do a reasonable initialization */ memset(&t->context.uc, 0, sizeof t->context.uc); sigemptyset(&zero); // 初始化uc_sigmask字段為空,即不阻塞信號 sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); /* must initialize with current context */ // 初始化uc字段 if(getcontext(&t->context.uc) < 0){ fprint(2, "getcontext: %r\n"); abort(); } /* call makecontext to do the real work. */ /* leave a few words open on both ends */ // 設(shè)置協(xié)程執(zhí)行時的棧位置和大小 t->context.uc.uc_stack.ss_sp = t->stk+8; t->context.uc.uc_stack.ss_size = t->stksize-64; #if defined(__sun__) && !defined(__MAKECONTEXT_V2_SOURCE) /* sigh */ #warning "doing sun thing" /* can avoid this with __MAKECONTEXT_V2_SOURCE but only on SunOS 5.9 */ t->context.uc.uc_stack.ss_sp = (char*)t->context.uc.uc_stack.ss_sp +t->context.uc.uc_stack.ss_size; #endif /* * All this magic is because you have to pass makecontext a * function that takes some number of word-sized variables, * and on 64-bit machines pointers are bigger than words. */ //print("make %p\n", t); z = (ulong)t; y = z; z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */ x = z>>16; // 保存信息到uc字段 makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x); return t; }
創(chuàng)建好一個協(xié)程之后,taskcreate
函數(shù)會調(diào)用taskready()
函數(shù)把協(xié)程的狀態(tài)修改為就緒態(tài),并加入到就緒隊列中。
/* * 修改協(xié)程的狀態(tài)并加入就緒隊列 * */ void taskready(Task *t) { t->ready = 1; addtask(&taskrunqueue, t); }
如何保存上下文信息
我們可以發(fā)現(xiàn),在調(diào)用taskalloc
函數(shù)初始化協(xié)程的時候,我們還會對協(xié)程的上下文進行初始化,以下代碼的流程是將當前CPU寄存器的上下文信息保存到當前Task的上下文中,同時將當前Task的棧位置和大小保存進上下文中,最后將協(xié)程的工作函數(shù)保存到上下文信息中。
// 將上下文置為零值 memset(&t->context.uc, 0, sizeof t->context.uc); // 將信號集zero清空 sigemptyset(&zero); // 初始化uc_sigmask字段為空,即不阻塞信號 sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); // 將當前上下文信息保存到t-context.uc結(jié)構(gòu)體中 if(getcontext(&t->context.uc) < 0){ fprint(2, "getcontext: %r\n"); abort(); } // 設(shè)置協(xié)程執(zhí)行時的棧位置和大小 t->context.uc.uc_stack.ss_sp = t->stk+8; t->context.uc.uc_stack.ss_size = t->stksize-64; z = (ulong)t; y = z; z >>= 16; x = z>>16; // 設(shè)置協(xié)程的工作函數(shù)到上下文信息中 makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
ucontext族函數(shù)
其實對協(xié)程上下文的初始化以及保存是通過linux下的ucontext族函數(shù)來實現(xiàn)的。
ucontext_t結(jié)構(gòu)體
我們發(fā)現(xiàn)Task結(jié)構(gòu)體中有一個Context字段,這個字段其實就是對ucontext_t
結(jié)構(gòu)體的封裝,ucontext_t
結(jié)構(gòu)體用于保存當前的上下文信息,它的結(jié)構(gòu)是這樣的:
typedef struct ucontext { unsigned long int uc_flags; struct ucontext *uc_link;//后序上下文 __sigset_t uc_sigmask;// 信號屏蔽字掩碼 stack_t uc_stack;// 上下文所使用的棧 // 保存的上下文的寄存器信息 // 比如pc、sp、bp // pc程序計數(shù)器:記錄下一條指令的地址 // sp堆棧指針:指向函數(shù)調(diào)用棧棧頂?shù)闹羔?,所以新?shù)據(jù)入棧將存入sp+1的地址 // bp基址指針:指向函數(shù)調(diào)用棧的首地址 mcontext_t uc_mcontext; long int uc_filler[5]; } ucontext_t; //其中mcontext_t 定義如下 typedef struct { gregset_t __ctx(gregs);//所裝載寄存器 fpregset_t __ctx(fpregs);//寄存器的類型 } mcontext_t; //其中g(shù)regset_t 定義如下 typedef greg_t gregset_t[NGREG];//包括了所有的寄存器的信息
getcontext()函數(shù)
函數(shù)原型:
int getcontext(ucontext_t* ucp)
getcontext()
函數(shù)的底層是通過匯編來實現(xiàn)的,其主要的功能是將當前運行到的寄存器信息保存到參數(shù)ucp中。
setcontext()函數(shù)
函數(shù)原型:
int setcontext(const ucontext_t *ucp)
setcontext()
函數(shù)的作用是將ucontext_t
結(jié)構(gòu)體變量ucp中的上下文信息重新恢復到cpu中并執(zhí)行。
makecontext()函數(shù)
函數(shù)原型:
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
makecontext()
函數(shù)的主要功能是設(shè)置協(xié)程的工作函數(shù)到上下文(ucontext_t)中,同時在用戶設(shè)置的棧上保存一些信息,并且設(shè)置棧頂指針的值到上下文信息中。
argc是入口函數(shù)的參數(shù)個數(shù),后面的...
是具體的入口函數(shù)參數(shù),該參數(shù)必須是整形值。
swapcontext()函數(shù)
函數(shù)原型:
int swapcontext(ucontext_t *oucp, ucontext_t *ucp)
該函數(shù)可以將當前cpu中的上下文信息保存到oucp
結(jié)構(gòu)體變量中,然后將ucp
結(jié)構(gòu)體的上下文信息恢復到cpu中。
這里可以理解為調(diào)用了兩個函數(shù),第一次是調(diào)用了getcontext(oucp)
然后再調(diào)用setcontext(ucp)
。
協(xié)程的調(diào)度
在使用taskcreate
創(chuàng)建協(xié)程的時候,這個函數(shù)內(nèi)部會調(diào)用taskready
函數(shù)修改新建協(xié)程的狀態(tài)并加入就緒隊列taskrunqueue
中,taskrunqueue
中的協(xié)程需要一個調(diào)度器來調(diào)度執(zhí)行。
tasklib庫中實現(xiàn)了一個協(xié)程調(diào)度中心的函數(shù)。調(diào)度中心會不斷的從就緒隊列中取出協(xié)程來執(zhí)行,它的核心邏輯是這樣的:
- 從就緒隊列中拿出一個協(xié)程t,并把t移出就緒隊列。
- 通過
contextswitch
函數(shù)將協(xié)程t的上下文信息切換到taskschedcontext
中執(zhí)行。 - 將協(xié)程t切換回調(diào)度中心,如果t已經(jīng)退出,修改數(shù)據(jù)結(jié)構(gòu),然后回收他的內(nèi)存,然后繼續(xù)調(diào)度其它的協(xié)程執(zhí)行。這里的調(diào)度機制比較簡單,是非搶占式的協(xié)作式調(diào)度,沒有時間片的概念,一個協(xié)程的執(zhí)行時間由自己決定,放棄執(zhí)行的權(quán)力也是自己控制的,當協(xié)程不想執(zhí)行了可以調(diào)用
taskyield()
函數(shù)讓出cpu。
static void taskscheduler(void) { int i; Task *t; taskdebug("scheduler enter"); for(;;){ // 如果沒有就緒態(tài)協(xié)程了,就退出 if(taskcount == 0) exit(taskexitval); // 從就緒隊列中拿出一個協(xié)程 t = taskrunqueue.head; if(t == nil){ fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount); exit(1); } // 從就緒隊列中刪除這個協(xié)程 deltask(&taskrunqueue, t); // 將協(xié)程狀態(tài)改為非就緒態(tài) t->ready = 0; // 保存正在執(zhí)行的協(xié)程 taskrunning = t; // 切換次數(shù)+1 tasknswitch++; taskdebug("run %d (%s)", t->id, t->name); // 切換到t執(zhí)行,將當前cpu中的上下文信息保存到taskschedcontext中 // 然后將t->context中的上下文信息恢復到cpu中執(zhí)行 contextswitch(&taskschedcontext, &t->context); // 執(zhí)行結(jié)束 taskrunning = nil; // 剛才執(zhí)行的協(xié)程t退出了 if(t->exiting){ // 如果不是系統(tǒng)協(xié)程,協(xié)程個數(shù)減一 if(!t->system) taskcount--; // 保存當前協(xié)程在alltask的索引 i = t->alltaskslot; // 將最后一個協(xié)程切換到當前協(xié)程的位置,因為當前協(xié)程要退出了 alltask[i] = alltask[--nalltask]; // 更新被置換協(xié)程的索引 alltask[i]->alltaskslot = i; // 釋放堆內(nèi)存 free(t); } } }
從上述調(diào)度器執(zhí)行的代碼中可以發(fā)現(xiàn),我們使用contextswitch
函數(shù)實現(xiàn)了協(xié)程間的上下文切換,這個函數(shù)的內(nèi)部調(diào)用了swapcontext(&from->uc, &to->uc)
函數(shù),這個函數(shù)將當前cpu中的上下文信息保存到from結(jié)構(gòu)體變量中,然后將to結(jié)構(gòu)體的上下文信息恢復到cpu中執(zhí)行。
執(zhí)行結(jié)束之后,會重新將上下文切換回調(diào)度中心。
static void contextswitch(Context *from, Context *to) { if(swapcontext(&from->uc, &to->uc) < 0){ fprint(2, "swapcontext failed: %r\n"); assert(0); } }
其實我們還可以通過調(diào)用taskyield
函數(shù)來控制協(xié)程在沒有執(zhí)行完就主動讓出,那么當前正在執(zhí)行的task會被 插入就緒隊列的尾部,等待后續(xù)的調(diào)度,然后調(diào)度器會從就緒隊列的頭部重新取出一個task來執(zhí)行。
/* * 協(xié)程主動讓出CPU * 1.將主動讓出的協(xié)程重新加入到就緒隊列 * 2.將當前協(xié)程的狀態(tài)標記為讓出 * 3.執(zhí)行協(xié)程切換的邏輯 * */ int taskyield(void) { int n; // 協(xié)程的讓出次數(shù) n = tasknswitch; // 將當前主動讓出的協(xié)程放進等待隊列 taskready(taskrunning); // 標記當前協(xié)程的狀態(tài)為“讓出” taskstate("yield"); // 切換協(xié)程 taskswitch(); // 等于0說明當前只有自己一個協(xié)程,調(diào)度的時候taskswitch加1,所以這里要減1 return tasknswitch - n - 1; }
我們可以發(fā)現(xiàn),切換流程的時候?qū)嶋H上是調(diào)用了taskswitch()
函數(shù),這個函數(shù)內(nèi)部會調(diào)用contextswitch
函數(shù)來切換上下文。
/* * 切換協(xié)程 * */ void taskswitch(void) { needstack(0); // 將當前CPU中的上下文信息保存到taskrunning->context結(jié)構(gòu)體中 // 然后將調(diào)度中心上下文恢復到CPU中執(zhí)行 contextswitch(&taskrunning->context, &taskschedcontext); }
總結(jié)
所以整個調(diào)度流程是這樣的:
每一個協(xié)程對應一個Task結(jié)構(gòu)體。然后調(diào)度中心不斷地按照先進先出的方式去調(diào)度協(xié)程的執(zhí)行就可以。因為沒有搶占機制,所以調(diào)度中心是依賴協(xié)程本身去驅(qū)動的,協(xié)程需要主動讓出cpu,把上下文切換回調(diào)度中心,調(diào)度中心才能進行下一輪的調(diào)度。
當然我們也可以調(diào)用taskyield
函數(shù)主動讓出CPU,他會將當前正在執(zhí)行的task插入就緒隊列的尾部,等待后續(xù)的調(diào)度,然后調(diào)度器會從就緒隊列的頭部重新取出一個task來執(zhí)行。
到此這篇關(guān)于GoLang協(xié)程庫libtask學習筆記的文章就介紹到這了,更多相關(guān)GoLang libtask內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言reflect.Type?和?reflect.Value?應用示例詳解
這篇文章主要為大家介紹了go語言reflect.Type?和?reflect.Value?應用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-09-09Go語言內(nèi)建函數(shù)cap的實現(xiàn)示例
cap 是一個常用的內(nèi)建函數(shù),它用于獲取某些數(shù)據(jù)結(jié)構(gòu)的容量,本文主要介紹了Go語言內(nèi)建函數(shù)cap的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下2024-08-08