Linux進(jìn)程間通信之管道如何實(shí)現(xiàn)進(jìn)程池
一、管道的特點(diǎn)
只能用于具有共同祖先的進(jìn)程之間進(jìn)行通信,通常,一個(gè)管道由一個(gè)進(jìn)程創(chuàng)建,然后該進(jìn)程調(diào)用fork創(chuàng)建子進(jìn)程,此后父子進(jìn)程就可以使用該管道進(jìn)行通信
管道面向字節(jié)流,即管道不曉得自己里面的內(nèi)容,只是一味按照父子進(jìn)程之間的協(xié)調(diào)進(jìn)行傳輸信息,父子進(jìn)程在讀取其中的內(nèi)容時(shí)是不看內(nèi)容是否有\n
和\0
等含有特殊意義的內(nèi)容
因?yàn)楣艿赖谋举|(zhì)是一種內(nèi)存級(jí)文件,所以管道的生命周期伴隨著進(jìn)程的退出而結(jié)束
一般而言,,內(nèi)核會(huì)對(duì)管道操作進(jìn)行同步與互斥,同步是指多個(gè)進(jìn)程或線程在訪問(wèn)共享資源或進(jìn)行特定操作時(shí),按照一定的順序或規(guī)則進(jìn)行協(xié)調(diào),以確保它們之間的操作能夠正確、有序地執(zhí)行,互斥是指在同一時(shí)刻,只允許一個(gè)進(jìn)程或線程訪問(wèn)共享資源,以避免多個(gè)進(jìn)程或線程同時(shí)訪問(wèn)導(dǎo)致的數(shù)據(jù)不一致或沖突問(wèn)題
管道為半雙工通道,只能單向傳遞信息,需要雙向通信就要建立兩個(gè)管道
我們?cè)诿钚兄惺褂玫?code>|就是匿名通道
二、進(jìn)程池
1、概念
我們知道在我們創(chuàng)建子進(jìn)程的時(shí)候要調(diào)用fork
函數(shù),這是一個(gè)系統(tǒng)調(diào)用接口,所以會(huì)對(duì)系統(tǒng)產(chǎn)生成本,如果我們一次創(chuàng)建很多個(gè)進(jìn)程,那么系統(tǒng)會(huì)變得很累,所以我們引入池的概念,進(jìn)程池可以保證在我們需要使用進(jìn)程的情況下,由于提前創(chuàng)建了子進(jìn)程,我們直接分配就行了,避免了我們需要大量進(jìn)程的情況下操作系統(tǒng)很吃力的情況,對(duì)提前創(chuàng)建好的這些子進(jìn)程進(jìn)行先描述后組織的
2、用管道實(shí)現(xiàn)一個(gè)簡(jiǎn)易進(jìn)程池
(一)頭文件、宏、全局變量和main函數(shù)
#include <iostream> #include <vector> #include <string> #include <unistd.h> #include "task.hpp" #include <sys/stat.h> #include <sys/wait.h> #include <cstdio> #define PROCESSNUM 10 std::vector<task_t> tasks; int main() { //加載任務(wù) LoadTask(&tasks); //定義一個(gè)vector管理所有的管道,channel是描述,channels是組織 std::vector<channel> channels; //初始化 InitProcessPool(&channels); //開始進(jìn)行 StartProcessPool(channels); //清理 CleanProcessPool(channels); return 0; }
(二)初始化函數(shù)InitProcessPool
初始化函數(shù)里有一個(gè)重要的點(diǎn)就是,我們的子進(jìn)程是循環(huán)創(chuàng)建的,所以在創(chuàng)建第一個(gè)子進(jìn)程時(shí)沒(méi)有問(wèn)題,但是創(chuàng)建第二個(gè)子進(jìn)程開始,因?yàn)閯倓?chuàng)建出的第二個(gè)子進(jìn)程與父進(jìn)程是一樣的,此時(shí)都作為寫端連接著一個(gè)管道,我們?cè)趫D中用綠色的線標(biāo)注出來(lái)了,第三個(gè)子進(jìn)程又可以成為第一二個(gè)管道的寫端,以此類推,每個(gè)子進(jìn)程后創(chuàng)建的子進(jìn)程都會(huì)是上個(gè)信道的寫端,這與我們想要父進(jìn)程寫,子進(jìn)程讀的要求相悖,所以我們初始化的另一個(gè)目的就是將這些多余的連接全部斷開,也就是圖中彩色的線全部斷開,進(jìn)而保證只有父進(jìn)程在寫端
- task.hpp
#pragma once #include <iostream> #include <vector> //定義一個(gè)函數(shù)指針task_t指向返回值為void,沒(méi)有參數(shù)的函數(shù) typedef void (*task_t)(); void task1() { std::cout << "this is task1 running" << std::endl; } void task2() { std::cout << "this is task2 running" << std::endl; } void task3() { std::cout << "this is task3 running" << std::endl; } void task4() { std::cout << "this is task4 running" << std::endl; } //加載任務(wù)函數(shù),將任務(wù)pushback到vector中 void LoadTask(std::vector<task_t> *tasks) { tasks->push_back(task1); tasks->push_back(task2); tasks->push_back(task3); tasks->push_back(task4); }
- test.cpp
class channel { public: // 描述父進(jìn)程的fd,對(duì)應(yīng)子進(jìn)程的pid,子進(jìn)程的名字 channel(int cmdfd, int slaverid, const std::string &processname) :_cmdfd(cmdfd),_slaverid(slaverid),_processname(processname) {} int _cmdfd; pid_t _slaverid; std::string _processname; }; void slaver() { while(true) { // 用于存儲(chǔ)從標(biāo)準(zhǔn)輸入讀取的命令碼 int cmdcode = 0; // 從標(biāo)準(zhǔn)輸入(管道)讀取數(shù)據(jù),嘗試讀取sizeof(int)字節(jié)的數(shù)據(jù)到cmdcode中 // 如果父進(jìn)程不給子進(jìn)程發(fā)送數(shù)據(jù)子進(jìn)程就會(huì)進(jìn)入阻塞等待 int n = read(0, &cmdcode, sizeof(int)); if(n == sizeof(int)) { // read的返回值與sizeof(int)相等,就輸出子進(jìn)程pid和獲得命令碼 // 如果命令碼有效就調(diào)用task任務(wù),無(wú)效就退出 std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl; if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode](); } else break; } } void InitProcessPool(std::vector<channel>* channels) { //用于存儲(chǔ)之前創(chuàng)建的管道的寫端文件描述符 //目的是讓后續(xù)創(chuàng)建的子進(jìn)程可以關(guān)閉這些舊的寫端文件描述符,避免資源泄漏 std::vector<int> oldfds; //循環(huán)創(chuàng)建子進(jìn)程 for(int i = 0; i < PROCESSNUM; i++) { int pipefd[2] = {0}; int n = pipe(pipefd); if(n < 0) { return; } pid_t id = fork(); if(id < 0) { return; } if(id == 0) { //打印子進(jìn)程pid,打印并關(guān)閉上一個(gè)管道寫端文件描述符 std::cout << "child : " << getpid() << " close history fd: "; for(auto fd : oldfds) { std::cout << fd << " "; close(fd); } std::cout << std::endl; //關(guān)閉寫端通道 close(pipefd[1]); //將當(dāng)前管道的讀端文件描述符復(fù)制到標(biāo)準(zhǔn)輸入 //這樣子進(jìn)程就可以通過(guò)標(biāo)準(zhǔn)輸入從管道讀取數(shù)據(jù) dup2(pipefd[0],0); // 讀取完關(guān)閉管道讀端 close(pipefd[0]); // 子進(jìn)程主要業(yè)務(wù) slaver(); //打印子進(jìn)程要退出了 std::cout << "process : " << getpid() << " quit" << std::endl; exit(0); } //父進(jìn)程開始 //關(guān)閉讀端 close(pipefd[0]); //將當(dāng)前channel信息添加到channels進(jìn)行組織 std::string name = "process-" + std::to_string(i); channels->push_back(channel(pipefd[1],id,name)); //添加這個(gè)寫端的文件描述符,方便后面的進(jìn)程關(guān)閉它 oldfds.push_back(pipefd[1]); sleep(1); } }
(三)執(zhí)行函數(shù)StartProcessPool
//打印一個(gè)選擇任務(wù)的菜單 void Menu() { std::cout << "################################################" << std::endl; std::cout << "# 1. 任務(wù)一 2. 任務(wù)二 #" << std::endl; std::cout << "# 3. 任務(wù)三 4. 任務(wù)四 #" << std::endl; std::cout << "# 0. 退出 #" << std::endl; std::cout << "#################################################" << std::endl; } void StartProcessPool(std::vector<channel>* channels) { while(true) { int select = 0; Menu(); sleep(1); //輸入選項(xiàng) std::cout << "Please Enter>> "; std::cin >> select; if(select <= 0 || select >= 5) break; //將控制碼也就是選擇的數(shù)字select1234轉(zhuǎn)化為0123,因?yàn)関ector下標(biāo)從0開始,所以要-1 int cmdcode = select - 1; //通過(guò)管道寫入信息,等待slaver()讀取 write(channels[select]._cmdfd, &cmdcode, sizeof(cmdcode)); sleep(1); } }
(四)清理函數(shù)CleanProcessPool
void CleanProcessPool(std::vector<channel> &channels) { //每個(gè)channel對(duì)象的左邊為父進(jìn)程的fd,右邊為子進(jìn)程fd,斷開父進(jìn)程fd,然后進(jìn)程等待 //父進(jìn)程斷開后子進(jìn)程會(huì)在管道中讀到0,即文件結(jié)束,然后子進(jìn)程就會(huì)終止 //然后被父進(jìn)程回收 for(const auto &c : channels){ close(c._cmdfd); waitpid(c._slaverid, nullptr, 0); } }
三、進(jìn)程池其他問(wèn)題
1、描述整個(gè)過(guò)程
首先啟動(dòng)進(jìn)程,將任務(wù)函數(shù)“上膛”到vector中,然后進(jìn)行初始化,創(chuàng)建出第一個(gè)子進(jìn)程,第一個(gè)子進(jìn)程執(zhí)行常規(guī)操作,比如將寫端關(guān)閉,將當(dāng)前管道讀端文件描述符復(fù)制到標(biāo)準(zhǔn)輸入以來(lái)獲取標(biāo)準(zhǔn)輸入的數(shù)據(jù),然后就是等待父進(jìn)程發(fā)送信息,在此同時(shí),父進(jìn)程也不閑著,將當(dāng)前讀端關(guān)閉,然后描述channel進(jìn)而pushback到channels中進(jìn)行組織,然后在oldfds中存下管道寫端對(duì)應(yīng)的fd,方便后面子進(jìn)程的斷開,然后創(chuàng)建第二個(gè)子進(jìn)程,第二個(gè)子進(jìn)程執(zhí)行和第一個(gè)子進(jìn)程差不多的操作,唯一的區(qū)別就是要將oldfds里面的寫端全部斷開,然后以此類推
2、細(xì)節(jié)處理
開始創(chuàng)建第一個(gè)子進(jìn)程并形成管道時(shí),父進(jìn)程的讀端fd==3
寫端fd==4
,到后面就會(huì)關(guān)閉讀端,第二次創(chuàng)建時(shí)父進(jìn)程的讀端fd==3
寫端fd==5
,以此類推,父進(jìn)程的讀端將一直為3,而寫端遞增
創(chuàng)建完成的子進(jìn)程在父進(jìn)程發(fā)送信息之前都處于阻塞狀態(tài),一旦父進(jìn)程發(fā)送信息,比如說(shuō)上面我們提到的指定某個(gè)管道或者指定某個(gè)任務(wù)
3、標(biāo)準(zhǔn)的制定
一種良好的編程習(xí)慣對(duì)于一個(gè)程序員來(lái)說(shuō)是一件非常好的事情,對(duì)于我們main函數(shù)中的這三個(gè)函數(shù)參數(shù),我們發(fā)現(xiàn)它們遵守著一定的規(guī)則
const &
:當(dāng)我們只進(jìn)行輸入不要輸出內(nèi)容的時(shí)候*
:當(dāng)我們要輸出內(nèi)容的時(shí)候,類似于輸出型參數(shù)&
:當(dāng)我們既要輸入又要輸出的時(shí)候
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
centos7 esxi6.7模板實(shí)際應(yīng)用詳解
這篇文章主要介紹了centos7 esxi6.7模板實(shí)際應(yīng)用詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01在Apache服務(wù)器上利用Varnish優(yōu)化移動(dòng)端訪問(wèn)的方法
這篇文章主要介紹了在Apach服務(wù)器上利用Varnish優(yōu)化移動(dòng)端訪問(wèn)的方法,包括清除緩存等常用操作的介紹,需要的朋友可以參考下2015-06-06linux服務(wù)器上安裝Anaconda與pytorch的詳細(xì)過(guò)程
這篇文章主要介紹了linux服務(wù)器上安裝Anaconda與pytorch的詳細(xì)過(guò)程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-10-10Apache FlinkCEP 實(shí)現(xiàn)超時(shí)狀態(tài)監(jiān)控的步驟詳解
這篇文章主要介紹了Apache FlinkCEP 實(shí)現(xiàn)超時(shí)狀態(tài)監(jiān)控的步驟,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-10-10apache啟動(dòng)報(bào)錯(cuò):httpd: apr_sockaddr_info_get() failed
httpd: Could not reliably determine the server's fully qualified domain name, using 127.0.0.1 for ServerName2013-02-02