淺談c++如何實(shí)現(xiàn)并發(fā)中的Barrier
說(shuō)到Barrier,很多語(yǔ)言中已經(jīng)是標(biāo)準(zhǔn)庫(kù)中自帶的概念,一般情況下,只需要直接使用就行了。而最近一些機(jī)緣巧合的機(jī)會(huì),我需要在c++中使用這么個(gè)玩意兒。但是c++標(biāo)準(zhǔn)庫(kù)里還沒(méi)有這個(gè)概念,只有boost里面有這樣現(xiàn)成的東西,而我又不想為了這么一個(gè)小東西引入個(gè)boost。所以,我借著這個(gè)機(jī)會(huì)研究了下,發(fā)現(xiàn)其實(shí)這些多線程/并發(fā)中的東西還是蠻有意思的。
閱讀本文你可能需要如下的一些知識(shí):
- 多線程編程的概念。
- c++的基本語(yǔ)法和有關(guān)多線程的語(yǔ)法。
第二條可能也沒(méi)有那么重要,因?yàn)槿绻斫饬硕嗑€程的這些東西,什么語(yǔ)言都可以實(shí)現(xiàn)其核心概念。好了,廢話少扯,進(jìn)入正題。
一、什么是Barrier?
首先,得介紹下Barrier的概念,Barrier從字面理解是屏障的意思,主要是用作集合線程,然后再一起往下執(zhí)行。再具體一點(diǎn),在Barrier之前,若干個(gè)thread各自執(zhí)行,然后到了Barrier的時(shí)候停下,等待規(guī)定數(shù)目的所有的其他線程到達(dá)這個(gè)Barrier,之后再一起通過(guò)這個(gè)Barrier各自干自己的事情。
這個(gè)概念特別像小時(shí)候集體活動(dòng)的過(guò)程,大家從各自的家里到學(xué)校集合,待人數(shù)都到齊之后,之后再一起坐車出去,到達(dá)指定地點(diǎn)后一起行動(dòng)或者各自行動(dòng)。
而在計(jì)算機(jī)的世界里,Barrier可以解決的問(wèn)題很多,比如,一個(gè)程序有若干個(gè)線程并發(fā)的從網(wǎng)站上下載一個(gè)大型xml文件,這個(gè)過(guò)程可以相互獨(dú)立,因?yàn)橐粋€(gè)文件的各個(gè)部分并不相關(guān)。而在處理這個(gè)文件的時(shí)候,可能需要一個(gè)完整的文件,所以,需要有一條虛擬的線讓這些并發(fā)的部分集合一下從而可以拼接成為一個(gè)完整的文件,可能是為了后續(xù)處理也可能是為了計(jì)算hash值來(lái)驗(yàn)證文件的完整性。而后,再交由下一步處理。
二、如何實(shí)現(xiàn)一個(gè)Barrier?
并發(fā)的很多東西都擁有一個(gè)壞處就是你很難證明某種實(shí)現(xiàn)不是錯(cuò)誤的,因?yàn)楹芏鄷r(shí)候確實(shí)情況太多了,無(wú)論是死鎖,饑餓對(duì)于人腦都是太大的負(fù)擔(dān)。而反過(guò)來(lái),對(duì)于我扯這篇文章,也是一個(gè)好處,正因?yàn)楹茈y證明不是錯(cuò)誤的,所以我的扯淡可以更放心一點(diǎn)。
在研究Barrier的實(shí)現(xiàn)中,我查閱了蠻多的資料的。說(shuō)實(shí)話,其實(shí)現(xiàn)方式挺多的。在剔除了一些我能明確證明其有可能是錯(cuò)誤的,我選擇了我自己覺(jué)得最容易理解的一種。
第一節(jié)說(shuō)過(guò),barrier很像是以前的班級(jí)集合,站在一個(gè)老師的角度,你需要知道的東西至少有這兩個(gè):
- 班級(jí)有多少人。
- 目前已經(jīng)到了多少人。
只有當(dāng)目前已經(jīng)到了的人等于班級(jí)人數(shù)之后才能出發(fā)。
所以如果按照這個(gè)類比,實(shí)現(xiàn)一個(gè)barrier至少需要以下的幾個(gè)變量:
- 需要同時(shí)在barrier等待的線程的個(gè)數(shù)。
- 當(dāng)前到達(dá)barrier的線程的個(gè)數(shù)。
而按照barrier的邏輯,主要應(yīng)該有這些操作:
- 當(dāng)一個(gè)線程到達(dá)barrier的時(shí)候,增加計(jì)數(shù)。
- 如果個(gè)數(shù)不等于當(dāng)前需要等待的線程個(gè)數(shù),等待。
- 如果個(gè)數(shù)達(dá)到了需要等待的線程個(gè)數(shù),通知/喚醒所有等待的進(jìn)程,讓所有進(jìn)程通過(guò)barrier。
在不考慮加鎖的情況下,按照上面的邏輯,偽代碼大概應(yīng)該像這樣:
thread_count = n; <-- n是需要一起等待的線程的個(gè)數(shù) arrived_count = 0; <-- 到達(dá)線程的個(gè)數(shù) ------------------------------------------------------------- 以上是全局變量,只會(huì)初始化一次,以下是barrier開(kāi)始的代碼 ------------------------------------------------------------- arrived_count += 1; if(arrived_count == thread_count) notify_all_threads_and_unblok(); else block_and_wait();
而在多線程環(huán)境下,很明顯arrived_count這種全局變量更新需要加鎖。所以,對(duì)于這個(gè)代碼,綜合稍微再改動(dòng)一下,偽代碼可以更新下成為這樣:
thread_count = n; <-- n是需要一起等待的線程的個(gè)數(shù) arrived_count = 0; <-- 到達(dá)線程的個(gè)數(shù) ------------------------------------------------------------- 以上是全局變量,只會(huì)初始化一次,以下是barrier開(kāi)始的代碼 ------------------------------------------------------------- lock(); arrived_count += 1; unlock(); if(arrived_count == thread_count) notify_all_threads_and_unblok(); else block_and_wait();
這里,在有的語(yǔ)言中,鎖的粒度可能小了點(diǎn),取決于notify_all_threads和wait在這個(gè)語(yǔ)言中的定義,但是作為偽代碼,為了可能展示起來(lái)比較方便。
而如果你有并發(fā)編程的知識(shí),你應(yīng)該敏感的認(rèn)識(shí)到notify_all_threads_and_unblock,block_and_wait這種在這里雖然是簡(jiǎn)單的幾個(gè)單詞,但是其包含的操作步驟明顯不止一個(gè),更別說(shuō)背后的機(jī)器指令了。所以作為一個(gè)并發(fā)概念下運(yùn)行的程序,不可以簡(jiǎn)單的就放這樣一個(gè)操作在這里,如果都是任何函數(shù),指令,代碼都是自帶原子性的,那么寫多線程/并發(fā)程序也沒(méi)有啥好研究的了。所以對(duì)于這兩個(gè)操作,我們必須具體的擴(kuò)展下。
對(duì)于notify_all_threads_and_unblock和block_and_wait包含相當(dāng)多的操作,所以下面,得把這兩個(gè)操作具體的展開(kāi)。
thread_count = n; <-- n是需要一起等待的線程的個(gè)數(shù) arrived_count = 0; <-- 到達(dá)線程的個(gè)數(shù) could_release = false; ------------------------------------------------------------- 以上是全局變量,只會(huì)初始化一次,以下是barrier開(kāi)始的代碼 ------------------------------------------------------------- lock(); if(arrived_count == 0) could_release = false; arrived_count += 1; unlock(); if(arrived_count == thread_count) could_realse = true; arrived_count = 0; else while(could_release == false) spin()
這里多了一個(gè)變量could_release完成上面說(shuō)的兩個(gè)操作。原理也很簡(jiǎn)單,如果等待的個(gè)數(shù)沒(méi)有到達(dá)指定數(shù)目,這個(gè)值始終是false,在代碼中使用循環(huán)讓線程阻塞在spin處(當(dāng)然,假設(shè)spin是原子性的)。如果到達(dá)了thread_count,改變could_release的值,這樣循環(huán)條件不滿足,代碼可以繼續(xù)執(zhí)行。而在13行的if里面把a(bǔ)rrived_count重新設(shè)置為0是因?yàn)槿绻贿@樣做,那么這個(gè)barrier就只能用一次,因?yàn)闆](méi)有地方再把這個(gè)表示到達(dá)線程數(shù)目變量的初始值重新設(shè)置了。
我覺(jué)得這里需要停一下,來(lái)思一下上面的代碼,首先,這個(gè)代碼有很多看起來(lái)很像有問(wèn)題的地方。比如對(duì)于could_release和arrived_count的重置處,這都是賦值,而在并發(fā)程序中,任何寫操作都需要仔細(xì)思考是否需要加鎖,在這里,加鎖當(dāng)然沒(méi)問(wèn)題。但是盲目的加鎖會(huì)導(dǎo)致性能損失。
多線程程序最可怕的就是陷入細(xì)節(jié),所以,我一般都是整體的思考下是不是有問(wèn)題。對(duì)于一個(gè)barrier,錯(cuò)誤就是指沒(méi)有等所有的線程都到達(dá)了就停止了等待,人沒(méi)來(lái)齊就發(fā)車了。而怎么會(huì)導(dǎo)致這樣的情況呢?只有當(dāng)arrived_count值在兩個(gè)線程不同步才會(huì)導(dǎo)致錯(cuò)誤。秉承這個(gè)原則,看看上面的代碼,arrived_count的更新是加鎖的,所以在到達(dá)if之前其值是可以信賴的。而if這段判斷本身是讀操作,其判斷就是可以信賴的,因?yàn)閍rrived_count的值更新是可靠的,所以進(jìn)來(lái)的線程要么進(jìn)入if,要么進(jìn)入else。不存在線程1更新了arrived_count的值而線程2讀到了arrived_count的值而導(dǎo)致沒(méi)有到thread_count就更新了could_release的情況。
沒(méi)辦法,這類的程序就是很繞,所以我一般都不陷入細(xì)節(jié)。
現(xiàn)在看起來(lái),一切都很完美,但是多線程程序最惡心的地方就是可能的死鎖,饑餓等等。而這些又很難證明,而上面這段代碼,在某些情況下就是會(huì)導(dǎo)致死鎖??紤]thread_count等于2,也就是這個(gè)barrier需要等待兩個(gè)線程一起通過(guò)。
現(xiàn)在有兩個(gè)線程,t1和t2,t1先執(zhí)行直到17行,卡住,這時(shí)候t2獲得寶貴的cpu機(jī)會(huì)。很明顯,這時(shí)會(huì)進(jìn)入14行,更新could_release的值。如果這個(gè)時(shí)候t1獲得執(zhí)行機(jī)會(huì),萬(wàn)事大吉,t1會(huì)離開(kāi)while區(qū)域,繼續(xù)執(zhí)行。直到下次再次到達(dá)這個(gè)barrier。
但是如果這個(gè)時(shí)候t1并沒(méi)有獲得執(zhí)行機(jī)會(huì),t2一直執(zhí)行,雖然喚醒了could_relase,但是t1會(huì)一直停留在18行。要知道,這個(gè)含有barrier的代碼可能是在一個(gè)循環(huán)之中,如果t2再次到達(dá)barrier的區(qū)域,這時(shí)候arrived_count等于0(因?yàn)閍rrived_count在上一次t2進(jìn)入13行之后重置了),這個(gè)時(shí)候could_relase會(huì)變成false?,F(xiàn)在t1,t2都在18行了,沒(méi)有人有機(jī)會(huì)去更新could_relase的值,線程死鎖了。
怎么辦?仔細(xì)思考下,是喚醒機(jī)制有問(wèn)題,很明顯,如果能夠在喚醒的時(shí)候原子式的喚醒所有的線程,那么上面所說(shuō)的問(wèn)題就不存在了。在很多語(yǔ)言里都有這樣的方法可以完成上面說(shuō)的原子性的喚醒所有線程,比如c++里面的notify_all。但是,如果沒(méi)有這個(gè)函數(shù),該如何實(shí)現(xiàn)呢?
上面死鎖問(wèn)題的誕生在于一個(gè)線程不恰當(dāng)?shù)母铝巳值腸ould_relase,導(dǎo)致全部的判斷條件跟著錯(cuò)誤的改變。解決這樣的問(wèn)題,需要的是一個(gè)只有每個(gè)線程各自能看到,可以獨(dú)立更新,互相不干擾而又能被使用的變量。幸好,在設(shè)計(jì)多線程概念時(shí),有一個(gè)概念叫做thread local,剛好能夠滿足這個(gè)要求。而運(yùn)用這樣的變量,上述的概念可以表述成為:
thread_count = n; <-- n是需要一起等待的線程的個(gè)數(shù) arrived_count = 0; <-- 到達(dá)線程的個(gè)數(shù) could_release = false; thread_local_flag = could_release; <-- 線程局部變量,每個(gè)線程獨(dú)立更新 ------------------------------------------------------------- 以上是全局變量,只會(huì)初始化一次,以下是barrier開(kāi)始的代碼 ------------------------------------------------------------- thread_local_flag = !thread_local_flag lock(); arrived_count += 1; unlock(); if(arrived_count == thread_count) could_realse = thread_local_flag; arrived_count = 0; else while(could_release != thread_local_flag) spin()
這里要著重解釋下,為什么不會(huì)死鎖,由于thread_local_flag是每個(gè)線程獨(dú)立更新的,所以很明顯,其是不用加鎖的。其余代碼和上面的偽代碼類似,不同的是,如果發(fā)生上面一樣的情況,t2更新thread_local_flag的時(shí)候,只有其局部的變量會(huì)被置反而不會(huì)影響其余的線程的變量,而因?yàn)閏ould_realse是全局變量,在t2第一次執(zhí)行到13行的時(shí)候已經(jīng)設(shè)置成thread_local_flag一樣的值了。這個(gè)時(shí)候,哪怕t2再次執(zhí)行到16行也會(huì)因?yàn)槠鋬?nèi)部變量已經(jīng)被置反而阻塞在這個(gè)while循環(huán)之中。而t1只要獲得執(zhí)行機(jī)會(huì),就可以通過(guò)這個(gè)barrier。
有點(diǎn)繞,但是仔細(xì)想想還是蠻有意思的。
三、如何運(yùn)用c++實(shí)現(xiàn)Barrier?
雖然上面說(shuō)了那么多,但是c++中實(shí)現(xiàn)Barrier不需要這么復(fù)雜,這要感謝c++ 11中已經(jīng)自帶了很多原子性的操作,比如上面說(shuō)的notify_all。所以,代碼就沒(méi)有那么復(fù)雜了,當(dāng)然,c++也有thread_local,如果不畏勞苦,可以真的從最基礎(chǔ)的寫起。
#include <iostream> #include <condition_variable> #include <thread> #include <chrono> using namespace std; class TestBarrier{ public: TestBarrier(int nThreadCount): m_threadCount(nThreadCount), m_count(0), m_release(0) {} void wait1(){ unique_lock<mutex> lk(m_lock); if(m_count == 0){ m_release = 0; } m_count++; if(m_count == m_threadCount){ m_count = 0; m_release = 1; m_cv.notify_all(); } else{ m_cv.wait(lk, [&]{return m_release == 1;}); } } private: mutex m_lock; condition_variable m_cv; unsigned int m_threadCount; unsigned int m_count; unsigned int m_release; };
這里多虧了c++標(biāo)準(zhǔn)庫(kù)中引進(jìn)的condition_variable,使得上面的概念可以簡(jiǎn)單高效而又放心的實(shí)現(xiàn),你也不需要操心什么線程局部量。而關(guān)于c++并發(fā)相關(guān)的種種知識(shí)可能需要專門的若干篇幅才能說(shuō)清楚,如果你并不熟悉c++,可以跳過(guò)這些不知所云的部分。驗(yàn)證上述代碼可以使用如下代碼:
unsigned int threadWaiting = 5; TestBarrier barrier(5); void func1(){ this_thread::sleep_for(chrono::seconds(3)); cout<<"func1"<<endl; barrier.wait1(); cout<<"func1 has awakended!"<<endl; } void func2(){ cout<<"func2"<<endl; barrier.wait1(); cout<<"func2 has awakended!"<<endl; } void func3(){ this_thread::sleep_for(chrono::seconds(1)); cout<<"func3"<<endl; barrier.wait1(); cout<<"func3 has awakended!"<<endl; } int main(){ for(int i = 0; i < 5; i++){ thread t1(func1); thread t2(func3); thread t3(func2); thread t4(func3); thread t5(func2); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); } }
好了,在我機(jī)器上的運(yùn)行結(jié)果是這樣的,由于輸出沒(méi)有同步,所以輸出可能并沒(méi)有想象的那么整潔。但是不影響整體結(jié)果,可以看到,所有線程到齊之后才各自執(zhí)行各自后面的代碼:
到此這篇關(guān)于淺談c++如何實(shí)現(xiàn)并發(fā)中的Barrier的文章就介紹到這了,更多相關(guān)c++實(shí)現(xiàn)并發(fā)中的Barrier內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C語(yǔ)言超全面define預(yù)處理指令的使用說(shuō)明
C語(yǔ)言里可以用#define定義一個(gè)標(biāo)識(shí)符來(lái)表示一個(gè)常量。特點(diǎn)是:定義的標(biāo)識(shí)符不占內(nèi)存,只是一個(gè)臨時(shí)的符號(hào),預(yù)編譯后這個(gè)符號(hào)就不存在了,也不做類型定義。預(yù)編譯又叫預(yù)處理2022-04-04C語(yǔ)言內(nèi)存對(duì)齊實(shí)例詳解
這篇文章主要介紹了C語(yǔ)言內(nèi)存對(duì)齊,包括內(nèi)存對(duì)其的基本概念及用法,以及注意事項(xiàng),并以實(shí)例形式加以說(shuō)明,需要的朋友可以參考下2014-09-09數(shù)據(jù)結(jié)構(gòu) 棧的操作實(shí)例詳解
這篇文章主要介紹了數(shù)據(jù)結(jié)構(gòu) 順序棧的定義、初始化、空棧判斷、入棧、出棧操作的相關(guān)資料,需要的朋友可以參考下2017-06-06C語(yǔ)言之結(jié)構(gòu)體(struct)詳解
本文主要介紹C語(yǔ)言 結(jié)構(gòu)體的知識(shí),學(xué)習(xí)C語(yǔ)言肯定需要學(xué)習(xí)結(jié)構(gòu)體,這里詳細(xì)說(shuō)明了結(jié)構(gòu)體并附示例代碼,供大家參考學(xué)習(xí),有需要的小伙伴可以參考下2021-10-10C++和python實(shí)現(xiàn)單鏈表及其原理
這篇文章主要介紹了C++和python實(shí)現(xiàn)單鏈表及其原理,單鏈表是鏈表家族中的一員,每個(gè)節(jié)點(diǎn)依舊由數(shù)據(jù)域(data)和指針域(next)組成,鏈表的具體概念下面文章將詳細(xì)介紹,需要的小伙伴可以參考一下2022-03-03C++按照正態(tài)分布來(lái)排列整型數(shù)組元素
這篇文章主要介紹了C++按照正態(tài)分布來(lái)排列整型數(shù)組元素的相關(guān)資料,需要的朋友可以參考下2016-07-07Matlab實(shí)現(xiàn)三維投影繪制的示例代碼
這篇文章系小編為大家?guī)?lái)了一個(gè)三維投影繪制函數(shù)(三視圖繪制),函數(shù)支持三維曲線、曲面、三維多邊形、參數(shù)方程曲線、參數(shù)方程曲面的投影繪制,需要的可以參考一下2022-08-08