亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Linux基于阻塞隊(duì)列的生產(chǎn)消費(fèi)者模型詳解

 更新時(shí)間:2025年04月27日 09:22:40   作者:s_little_monster_  
這篇文章主要介紹了Linux基于阻塞隊(duì)列的生產(chǎn)消費(fèi)者模型,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

一、什么是生產(chǎn)消費(fèi)者模型

生產(chǎn)消費(fèi)者模型就是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題,生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而是通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接交給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者索要數(shù)據(jù),而是直接從阻塞隊(duì)列中去取,這樣一來(lái),阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力

對(duì)于生產(chǎn)消費(fèi)者模型,我們有一個(gè)321規(guī)則,分別是3種關(guān)系,2種角色,1個(gè)交易場(chǎng)所

  • 三種關(guān)系:生產(chǎn)者和生產(chǎn)者的互斥競(jìng)爭(zhēng)關(guān)系,消費(fèi)者和消費(fèi)者的互斥競(jìng)爭(zhēng)關(guān)系,生產(chǎn)者和消費(fèi)者的互斥、同步關(guān)系
  • 兩種角色:生產(chǎn)者和消費(fèi)者
  • 一個(gè)交易場(chǎng)所:特定結(jié)構(gòu)的內(nèi)存空間(如阻塞隊(duì)列)

二、基于阻塞隊(duì)列的生產(chǎn)消費(fèi)者模型

1、理論研究

在多線程編程中,阻塞隊(duì)列是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu),其與普通的隊(duì)列區(qū)別在于,當(dāng)隊(duì)列為空時(shí),從隊(duì)列獲取元素的操作將會(huì)被阻塞,直到隊(duì)列中再次被放入元素,當(dāng)隊(duì)列滿時(shí),往隊(duì)列中存放元素的操作也會(huì)被阻塞,直到有元素從隊(duì)列中被獲取

生產(chǎn)消費(fèi)者模型最大的好處是,也是生產(chǎn)消費(fèi)者模型效率高的原因是:在消費(fèi)者獲取數(shù)據(jù)(一般是網(wǎng)絡(luò)數(shù)據(jù))的時(shí)候,生產(chǎn)者可以生產(chǎn)數(shù)據(jù),生產(chǎn)者放入數(shù)據(jù)的時(shí)候,消費(fèi)者可以處理數(shù)據(jù),雖然特定內(nèi)存結(jié)構(gòu),也就是臨界資源區(qū)是有鎖的,只能由單線程通過(guò),只要將時(shí)間合理化,我們就可以實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的高效率工作,并且將發(fā)送數(shù)據(jù)的線程和處理數(shù)據(jù)的線程解耦合

2、多生產(chǎn)多消費(fèi)模型

(一)BlockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
//定義一個(gè)模版類,方便我們使用任何類型進(jìn)行生產(chǎn)消費(fèi)
template <class T>
//定義一個(gè)阻塞隊(duì)列
class BlockQueue
{
	//隊(duì)列默認(rèn)最大容量
    static const int defalutnum = 20;

public:
    BlockQueue(int maxcap = defalutnum) : maxcap_(maxcap)
    {
    	//初始化互斥鎖和生產(chǎn)者和消費(fèi)者的條件變量
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
        //下面注釋掉的是設(shè)置水位線,設(shè)置最低最高水位線
        //在阻塞隊(duì)列中的數(shù)據(jù),在低于最低水位線時(shí)是不可被獲取的,只能寫入
        //在高于最高水位線時(shí)是不可被寫入的,只能獲取
        // low_water_ = maxcap_/3;
        // high_water_ = (maxcap_*2)/3;
    }

	//從隊(duì)列頭取出元素返回
    T pop()
    {
        pthread_mutex_lock(&mutex_);//加鎖
        //只能用while不能用if,原因是會(huì)出現(xiàn)誤喚醒問(wèn)題,下面說(shuō)
        while (q_.size() == 0)
        {
            pthread_cond_wait(&c_cond_, &mutex_); 
        }

        T out = q_.front();
        q_.pop();
		//這里是加了水位線的版本,在低于水位線的時(shí)候要喚醒生產(chǎn)者
        // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
        pthread_cond_signal(&p_cond_); 
        pthread_mutex_unlock(&mutex_);//解鎖

        return out;
    }

    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);//加鎖
        //同pop函數(shù)
        while (q_.size() == maxcap_)
        {
            pthread_cond_wait(&p_cond_, &mutex_); 
        }
        q_.push(in); 
        
        //這里是加了水位線的版本,在高于水位線的時(shí)候要喚醒消費(fèi)者
        // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);//解鎖
    }
	//析構(gòu)函數(shù)
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }

private:
    std::queue<T> q_; 
    
    int maxcap_; // 極大值
    
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;
    
	//最低最高水位線
    // int low_water_;
    // int high_water_;
};

