Linux基于阻塞隊(duì)列的生產(chǎn)消費(fèi)者模型詳解
一、什么是生產(chǎn)消費(fèi)者模型
生產(chǎn)消費(fèi)者模型就是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題,生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而是通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接交給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者索要數(shù)據(jù),而是直接從阻塞隊(duì)列中去取,這樣一來(lái),阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力
對(duì)于生產(chǎn)消費(fèi)者模型,我們有一個(gè)321規(guī)則,分別是3種關(guān)系,2種角色,1個(gè)交易場(chǎng)所
- 三種關(guān)系:生產(chǎn)者和生產(chǎn)者的互斥競(jìng)爭(zhēng)關(guān)系,消費(fèi)者和消費(fèi)者的互斥競(jìng)爭(zhēng)關(guān)系,生產(chǎn)者和消費(fèi)者的互斥、同步關(guān)系
- 兩種角色:生產(chǎn)者和消費(fèi)者
- 一個(gè)交易場(chǎng)所:特定結(jié)構(gòu)的內(nèi)存空間(如阻塞隊(duì)列)
二、基于阻塞隊(duì)列的生產(chǎn)消費(fèi)者模型
1、理論研究
在多線程編程中,阻塞隊(duì)列是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu),其與普通的隊(duì)列區(qū)別在于,當(dāng)隊(duì)列為空時(shí),從隊(duì)列獲取元素的操作將會(huì)被阻塞,直到隊(duì)列中再次被放入元素,當(dāng)隊(duì)列滿時(shí),往隊(duì)列中存放元素的操作也會(huì)被阻塞,直到有元素從隊(duì)列中被獲取
生產(chǎn)消費(fèi)者模型最大的好處是,也是生產(chǎn)消費(fèi)者模型效率高的原因是:在消費(fèi)者獲取數(shù)據(jù)(一般是網(wǎng)絡(luò)數(shù)據(jù))的時(shí)候,生產(chǎn)者可以生產(chǎn)數(shù)據(jù),生產(chǎn)者放入數(shù)據(jù)的時(shí)候,消費(fèi)者可以處理數(shù)據(jù),雖然特定內(nèi)存結(jié)構(gòu),也就是臨界資源區(qū)是有鎖的,只能由單線程通過(guò),只要將時(shí)間合理化,我們就可以實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的高效率工作,并且將發(fā)送數(shù)據(jù)的線程和處理數(shù)據(jù)的線程解耦合
2、多生產(chǎn)多消費(fèi)模型
(一)BlockQueue.hpp
#pragma once #include <iostream> #include <queue> #include <pthread.h> //定義一個(gè)模版類,方便我們使用任何類型進(jìn)行生產(chǎn)消費(fèi) template <class T> //定義一個(gè)阻塞隊(duì)列 class BlockQueue { //隊(duì)列默認(rèn)最大容量 static const int defalutnum = 20; public: BlockQueue(int maxcap = defalutnum) : maxcap_(maxcap) { //初始化互斥鎖和生產(chǎn)者和消費(fèi)者的條件變量 pthread_mutex_init(&mutex_, nullptr); pthread_cond_init(&c_cond_, nullptr); pthread_cond_init(&p_cond_, nullptr); //下面注釋掉的是設(shè)置水位線,設(shè)置最低最高水位線 //在阻塞隊(duì)列中的數(shù)據(jù),在低于最低水位線時(shí)是不可被獲取的,只能寫入 //在高于最高水位線時(shí)是不可被寫入的,只能獲取 // low_water_ = maxcap_/3; // high_water_ = (maxcap_*2)/3; } //從隊(duì)列頭取出元素返回 T pop() { pthread_mutex_lock(&mutex_);//加鎖 //只能用while不能用if,原因是會(huì)出現(xiàn)誤喚醒問(wèn)題,下面說(shuō) while (q_.size() == 0) { pthread_cond_wait(&c_cond_, &mutex_); } T out = q_.front(); q_.pop(); //這里是加了水位線的版本,在低于水位線的時(shí)候要喚醒生產(chǎn)者 // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_); pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_);//解鎖 return out; } void push(const T &in) { pthread_mutex_lock(&mutex_);//加鎖 //同pop函數(shù) while (q_.size() == maxcap_) { pthread_cond_wait(&p_cond_, &mutex_); } q_.push(in); //這里是加了水位線的版本,在高于水位線的時(shí)候要喚醒消費(fèi)者 // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_); pthread_cond_signal(&c_cond_); pthread_mutex_unlock(&mutex_);//解鎖 } //析構(gòu)函數(shù) ~BlockQueue() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&c_cond_); pthread_cond_destroy(&p_cond_); } private: std::queue<T> q_; int maxcap_; // 極大值 pthread_mutex_t mutex_; pthread_cond_t c_cond_; pthread_cond_t p_cond_; //最低最高水位線 // int low_water_; // int high_water_; };
(二)Task.hpp
#pragma once #include <iostream> #include <string> //定義運(yùn)算方法 std::string opers = "+-*/%"; //枚舉錯(cuò)誤 enum { DivZero = 1, ModZero, Unknown }; class Task { public: Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0) {} void run() { switch (oper_) { case '+': result_ = data1_ + data2_; break; case '-': result_ = data1_ - data2_; break; case '*': result_ = data1_ * data2_; break; case '/': { if (data2_ == 0) exitcode_ = DivZero; else result_ = data1_ / data2_; } break; case '%': { if (data2_ == 0) exitcode_ = ModZero; else result_ = data1_ % data2_; } break; default: exitcode_ = Unknown; break; } } //偽函數(shù),通過(guò)重載()使run可以像函數(shù)一樣調(diào)用 void operator()() { run(); } //返回的運(yùn)算結(jié)果以及錯(cuò)誤代碼 std::string GetResult() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "="; r += std::to_string(result_); r += "[code: "; r += std::to_string(exitcode_); r += "]"; return r; } //返回運(yùn)算表達(dá)式 std::string GetTask() { std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r += "=?"; return r; } ~Task() {} private: int data1_; int data2_; char oper_; int result_; int exitcode_; };
(三)main.cpp
#include "BlockQueue.hpp" #include "Task.hpp" #include <unistd.h> #include <ctime> void *Consumer(void *args) { BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); while (true) { // 消費(fèi) Task t = bq->pop(); // 計(jì)算 t(); //模擬消費(fèi)者處理任務(wù) std::cout << "處理任務(wù): " << t.GetTask() << " 運(yùn)算結(jié)果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl; } } void *Productor(void *args) { int len = opers.size(); BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); int x = 10; int y = 20; while (true) { // 用隨機(jī)數(shù)運(yùn)算模擬生產(chǎn)者生產(chǎn)數(shù)據(jù) int data1 = rand() % 10 + 1; // [1,10] usleep(10); int data2 = rand() % 10; char op = opers[rand() % len]; Task t(data1, data2, op); // 生產(chǎn) bq->push(t); std::cout << "生產(chǎn)了一個(gè)任務(wù): " << t.GetTask() << " thread id: " << pthread_self() << std::endl; sleep(1); } } int main() { //隨機(jī)數(shù)種子 srand(time(nullptr)); //給阻塞隊(duì)列傳一個(gè)任務(wù) BlockQueue<Task> *bq = new BlockQueue<Task>(); //多生產(chǎn)者多消費(fèi)者 pthread_t c[3], p[5]; for (int i = 0; i < 3; i++) { pthread_create(c + i, nullptr, Consumer, bq); } for (int i = 0; i < 5; i++) { pthread_create(p + i, nullptr, Productor, bq); } for (int i = 0; i < 3; i++) { pthread_join(c[i], nullptr); } for (int i = 0; i < 5; i++) { pthread_join(p[i], nullptr); } delete bq; return 0; }
3、誤喚醒問(wèn)題
誤喚醒問(wèn)題就是在調(diào)用pop
函數(shù)或者push
函數(shù)的時(shí)候可能會(huì)引起的,下面我們?cè)侔汛a貼出來(lái),然后把上面有過(guò)的注釋去掉
//... T pop() { pthread_mutex_lock(&mutex_); while (q_.size() == 0) //不能調(diào)用if而要用while { pthread_cond_wait(&c_cond_, &mutex_); } T out = q_.front(); q_.pop(); pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_); return out; } void push(const T &in) { pthread_mutex_lock(&mutex_); while (q_.size() == maxcap_) { pthread_cond_wait(&p_cond_, &mutex_); } q_.push(in); pthread_cond_signal(&c_cond_); pthread_mutex_unlock(&mutex_); //...
在多生產(chǎn)者 - 多消費(fèi)者并發(fā)編程場(chǎng)景中,誤喚醒現(xiàn)象較為常見(jiàn),假定隊(duì)列當(dāng)前處于滿狀態(tài),當(dāng)一個(gè)消費(fèi)者線程成功消費(fèi)一個(gè)數(shù)據(jù)后,隊(duì)列中會(huì)空出一個(gè)位置,隨后,線程可能多次調(diào)用pthread_cond_signal(&p_cond_)
函數(shù),喚醒了一批正在 p_cond_
條件變量下等待的生產(chǎn)者線程,由于被喚醒的生產(chǎn)者線程需要重新競(jìng)爭(zhēng)互斥鎖,這些線程之間呈現(xiàn)出互斥關(guān)系,在先前執(zhí)行消費(fèi)操作的線程釋放鎖之后,僅有一個(gè)生產(chǎn)者線程能夠成功獲取鎖,其余雖被喚醒但未能搶到鎖的生產(chǎn)者線程只能在鎖處等待
當(dāng)成功獲取鎖的生產(chǎn)者線程完成數(shù)據(jù)生產(chǎn)操作后,隊(duì)列可能再次達(dá)到滿狀態(tài),此時(shí),該線程會(huì)調(diào)用 pthread_cond_signal(&c_cond_)
函數(shù)喚醒一個(gè)消費(fèi)者線程,隨后釋放鎖,在此情形下,被喚醒的線程不僅包括剛剛被喚醒的消費(fèi)者線程,還涵蓋之前被喚醒卻未搶到鎖的生產(chǎn)者線程,它們會(huì)同時(shí)參與鎖的競(jìng)爭(zhēng),若使用 if
語(yǔ)句來(lái)判斷隊(duì)列是否已滿,當(dāng)某個(gè)生產(chǎn)者線程搶到鎖后,可能不會(huì)再次對(duì)隊(duì)列狀態(tài)進(jìn)行檢查,直接嘗試向已滿的隊(duì)列中添加數(shù)據(jù),從而引發(fā)錯(cuò)誤
因此,為確保線程安全,應(yīng)使用 while
循環(huán)來(lái)包裹 pthread_cond_wait
函數(shù),當(dāng)一個(gè)線程被喚醒并成功獲取鎖后,不應(yīng)直接執(zhí)行隊(duì)列操作(無(wú)論是生產(chǎn)數(shù)據(jù)還是消費(fèi)數(shù)據(jù)),而應(yīng)再次檢查資源是否滿足操作條件,若資源就緒,則可繼續(xù)執(zhí)行隊(duì)列操作;若資源未就緒,則應(yīng)再次調(diào)用 pthread_cond_wait
函數(shù),使線程進(jìn)入休眠狀態(tài),等待后續(xù)喚醒
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
systemd添加自定義系統(tǒng)服務(wù)設(shè)置自定義開(kāi)機(jī)啟動(dòng)的方法
下面小編就為大家?guī)?lái)一篇systemd添加自定義系統(tǒng)服務(wù)設(shè)置自定義開(kāi)機(jī)啟動(dòng)的方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12Apache No space left on device: mod_rewrite: could not creat
這篇文章主要介紹了Apache No space left on device: mod_rewrite: could not create rewrite_log_lock Configuration Failed問(wèn)題的解決方法,需要的朋友可以參考下2014-09-09Centos7 安裝達(dá)夢(mèng)數(shù)據(jù)庫(kù)的教程
這篇文章主要介紹了Centos7 安裝達(dá)夢(mèng)數(shù)據(jù)庫(kù)的教程,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-12-12淺談Linux系統(tǒng)中的異常堆棧跟蹤的簡(jiǎn)單實(shí)現(xiàn)
下面小編就為大家?guī)?lái)一篇淺談Linux系統(tǒng)中的異常堆棧跟蹤的簡(jiǎn)單實(shí)現(xiàn)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12使用 chkconfig 和 systemctl 命令啟用或禁用 Linux 服務(wù)的方法
在 Linux 中,無(wú)論何時(shí)當(dāng)你安裝任何帶有服務(wù)和守護(hù)進(jìn)程的包,系統(tǒng)默認(rèn)會(huì)把這些服務(wù)的初始化及 systemd 腳本添加進(jìn)去,不過(guò)此時(shí)它們并沒(méi)有被啟用。下面小編給大家?guī)?lái)了使用 chkconfig 和 systemctl 命令啟用或禁用 Linux 服務(wù)的方法,一起看看吧2018-11-11Linux用戶建立腳本/猜字游戲/網(wǎng)卡流量監(jiān)控介紹
大家好,本篇文章主要講的是Linux用戶建立腳本/猜字游戲/網(wǎng)卡流量監(jiān)控介紹,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下2021-12-12