C++實(shí)現(xiàn)一個(gè)簡(jiǎn)易線程池的使用小結(jié)
在現(xiàn)代軟件開發(fā)中,多線程編程已經(jīng)成為提升程序性能的常見手段。無論是處理大量 I/O 請(qǐng)求的服務(wù)器,還是進(jìn)行 CPU 密集型計(jì)算的應(yīng)用,多線程都能顯著提高吞吐量和響應(yīng)速度。然而,直接頻繁創(chuàng)建和銷毀線程開銷巨大,也容易導(dǎo)致資源浪費(fèi)和管理復(fù)雜度的增加。
為了解決這個(gè)問題,線程池應(yīng)運(yùn)而生。它的核心思想是:提前創(chuàng)建固定數(shù)量的工作線程,并將任務(wù)提交到一個(gè)任務(wù)隊(duì)列中,由線程池中的線程循環(huán)取任務(wù)執(zhí)行。這樣,不僅減少了線程創(chuàng)建和銷毀的開銷,也能方便地控制系統(tǒng)的并發(fā)度。
這篇博客記錄了逐步實(shí)現(xiàn)一個(gè)簡(jiǎn)易版線程池的過程。
首先線程池的思想是創(chuàng)建預(yù)先創(chuàng)建一定數(shù)量的工作線程,它們循環(huán)等待任務(wù)隊(duì)列中的任務(wù)并執(zhí)行,從而避免頻繁創(chuàng)建和銷毀線程,實(shí)現(xiàn)資源復(fù)用,提高資源利用率和程序性能。所以我們首先需要一個(gè)vector<thread>的數(shù)組,還有一個(gè)任務(wù)隊(duì)列。同時(shí),由于線程池可能會(huì)被多個(gè)線程同時(shí)訪問:工作線程需要取出任務(wù)隊(duì)列中的任務(wù),其他使用線程池的線程可能同時(shí)提交任務(wù),所以還需要加一把互斥鎖。上述還提到了其他線程會(huì)往線程池中提交任務(wù),使得線程池中的工作線程可以取出任務(wù)隊(duì)列中的任務(wù)并且執(zhí)行,所以還需要一個(gè)公開的提交任務(wù)函數(shù)。基本架構(gòu)如下:
class ThreadPool{
public:
ThreadPool(size_t);
template<class F, class ...Args>
auto enqueue(F&& f, Args&&... args) -> void;
~ThreadPool();
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
};
這里解釋一下任務(wù)隊(duì)列為什么使用std::queue<std::function<void()>> tasks_,首先我們肯定需要一個(gè)統(tǒng)一的形式封裝任務(wù)隊(duì)列,這樣就可以統(tǒng)一封裝到stl容器中。而且我們期望工作線程執(zhí)行時(shí)是可以直接拿來調(diào)用的,工作線程是線程池內(nèi)部實(shí)現(xiàn),不應(yīng)該依賴于用戶傳入任務(wù)的參數(shù)這層外部抽象,所以參數(shù)列表為空。參數(shù)列表為空之后用戶傳入帶有參數(shù)的函數(shù)還是可以執(zhí)行的,注意我們的enqueue函數(shù)中接受了用戶提供的任務(wù)函數(shù)和參數(shù),在這個(gè)函數(shù)中會(huì)將二者進(jìn)行std::bind,這樣就把參數(shù)填入可調(diào)用對(duì)象了,所以工作線程取出任務(wù)時(shí)可以直接調(diào)用。至于用戶需要返回值怎么辦,這里先埋一個(gè)伏筆,先統(tǒng)一使用void表示無返回值,便于封裝。
接下來我們實(shí)現(xiàn)構(gòu)造函數(shù):
ThreadPool::ThreadPool(size_t nums){
for(size_t i = 0; i < nums; i++){
workers_.emplace_back([this]{
while(true){
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->mutex_);
this->condition.wait(lock, [this]{!this->tasks_.empty(); });
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
task();
}
});
}
}
構(gòu)造函數(shù)的目的就是向std::vector<std::thread> workers_中填充工作線程,工作線程的邏輯如下:首先嘗試獲取鎖,獲取鎖之后查看任務(wù)隊(duì)列是否為空,如果不為空的話就取出元素執(zhí)行,如果為空的話就釋放鎖進(jìn)行下一輪循環(huán)。
下面我們從細(xì)節(jié)上具體說明,獲取鎖之后使用條件變量進(jìn)行等待,wait(lock, [this]{!this->tasks_.empty(); }),這樣當(dāng)任務(wù)隊(duì)列不為空時(shí)就可以不阻塞往下執(zhí)行,如果為空的話,就會(huì)釋放鎖,并且將該線程添加到對(duì)應(yīng)條件變量的阻塞隊(duì)列中,之后如果enqueue時(shí)填充了任務(wù)就可以發(fā)signal喚醒阻塞于這個(gè)條件變量的一個(gè)線程。如果獲取鎖之后發(fā)現(xiàn)任務(wù)隊(duì)列中有任務(wù)可以執(zhí)行,那么就取出任務(wù)隊(duì)列中的第一個(gè)任務(wù)執(zhí)行,因?yàn)橐WC調(diào)用順序。
注意在emplace_back函數(shù)中直接調(diào)用了線程的構(gòu)造函數(shù),先在vector 尾部直接原地構(gòu)造一個(gè)線程對(duì)象。構(gòu)造線程對(duì)象時(shí)會(huì)立即創(chuàng)建一個(gè)新線程,在這個(gè)線程里執(zhí)行傳入的 lambda。所以在執(zhí)行條件變量的wait時(shí)就已經(jīng)有實(shí)際創(chuàng)建的線程對(duì)象了。
我們?cè)跇?gòu)造函數(shù)中新使用了condition這個(gè)條件變量,所以應(yīng)該在ThreadPool類的私有成員中添加對(duì)應(yīng)變量。上述構(gòu)造函數(shù)構(gòu)造的線程函數(shù)是一個(gè)死循環(huán),因此會(huì)一直等待,獲取任務(wù)隊(duì)列中的任務(wù)然后執(zhí)行。我們還要給線程函數(shù)加上停止邏輯,不然主線程調(diào)用join的時(shí)候就會(huì)一直阻塞,因?yàn)榇藭r(shí)線程函數(shù)中還沒有返回邏輯,所以需要給線程函數(shù)添加返回的分支。
線程函數(shù)返回的時(shí)任務(wù)隊(duì)列應(yīng)該為空,因?yàn)閺倪壿嬌现v我要把用戶給我的任務(wù)全部完成,所以任務(wù)隊(duì)列為空是檢測(cè)是否可以返回的條件。如果只檢測(cè)這個(gè)條件也是不可以的,因?yàn)榭赡芫€程池中其他線程已經(jīng)把任務(wù)隊(duì)列中的所有任務(wù)取出來執(zhí)行完,此時(shí)任務(wù)隊(duì)列也為空,但是用戶還可能enqueue任務(wù)進(jìn)去,所以此時(shí)線程池不應(yīng)該關(guān)閉。此時(shí)我們需要一個(gè)標(biāo)志位,在ThreadPool的析構(gòu)函數(shù)中將這個(gè)標(biāo)志位設(shè)置為true,此時(shí)用戶也不需要使用線程池了,因?yàn)檎{(diào)用析構(gòu)函數(shù)時(shí)肯定已經(jīng)退出用戶代碼作用域了,所以應(yīng)該使用這個(gè)標(biāo)志位和任務(wù)隊(duì)列是否為空檢測(cè)。下面給出完善后代碼:
class ThreadPool{
public:
ThreadPool(size_t){
stoped_ = false;
for(size_t i = 0; i < nums; i++){
workers_.emplace_back([this]{
while(true){
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->mutex_);
this->condition.wait(lock, [this]{!this->tasks_.empty(); });
if(this->stopd_ && this->tasks_.empty()){
return;
}
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
task();
}
});
}
}
template<class F, class ...Args>
auto enqueue(F&& f, Args&&... args) -> void;
~ThreadPool();
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::condition_variable condition;
std::mutex mutex_;
bool stoped_;
};
接下來是析構(gòu)函數(shù),析構(gòu)函數(shù)應(yīng)該是否線程池相關(guān)資源,循環(huán)調(diào)用join回收所有線程。
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(mutex_);
stoped_ = true;
}
condition.notify_all();
for(std::thread &worker: workers_)
worker.join();
}
這里我們將stoped_設(shè)置為true之后會(huì)喚醒所有在該條件變量上阻塞的工作線程,但是條件變量的喚醒只有一個(gè)線程可以重新獲取鎖,這個(gè)獲取鎖的線程就可以繼續(xù)執(zhí)行線程函數(shù)的邏輯,判斷隊(duì)列是不是為空,為空的話就可以直接return了,因?yàn)榇藭r(shí)任務(wù)隊(duì)列不會(huì)再有任務(wù)進(jìn)來,但是也只有這個(gè)線程會(huì)return,其他的線程沒有獲取到鎖,繼續(xù)阻塞在條件變量的阻塞隊(duì)列上。因此我們要修改條件變量的wait條件,目標(biāo)是調(diào)用析構(gòu)之后所有的線程都可以感知到并且被喚醒,盡管一次只有一個(gè)線程可以獲取鎖。
將條件變量的wait邏輯修改為如下就好了:
this->condition.wait(lock, [this]{this->stoped_ || !this->tasks_.empty(); });
最后我們來實(shí)現(xiàn)enqueue函數(shù),這個(gè)函數(shù)的作用是添加新任務(wù)到任務(wù)隊(duì)列,下面給出初版實(shí)現(xiàn):
template<class F, class ...Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> void{
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
{
std::unique_lock<std::mutex> lock(mutex_);
if(stoped_){
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace([task](){task()});
}
condition.notify_one();
}
這段代碼就是將用戶傳入的可調(diào)用對(duì)象和參數(shù)綁定,這樣工作線程取出來之后就可以直接調(diào)用,不需要再傳參。再將這個(gè)任務(wù)放入任務(wù)隊(duì)列,通知一個(gè)工作線程可以取出任務(wù)執(zhí)行。
上述代碼其實(shí)很粗糙,我們來逐步優(yōu)化。
tasks_.emplace([task](){task()});
這段將任務(wù)放入任務(wù)隊(duì)列的代碼,調(diào)用了拷貝構(gòu)造函數(shù),如果對(duì)象很大的話會(huì)影響性能,比如說一個(gè)可調(diào)用對(duì)象內(nèi)部封裝了大量變量,和上下文信息。所以我們應(yīng)該使用移動(dòng)或者指針的方式,這里不可以使用移動(dòng),因?yàn)閰?shù)F是通過引用傳入的,移動(dòng)意味著放棄所有權(quán),假如用戶聲明了一個(gè)F,傳參之后在后續(xù)用戶代碼想調(diào)用這個(gè)F時(shí)就會(huì)發(fā)生報(bào)錯(cuò),因?yàn)榇藭r(shí)F已經(jīng)被移動(dòng)了,用戶代碼失去了所有權(quán)。這里只可以使用指針來做,而且應(yīng)該使用共享指針,這樣可以確保用戶代碼擁有所有權(quán)。
template<class F, class ...Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> void{
using TaskType = decltype(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
auto taskPtr = std::make_shared<TaskType>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
{
std::unique_lock<std::mutex> lock(mutex_);
if(stoped_){
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace([taskPtr]() {
(*taskPtr)();
});
}
condition.notify_one();
}
注意我們使用智能指針時(shí)需要知道對(duì)象具體的類型,但是std::bind是一個(gè)函數(shù)模板,它會(huì)返回一個(gè)可調(diào)用對(duì)象,這里返回的可調(diào)用對(duì)象是一個(gè)未命名類型的對(duì)象,你不能寫出它的確切類型名字(它不是 std::function),但它可以像函數(shù)一樣調(diào)用。也就是說返回一個(gè)具有operator()的匿名類型。
因?yàn)檫@個(gè)匿名類型我們沒法直接寫出來,所以需要讓編譯器幫我們推導(dǎo)。
decltype(std::bind(std::forward<F>(f), std::forward<Args>(args)...))
上述enqueue函數(shù)還有一個(gè)問題,就是它沒有返回值,用戶無法獲得任務(wù)結(jié)果,也無法知道任務(wù)是否執(zhí)行完成。因此我們需要一種機(jī)制,當(dāng)用戶調(diào)用enqueue函數(shù)時(shí)可以拿到一個(gè)句柄,當(dāng)任務(wù)被工作線程執(zhí)行完成后,用戶檢測(cè)這個(gè)句柄就會(huì)發(fā)現(xiàn)任務(wù)已經(jīng)執(zhí)行完成,然后獲取返回值。這個(gè)機(jī)制其實(shí)就是future和promise,下面我們來繼續(xù)完善:
template<class F, class ...Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>{
using return_type = typename std::invoke_result_t<F, Args...>;
auto taskPtr = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = taskPtr->get_future();
{
std::unique_lock<std::mutex> lock(mutex_);
if(stoped_){
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace([taskPtr]() {
(*taskPtr)();
});
}
condition.notify_one();
return res;
}
其中using return_type = std::invoke_result_t<F, Args...>;是推導(dǎo)調(diào)用 f(args...) 的返回類型,std::packaged_task<return_type()>用于把一個(gè)可調(diào)用對(duì)象封裝成一個(gè)可以產(chǎn)生 std::future 的任務(wù)。其中return_type()表示一個(gè)無參數(shù),返回類型為 return_type的任務(wù),正好對(duì)應(yīng)std::bind后的類型。
之后我們就可以調(diào)用taskPtr->get_future():取得與該 packaged_task 關(guān)聯(lián)的 std::future,將這個(gè)future返回出去之后,調(diào)用者可以通過res.get()獲取返回值,類似下面的調(diào)用:
auto future = pool.enqueue([](int x){ return x * 2; }, 21);
int result = future.get(); // result == 42
至此,我們的簡(jiǎn)易線程池就完成了,完整代碼可參考這個(gè)倉庫,下面給出用戶使用線程池的示例代碼:
int main()
{
ThreadPool pool(4);
std::vector< std::future<int> > results;
for(int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}
for(auto && result: results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
到此這篇關(guān)于C++實(shí)現(xiàn)一個(gè)簡(jiǎn)易線程池的使用小結(jié)的文章就介紹到這了,更多相關(guān)C++ 線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- C++線程池的簡(jiǎn)單實(shí)現(xiàn)方法
- c++版線程池和任務(wù)池示例
- c++實(shí)現(xiàn)簡(jiǎn)單的線程池
- c++實(shí)現(xiàn)簡(jiǎn)單的線程池
- c++線程池實(shí)現(xiàn)方法
- C++11 簡(jiǎn)單實(shí)現(xiàn)線程池的方法
- 基于C++11實(shí)現(xiàn)手寫線程池的示例代碼
- 基于C++17實(shí)現(xiàn)的手寫線程池
- 一種類似JAVA線程池的C++線程池實(shí)現(xiàn)方法
- C++實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池的示例代碼
- 使用C++實(shí)現(xiàn)一個(gè)高效的線程池
相關(guān)文章
C++編程中將引用類型作為函數(shù)參數(shù)的方法指南
這篇文章主要介紹了C++編程中將引用類型作為函數(shù)參數(shù)的方法指南,是C++入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-09-09
使用C# 判斷給定大數(shù)是否為質(zhì)數(shù)的詳解
本篇文章是對(duì)使用C#判斷給定大數(shù)是否為質(zhì)數(shù)的方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05
Qt使用QSqlDatabase連接MySQL實(shí)現(xiàn)增刪改查功能
這篇文章主要為大家詳細(xì)介紹了Qt如何使用QSqlDatabase連接MySQL實(shí)現(xiàn)增刪改查功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-07-07
C語言課程設(shè)計(jì)之停車場(chǎng)管理問題
這篇文章主要為大家詳細(xì)介紹了C語言課程設(shè)計(jì)之停車場(chǎng)管理問題,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03