(二)Task.hpp

#pragma once
#include <iostream>
#include <string>

//定義運(yùn)算方法
std::string opers = "+-*/%";

//枚舉錯(cuò)誤
enum
{
    DivZero = 1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {}
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
        {
            if (data2_ == 0)
                exitcode_ = DivZero;
            else
                result_ = data1_ / data2_;
        }
        break;
        case '%':
        {
            if (data2_ == 0)
                exitcode_ = ModZero;
            else
                result_ = data1_ % data2_;
        }
        break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    //偽函數(shù),通過(guò)重載()使run可以像函數(shù)一樣調(diào)用
    void operator()()
    {
        run();
    }
    //返回的運(yùn)算結(jié)果以及錯(cuò)誤代碼
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    //返回運(yùn)算表達(dá)式
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {}

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

(三)main.cpp

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>

void *Consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

    while (true)
    {
        // 消費(fèi)
        Task t = bq->pop();
        // 計(jì)算
        t();
        
		//模擬消費(fèi)者處理任務(wù)
        std::cout << "處理任務(wù): " << t.GetTask() << " 運(yùn)算結(jié)果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;
    }
}

void *Productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    int x = 10;
    int y = 20;
    while (true)
    {
        // 用隨機(jī)數(shù)運(yùn)算模擬生產(chǎn)者生產(chǎn)數(shù)據(jù)
        int data1 = rand() % 10 + 1; // [1,10]
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 生產(chǎn)
        bq->push(t);
        std::cout << "生產(chǎn)了一個(gè)任務(wù): " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
        sleep(1);
    }
}

int main()
{
	//隨機(jī)數(shù)種子
    srand(time(nullptr));

    //給阻塞隊(duì)列傳一個(gè)任務(wù)
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    //多生產(chǎn)者多消費(fèi)者
    pthread_t c[3], p[5];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(p + i, nullptr, Productor, bq);
    }

    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }
    delete bq;
    return 0;
}

3、誤喚醒問(wèn)題

誤喚醒問(wèn)題就是在調(diào)用pop函數(shù)或者push函數(shù)的時(shí)候可能會(huì)引起的,下面我們?cè)侔汛a貼出來(lái),然后把上面有過(guò)的注釋去掉

//...

	T pop()
    {
        pthread_mutex_lock(&mutex_);
        while (q_.size() == 0) //不能調(diào)用if而要用while
        {
            pthread_cond_wait(&c_cond_, &mutex_); 
        }

        T out = q_.front();
        q_.pop();

        pthread_cond_signal(&p_cond_); 
        pthread_mutex_unlock(&mutex_);

        return out;
    }

    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        while (q_.size() == maxcap_)
        {
            pthread_cond_wait(&p_cond_, &mutex_); 
        }
        q_.push(in); 
        
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);
        
//...

在多生產(chǎn)者 - 多消費(fèi)者并發(fā)編程場(chǎng)景中,誤喚醒現(xiàn)象較為常見(jiàn),假定隊(duì)列當(dāng)前處于滿狀態(tài),當(dāng)一個(gè)消費(fèi)者線程成功消費(fèi)一個(gè)數(shù)據(jù)后,隊(duì)列中會(huì)空出一個(gè)位置,隨后,線程可能多次調(diào)用pthread_cond_signal(&p_cond_) 函數(shù),喚醒了一批正在 p_cond_ 條件變量下等待的生產(chǎn)者線程,由于被喚醒的生產(chǎn)者線程需要重新競(jìng)爭(zhēng)互斥鎖,這些線程之間呈現(xiàn)出互斥關(guān)系,在先前執(zhí)行消費(fèi)操作的線程釋放鎖之后,僅有一個(gè)生產(chǎn)者線程能夠成功獲取鎖,其余雖被喚醒但未能搶到鎖的生產(chǎn)者線程只能在鎖處等待

當(dāng)成功獲取鎖的生產(chǎn)者線程完成數(shù)據(jù)生產(chǎn)操作后,隊(duì)列可能再次達(dá)到滿狀態(tài),此時(shí),該線程會(huì)調(diào)用 pthread_cond_signal(&c_cond_) 函數(shù)喚醒一個(gè)消費(fèi)者線程,隨后釋放鎖,在此情形下,被喚醒的線程不僅包括剛剛被喚醒的消費(fèi)者線程,還涵蓋之前被喚醒卻未搶到鎖的生產(chǎn)者線程,它們會(huì)同時(shí)參與鎖的競(jìng)爭(zhēng),若使用 if 語(yǔ)句來(lái)判斷隊(duì)列是否已滿,當(dāng)某個(gè)生產(chǎn)者線程搶到鎖后,可能不會(huì)再次對(duì)隊(duì)列狀態(tài)進(jìn)行檢查,直接嘗試向已滿的隊(duì)列中添加數(shù)據(jù),從而引發(fā)錯(cuò)誤

