C++高性能服務器框架之協(xié)程調(diào)度模塊
協(xié)程調(diào)度模塊概述
封裝了一個N : M協(xié)程調(diào)度器,創(chuàng)建M個協(xié)程在N個線程上運行。通過schedule()方法將cb或fiber重新加到任務隊列中執(zhí)行任務,協(xié)程可以在線程上自由切換,也可以在指定線程上執(zhí)行。
1 —— N 1 —— M
scheduler ---> thread ---> fiber
?
N : M 協(xié)程可以在線程間自由切換
1. 線程池, 分配一組線程
2. 協(xié)程調(diào)度器,將協(xié)程指定到相應線程上執(zhí)行
a)隨機選擇空閑的協(xié)程執(zhí)行
b)協(xié)程指定必須在某個線程上執(zhí)行
協(xié)程調(diào)度器調(diào)度主要思想為:先查看任務隊列中有沒有任務需要執(zhí)行,若沒有任務需要執(zhí)行則執(zhí)行idel(),其思想主要在run()中體現(xiàn)。
其次,在設計協(xié)程調(diào)度器時,設置了一個use_caller來決定是否將當前調(diào)度線程也納入調(diào)度中,這樣可以少創(chuàng)建一個線程執(zhí)行任務,效率更高。
詳解
class Scheduler
兩個局部線程變量保存當前線程的協(xié)程調(diào)度器和主協(xié)程
// 當前協(xié)程調(diào)度器 static thread_local Scheduler* t_secheduler = nullptr; // 線程主協(xié)程 static thread_local Fiber* t_fiber = nullptr;
FiberAndThread(任務結(jié)構(gòu)體)
通過FiberAndThread結(jié)構(gòu)體在存儲協(xié)程,回調(diào)函數(shù),線程的信息
struct FiberAndThread {
? ? ? ?// 協(xié)程
? ? ? ?Fiber::ptr fiber;
? ? ? ?// 協(xié)程執(zhí)行函數(shù)
? ? ? ?std::function<void()> cb;
? ? ? ?// 線程id 協(xié)程在哪個線程上
? ? ? ?int thread;
? // 確定協(xié)程在哪個線程上跑
? ? ? ?FiberAndThread(Fiber::ptr f, int thr)
? ? ? ? ? :fiber(f), thread(thr) {
? ? ? }
? // 通過swap將傳入的 fiber 置空,使其引用計數(shù)-1
? ? ? ?FiberAndThread(Fiber::ptr* f, int thr)
? ? ? ? ? :thread(thr) {
? ? ? ? ? ?fiber.swap(*f);
? ? ? }
? // 確定回調(diào)在哪個線程上跑
? FiberAndThread(std::function<void()> f, int thr)
? ? ? ? ? :cb(f), thread(thr) {
? ? ? }
? // 通過swap將傳入的 cb 置空,使其引用計數(shù)-1
? FiberAndThread(std::function<void()>* f, int thr)
? ? ? ? ? :thread(thr) {
? ? ? ? ? ?cb.swap(*f);
? ? ? }
?
? ? ? ?// 默認構(gòu)造
? ? ? ?FiberAndThread() {
? ? ? ? ? ?thread = -1;
? ? ? }
? // 重置
? ? ? ?void reset() {
? ? ? ? ? ?fiber = nullptr;
? ? ? ? ? ?cb = nullptr;
? ? ? ? ? ?thread = -1;
? ? ? }
?
? };mumber(成員變量)
private:
? ?// Mutex
? ?MutexType m_mutex;
? ?// 線程池
? ?std::vector<sylar::Thread::ptr> m_threads;
? ?// 待執(zhí)行的協(xié)程隊列
? ?std::list<FiberAndThread> m_fibers;
? ?// use_caller為true時有效,調(diào)度協(xié)程
? ?Fiber::ptr m_rootFiber;
? ?// 協(xié)程調(diào)度器名稱
? ?std::string m_name;
?
protected:
? ?// 協(xié)程下的線程id數(shù)組
? ?std::vector<int> m_threadIds;
? ?// 線程數(shù)量
? ?size_t m_threadCount = 0;
? ?// 工作線程數(shù)量
? ?std::atomic<size_t> m_activateThreadCount = {0};
? ?// 空閑線程數(shù)量
? ?std::atomic<size_t> m_idleThreadCount = {0};
? ?// 是否正在停止
? ?bool m_stopping = true;
? ?// 是否自動停止
? ?bool m_autoStop = false;
? ?// 主線程Id(use_caller)
? ?int m_rootThread = 0;scheduler(調(diào)度協(xié)程)
// 調(diào)度協(xié)程
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
? ?bool need_tickle = false;
? {
? ? ? ?MutexType::Lock lock(m_mutex);
? ? ? ?// 將任務加入到隊列中,若任務隊列中已經(jīng)有任務了,則tickle()
? ? ? ?need_tickle = scheduleNoLock(fc, thread);
? }
?
? ?if (need_tickle) {
? ? ? ?tickle();
? }
}
// 批量調(diào)度協(xié)程
template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
? ?bool need_tickle = false;
? {
? ? ? ?MutexType::Lock lock(m_mutex);
? ? ? ?while (begin != end) {
? ? ? ? ? ?need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
? ? ? ? ? ?++begin;
? ? ? }
? }
? ?if (need_tickle) {
? ? ? ?tickle();
? }
}檢查任務隊列中有無任務,將任務加入到任務隊列中,若任務隊列中本來就已經(jīng)有任務了,就tickle以下
/**
* @brief 協(xié)程調(diào)度啟動(無鎖)
*/
template<class FiberOrCb>
bool scheduleNoLock(FiberOrCb fc, int thread) {
? ?bool need_tickle = m_fibers.empty();
? ?FiberAndThread ft(fc, thread);
? ?if (ft.fiber || ft.cb) {
? ? ? ?m_fibers.push_back(ft);
? }
? ?return need_tickle;
}Scheduler(構(gòu)造函數(shù))
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string &name)
? :m_name(name) {
// 確定線程數(shù)量要正確
? ?SYLAR_ASSERT(threads > 0);
? ?// 是否將協(xié)程調(diào)度線程也納入調(diào)度器
? ?if (use_caller) {
? ? ? ?// 設置線程名稱
? ? ? ?sylar::Thread::SetName(m_name);
? ? ? ?// 獲得主協(xié)程
? ? ? ?sylar::Fiber::GetThis();
? ? ? ?// 線程數(shù)量-1
? ? ? ?--threads;
? ? ? ?SYLAR_ASSERT(GetThis() == nullptr);
? ? ? ?// 設置當前協(xié)程調(diào)度器
? ? ? ?t_secheduler = this;
?
? ? ? ?// 將此fiber設置為 use_caller,協(xié)程則會與 Fiber::CallerMainFunc() 綁定
? ? ? ?// 非靜態(tài)成員函數(shù)需要傳遞this指針作為第一個參數(shù),用 std::bind()進行綁定
? ? ? ?m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, true));
? ? ? ?// 設置當前線程的主協(xié)程為m_rootFiber
? ? ? ?// 這里的m_rootFiber是該線程的主協(xié)程(執(zhí)行run任務的協(xié)程),只有默認構(gòu)造出來的fiber才是主協(xié)程
? ? ? ?t_fiber = m_rootFiber.get();
? ? ? ?// 獲得當前線程id
? ? ? ?m_rootThread = sylar::GetThreadId();
? ? ? ?m_threadIds.push_back(m_rootThread);
? }
? ?// 不將當前線程納入調(diào)度器
? ?else {
? ? ? ?m_rootThread = -1;
? }
? ?m_threadCount = threads;
}~Scheduler(析構(gòu)函數(shù))
Scheduler::~Scheduler() {
? ?// 必須達到停止條件
? ?SYLAR_ASSERT(m_stopping);
? ?if (GetThis() == this) {
? ? ? ?t_secheduler = nullptr;
? }
}start(啟動調(diào)度器)
void Scheduler::start() {
SYLAR_LOG_INFO(g_logger) << "start()";
MutexType::Lock lock(m_mutex);
// 已經(jīng)啟動了
if (!m_stopping) {
return;
}
// 將停止狀態(tài)設置為false
m_stopping = false;
// 線程池為空
SYLAR_ASSERT(m_threads.empty());
// 創(chuàng)建線程池
m_threads.resize(m_threadCount);
for (size_t i = 0; i < m_threadCount; ++i) {
// 線程執(zhí)行 run() 任務
m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this)
, m_name + "_" + std::to_string(i)));
m_threadIds.push_back(m_threads[i]->getId());
}
lock.unlock();
/* 在這里切換線程時,swap的話會將線程的主協(xié)程與當前協(xié)程交換,當使用use_caller時,t_fiber = m_rootFiber,call是將當前協(xié)程與主協(xié)程交換
* 為了確保在啟動之后仍有任務加入任務隊列中,所以在stop()中做該線程的啟動,這樣就不會漏掉任務隊列中的任務
*/
/*
if(m_rootFiber) {
// t_fiber = m_rootFiber.get(), 從自己切換到自己了屬于是
// m_rootFiber->swapIn();
m_rootFiber->call();
}
*/
SYLAR_LOG_INFO(g_logger) << "start() end";
}stop(停止調(diào)度器)
void Scheduler::stop() {
SYLAR_LOG_INFO(g_logger) << "stop()";
// 進入stop將自動停止設為true
m_autoStop = true;
// 使用use_caller,并且只有一個線程,并且主協(xié)程的狀態(tài)為結(jié)束或者初始化
if (m_rootFiber
&& m_threadCount == 0
&& (m_rootFiber->getState() == Fiber::TERM
|| m_rootFiber->getState() == Fiber::INIT)) {
SYLAR_LOG_INFO(g_logger) << this->m_name << " sheduler stopped";
// 停止狀態(tài)為true
m_stopping = true;
// 若達到停止條件則直接return
if (stopping()) {
return;
}
}
// use_caller線程
// 當前調(diào)度器和t_secheduler相同
if (m_rootThread != -1) {
SYLAR_ASSERT(GetThis() == this);
}
// 非use_caller,此時的t_secheduler應該為nullptr
else {
SYLAR_ASSERT(GetThis() != this);
}
// 停止狀態(tài)為true
m_stopping = true;
// 每個線程都tickle一下
for (size_t i = 0; i < m_threadCount; ++i) {
tickle();
}
// 使用use_caller多tickle一下
if (m_rootFiber) {
tickle();
}
// 使用use_caller,只要沒達到停止條件,調(diào)度器主協(xié)程交出執(zhí)行權(quán),執(zhí)行run
if (m_rootFiber) {
if (!stopping()) {
m_rootFiber->call();
}
}
std::vector<Thread::ptr> thrs;
{
MutexType::Lock lock(m_mutex);
thrs.swap(m_threads);
}
// 等待線程執(zhí)行完成
for (auto& i : thrs) {
i->join();
}
}run(協(xié)程調(diào)度函數(shù))
void Scheduler::run() {
SYLAR_LOG_INFO(g_logger) << "run()";
// hook
set_hook_enable(true);
// 設置當前調(diào)度器
setThis();
// 非user_caller線程,設置主協(xié)程為線程主協(xié)程
if (sylar::GetThreadId() != m_rootThread) {
t_fiber = Fiber::GetThis().get();
}
SYLAR_LOG_DEBUG(g_logger) << "new idle_fiber";
// 定義dile_fiber,當任務隊列中的任務執(zhí)行完之后,執(zhí)行idle()
Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));
Fiber::ptr cb_fiber;
FiberAndThread ft;
while (true) {
ft.reset();
bool tickle_me = false;
bool is_active = false;
{ // 從任務隊列中拿fiber和cb
MutexType::Lock lock(m_mutex);
auto it = m_fibers.begin();
while (it != m_fibers.end()) {
// 如果當前任務指定的線程不是當前線程,則跳過,并且tickle一下
if (it->thread != -1 && it->thread != sylar::GetThreadId()) {
++it;
tickle_me = true;
continue;
}
// 確保fiber或cb存在
SYLAR_ASSERT(it->fiber || it->cb);
// 如果該fiber正在執(zhí)行則跳過
if (it->fiber && it->fiber->getState() == Fiber::EXEC) {
++it;
continue;
}
// 取出該任務
ft = *it;
// 從任務隊列中清除
m_fibers.erase(it);
// 正在執(zhí)行任務的線程數(shù)量+1
++m_activateThreadCount;
// 正在執(zhí)行任務
is_active = true;
break;
}
}
// 取到任務tickle一下
if (tickle_me) {
tickle();
}
// 如果任務是fiber,并且任務處于可執(zhí)行狀態(tài)
if (ft.fiber && (ft.fiber->getState() != Fiber::TERM
|| ft.fiber->getState() != Fiber::EXCEPT)) {
// 執(zhí)行任務
ft.fiber->swapIn();
// 執(zhí)行完成,活躍的線程數(shù)量減-1
--m_activateThreadCount;
// 如果線程的狀態(tài)被置為了READY
if (ft.fiber->getState() == Fiber::READY) {
// 將fiber重新加入到任務隊列中
schedule(ft.fiber);
// INIT或HOLD狀態(tài)
} else if (ft.fiber->getState() != Fiber::TERM
&& ft.fiber->getState() != Fiber::EXCEPT) {
// 設置fiber狀態(tài)為HOLD
ft.fiber->setState(Fiber::HOLD);
}
// 執(zhí)行完畢重置數(shù)據(jù)ft
ft.reset();
// 如果任務是cb
} else if(ft.cb) {
// cb_fiber存在,重置該fiber
if (cb_fiber) {
cb_fiber->reset(ft.cb);
// cb_fiber不存在,new新的fiber
} else {
SYLAR_LOG_DEBUG(g_logger) << "new ft.cb";
cb_fiber.reset(new Fiber(ft.cb));
}
// 重置數(shù)據(jù)ft
ft.reset();
// 執(zhí)行cb任務
cb_fiber->swapIn();
// 執(zhí)行完,執(zhí)行任務線程數(shù)量-1
--m_activateThreadCount;
// 若cb_fiber狀態(tài)為READY
if (cb_fiber->getState() == Fiber::READY) {
// 重新放入任務隊列中
schedule(cb_fiber);
// 釋放智能指針
cb_fiber.reset();
// cb_fiber異?;蚪Y(jié)束,就重置狀態(tài),可以再次使用該cb_fiber
} else if (cb_fiber->getState() == Fiber::EXCEPT
|| cb_fiber->getState() == Fiber::TERM) {
// cb_fiber的執(zhí)行任務置空
cb_fiber->reset(nullptr);
} else {
// 設置狀態(tài)為HOLD,此任務后面還會通過ft.fiber被拉起
cb_fiber->setState(Fiber::HOLD);
// 釋放該智能指針,調(diào)用下一個任務時要重新new一個新的cb_fiber
cb_fiber.reset();
}
// 沒有任務執(zhí)行
} else {
// 我感覺這里判斷么啥用
if (is_active) {
--m_activateThreadCount;
continue;
}
// 如果idle_fiber的狀態(tài)為TERM則結(jié)束循環(huán),真正的結(jié)束
if (idle_fiber->getState() == Fiber::TERM) {
SYLAR_LOG_INFO(g_logger) << "idle_fiber term";
break;
}
// 正在執(zhí)行idle的線程數(shù)量+1
++m_idleThreadCount;
// 執(zhí)行idle()
idle_fiber->swapIn();
// 正在執(zhí)行idle的線程數(shù)量-1
--m_idleThreadCount;
// idle_fiber狀態(tài)置為HOLD
if (idle_fiber->getState() != Fiber::TERM
&& idle_fiber->getState() != Fiber::EXCEPT) {
idle_fiber->setState(Fiber::HOLD);
}
}
}
}stopping(判斷停止條件)
bool Scheduler::stopping() {
MutexType::Lock lock(m_mutex);
// 當自動停止 && 正在停止 && 任務隊列為空 && 活躍的線程數(shù)量為0
return m_autoStop && m_stopping
&& m_fibers.empty() && m_activateThreadCount == 0;
}總結(jié)
舉個具體的例子。
#include "../sylar/sylar.h"
static sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();
void test_fiber() {
static int count = 5;
SYLAR_LOG_INFO(g_logger) << "---test in fiber---" << count;
sylar::set_hook_enable(false);
sleep(1);
// 循環(huán)將test_fiber加入到任務隊列中,并且指定第一個拿到該任務的線程一直執(zhí)行
if (--count > 0) {
sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetThreadId());
}
}
int main(int argc, char** argv) {
g_logger->setLevel(sylar::LogLevel::INFO);
sylar::Thread::SetName("main");
SYLAR_LOG_INFO(g_logger) << "main start";
sylar::Scheduler sc(2, false, "work");
sc.start();
SYLAR_LOG_INFO(g_logger) << "schedule";
sc.schedule(&test_fiber);
sc.stop();
SYLAR_LOG_INFO(g_logger) << "main end";
return 0;
}// 設置2個線程, 并且將use_caller設為false, 設置名稱為"work", 指定線程 // 這里可以看到有3個線程 1684 1685 1686 // 1684為調(diào)度線程, 1685和1686為執(zhí)行任務的線程 // 可以看到任務都是在1686線程上執(zhí)行的,因為在shceduler()時指定了任務在第一個拿到該任務的線程上一直執(zhí)行 1684 main 0 [INFO] [root] tests/test_scheduler.cc:20 main start 1684 main 0 [INFO] [root] tests/test_scheduler.cc:23 schedule 1686 work_1 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---5 1686 work_1 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---4 1686 work_1 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---3 1686 work_1 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---2 1686 work_1 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---1 1685 work_0 0 [INFO] [system] sylar/scheduler.cc:237 idle_fiber term 1686 work_1 0 [INFO] [system] sylar/scheduler.cc:237 idle_fiber term 1684 main 0 [INFO] [root] tests/test_scheduler.cc:27 main end
// 設置2個線程, 并且將use_caller設為true, 設置名稱為"work",不指定線程 // 這里可以看到有2個線程 2841 2842 // 2841為調(diào)度線程,他也將自己納入調(diào)度器中執(zhí)行任務 2841 work 0 [INFO] [root] tests/test_scheduler.cc:20 main start 2841 work 0 [INFO] [root] tests/test_scheduler.cc:23 schedule 2842 work_0 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---5 2841 work 6 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---4 2842 work_0 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---3 2841 work 6 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---2 2842 work_0 4 [INFO] [root] tests/test_scheduler.cc:7 ---test in fiber---1 2842 work_0 0 [INFO] [system] sylar/scheduler.cc:237 idle_fiber term 2841 work 1 [INFO] [system] sylar/scheduler.cc:237 idle_fiber term 2841 work 0 [INFO] [root] tests/test_scheduler.cc:27 main end
當我們設置use_caller時,將調(diào)度線程也納入管理中執(zhí)行任務,在構(gòu)造函數(shù)中通過Fiber::GetThis()獲得主協(xié)程,然后new一個子協(xié)程作為該線程的主協(xié)程并與run()綁定,在stop()時使用call()將執(zhí)行權(quán)交給線程主協(xié)程執(zhí)行run()。
在start()時創(chuàng)建其他線程并與run()綁定,此時線程就開始執(zhí)行run()。
在run()中,使用Fiber::GetThis()獲得主協(xié)程并設置為線程主協(xié)程。在協(xié)程切換時,都是執(zhí)行任務協(xié)程與線程主協(xié)程之間的切換。當達到停止條件,idle()執(zhí)行完畢時,run()也執(zhí)行完畢。
此時use_caller線程使用back()將線程主協(xié)程切換到主協(xié)程繼續(xù)執(zhí)行stop()等待其他線程執(zhí)行完畢。
以上就是C++高性能服務器框架之協(xié)程調(diào)度模塊的詳細內(nèi)容,更多關于C++協(xié)程調(diào)度模塊的資料請關注腳本之家其它相關文章!
相關文章
詳解C語言數(shù)據(jù)結(jié)構(gòu)之棧
這篇文章主要為大家介紹了C語言數(shù)據(jù)結(jié)構(gòu)之棧,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-01-01
C++文件關鍵詞快速定位出現(xiàn)的行號實現(xiàn)高效搜索
這篇文章主要為大家介紹了C++文件關鍵詞快速定位出現(xiàn)的行號實現(xiàn)高效搜索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10
C++11 學習筆記之std::function和bind綁定器
這篇文章主要介紹了C++11 學習筆記之std::function和bind綁定器,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-07-07