因此,為確保線程安全,應(yīng)使用 while 循環(huán)來(lái)包裹 pthread_cond_wait 函數(shù),當(dāng)一個(gè)線程被喚醒并成功獲取鎖后,不應(yīng)直接執(zhí)行隊(duì)列操作(無(wú)論是生產(chǎn)數(shù)據(jù)還是消費(fèi)數(shù)據(jù)),而應(yīng)再次檢查資源是否滿足操作條件,若資源就緒,則可繼續(xù)執(zhí)行隊(duì)列操作;若資源未就緒,則應(yīng)再次調(diào)用 pthread_cond_wait 函數(shù),使線程進(jìn)入休眠狀態(tài),等待后續(xù)喚醒

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • systemd添加自定義系統(tǒng)服務(wù)設(shè)置自定義開(kāi)機(jī)啟動(dòng)的方法

    systemd添加自定義系統(tǒng)服務(wù)設(shè)置自定義開(kāi)機(jī)啟動(dòng)的方法

    下面小編就為大家?guī)?lái)一篇systemd添加自定義系統(tǒng)服務(wù)設(shè)置自定義開(kāi)機(jī)啟動(dòng)的方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-12-12
  • Apache No space left on device: mod_rewrite: could not create rewrite_log_lock Configuration Failed

    Apache No space left on device: mod_rewrite: could not creat

    這篇文章主要介紹了Apache No space left on device: mod_rewrite: could not create rewrite_log_lock Configuration Failed問(wèn)題的解決方法,需要的朋友可以參考下
    2014-09-09
  • Centos7 安裝達(dá)夢(mèng)數(shù)據(jù)庫(kù)的教程

    Centos7 安裝達(dá)夢(mèng)數(shù)據(jù)庫(kù)的教程

    這篇文章主要介紹了Centos7 安裝達(dá)夢(mèng)數(shù)據(jù)庫(kù)的教程,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-12-12
  • Ubuntu20.04安裝配置GitLab的方法步驟

    Ubuntu20.04安裝配置GitLab的方法步驟

    這篇文章主要介紹了Ubuntu20.04安裝配置GitLab的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • 基于?Apache?的?httpd?文件服務(wù)器詳解

    基于?Apache?的?httpd?文件服務(wù)器詳解

    httpd?HTTP?Daemon,超文本傳輸協(xié)議守護(hù)進(jìn)程的簡(jiǎn)稱,運(yùn)行于網(wǎng)頁(yè)服務(wù)器后臺(tái),等待傳入服務(wù)器請(qǐng)求的軟件,這篇文章主要介紹了基于?Apache?的?httpd?文件服務(wù)器,需要的朋友可以參考下
    2024-07-07
  • 淺談Linux系統(tǒng)中的異常堆棧跟蹤的簡(jiǎn)單實(shí)現(xiàn)

    淺談Linux系統(tǒng)中的異常堆棧跟蹤的簡(jiǎn)單實(shí)現(xiàn)

    下面小編就為大家?guī)?lái)一篇淺談Linux系統(tǒng)中的異常堆棧跟蹤的簡(jiǎn)單實(shí)現(xiàn)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-12-12
  • 在Linux里安裝和啟動(dòng)nginx的方法

    在Linux里安裝和啟動(dòng)nginx的方法

    本篇文章主要介紹了在Linux里安裝和啟動(dòng)nginx的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-05-05
  • 使用 chkconfig 和 systemctl 命令啟用或禁用 Linux 服務(wù)的方法

    使用 chkconfig 和 systemctl 命令啟用或禁用 Linux 服務(wù)的方法

    在 Linux 中,無(wú)論何時(shí)當(dāng)你安裝任何帶有服務(wù)和守護(hù)進(jìn)程的包,系統(tǒng)默認(rèn)會(huì)把這些服務(wù)的初始化及 systemd 腳本添加進(jìn)去,不過(guò)此時(shí)它們并沒(méi)有被啟用。下面小編給大家?guī)?lái)了使用 chkconfig 和 systemctl 命令啟用或禁用 Linux 服務(wù)的方法,一起看看吧
    2018-11-11
  • Linux中g(shù)pio接口的使用方法示例

    Linux中g(shù)pio接口的使用方法示例

    這篇文章主要給大家介紹了關(guān)于Linux中g(shù)pio接口的使用方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-11-11
  • Linux用戶建立腳本/猜字游戲/網(wǎng)卡流量監(jiān)控介紹

    Linux用戶建立腳本/猜字游戲/網(wǎng)卡流量監(jiān)控介紹

    大家好,本篇文章主要講的是Linux用戶建立腳本/猜字游戲/網(wǎng)卡流量監(jiān)控介紹,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下
    2021-12-12

最新評(píng)論