c++版線(xiàn)程池和任務(wù)池示例
commondef.h
//單位秒,監(jiān)測(cè)空閑列表時(shí)間間隔,在空閑隊(duì)列中超過(guò)TASK_DESTROY_INTERVAL時(shí)間的任務(wù)將被自動(dòng)銷(xiāo)毀
const int CHECK_IDLE_TASK_INTERVAL = 300;
//單位秒,任務(wù)自動(dòng)銷(xiāo)毀時(shí)間間隔
const int TASK_DESTROY_INTERVAL = 60;
//監(jiān)控線(xiàn)程池是否為空時(shí)間間隔,微秒
const int IDLE_CHECK_POLL_EMPTY = 500;
//線(xiàn)程池線(xiàn)程空閑自動(dòng)退出時(shí)間間隔 ,5分鐘
const int THREAD_WAIT_TIME_OUT = 300;
taskpool.cpp
#include "taskpool.h"
#include <string.h>
#include <stdio.h>
#include <pthread.h>
TaskPool::TaskPool(const int & poolMaxSize)
: m_poolSize(poolMaxSize)
, m_taskListSize(0)
, m_bStop(false)
{
pthread_mutex_init(&m_lock, NULL);
pthread_mutex_init(&m_idleMutex, NULL);
pthread_cond_init(&m_idleCond, NULL);
pthread_attr_t attr;
pthread_attr_init( &attr );
pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 讓線(xiàn)程獨(dú)立運(yùn)行
pthread_create(&m_idleId, &attr, CheckIdleTask, this); //創(chuàng)建監(jiān)測(cè)空閑任務(wù)進(jìn)程
pthread_attr_destroy(&attr);
}
TaskPool::~TaskPool()
{
if(!m_bStop)
{
StopPool();
}
if(!m_taskList.empty())
{
std::list<Task*>::iterator it = m_taskList.begin();
for(; it != m_taskList.end(); ++it)
{
if(*it != NULL)
{
delete *it;
*it = NULL;
}
}
m_taskList.clear();
m_taskListSize = 0;
}
if(!m_idleList.empty())
{
std::list<Task*>::iterator it = m_idleList.begin();
for(; it != m_idleList.end(); ++it)
{
if(*it != NULL)
{
delete *it;
*it = NULL;
}
}
m_idleList.clear();
}
pthread_mutex_destroy(&m_lock);
pthread_mutex_destroy(&m_idleMutex);
pthread_cond_destroy(&m_idleCond);
}
void * TaskPool::CheckIdleTask(void * arg)
{
TaskPool * pool = (TaskPool*)arg;
while(1)
{
pool->LockIdle();
pool->RemoveIdleTask();
if(pool->GetStop())
{
pool->UnlockIdle();
break;
}
pool->CheckIdleWait();
pool->UnlockIdle();
}
}
void TaskPool::StopPool()
{
m_bStop = true;
LockIdle();
pthread_cond_signal(&m_idleCond); //防止監(jiān)控線(xiàn)程正在等待,而引起無(wú)法退出的問(wèn)題
UnlockIdle();
pthread_join(m_idleId, NULL);
}
bool TaskPool::GetStop()
{
return m_bStop;
}
void TaskPool::CheckIdleWait()
{
struct timespec timeout;
memset(&timeout, 0, sizeof(timeout));
timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
timeout.tv_nsec = 0;
pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}
int TaskPool::RemoveIdleTask()
{
int iRet = 0;
std::list<Task*>::iterator it, next;
std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();
time_t curTime = time(0);
for(; rit != m_idleList.rend(); )
{
it = --rit.base();
if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
{
iRet++;
delete *it;
*it = NULL;
next = m_idleList.erase(it);
rit = std::list<Task*>::reverse_iterator(next);
}
else
{
break;
}
}
}
int TaskPool::AddTask(task_fun fun, void *arg)
{
int iRet = 0;
if(0 != fun)
{
pthread_mutex_lock(&m_lock);
if(m_taskListSize >= m_poolSize)
{
pthread_mutex_unlock(&m_lock);
iRet = -1; //task pool is full;
}
else
{
pthread_mutex_unlock(&m_lock);
Task * task = GetIdleTask();
if(NULL == task)
{
task = new Task;
}
if(NULL == task)
{
iRet = -2; // new failed
}
else
{
task->fun = fun;
task->data = arg;
pthread_mutex_lock(&m_lock);
m_taskList.push_back(task);
++m_taskListSize;
pthread_mutex_unlock(&m_lock);
}
}
}
return iRet;
}
Task* TaskPool::GetTask()
{
Task *task = NULL;
pthread_mutex_lock(&m_lock);
if(!m_taskList.empty())
{
task = m_taskList.front();
m_taskList.pop_front();
--m_taskListSize;
}
pthread_mutex_unlock(&m_lock);
return task;
}
void TaskPool::LockIdle()
{
pthread_mutex_lock(&m_idleMutex);
}
void TaskPool::UnlockIdle()
{
pthread_mutex_unlock(&m_idleMutex);
}
Task * TaskPool::GetIdleTask()
{
LockIdle();
Task * task = NULL;
if(!m_idleList.empty())
{
task = m_idleList.front();
m_idleList.pop_front();
}
UnlockIdle();
return task;
}
void TaskPool::SaveIdleTask(Task*task)
{
if(NULL != task)
{
task->fun = 0;
task->data = NULL;
task->last_time = time(0);
LockIdle();
m_idleList.push_front(task);
UnlockIdle();
}
}
taskpool.h
#ifndef TASKPOOL_H
#define TASKPOOL_H
/* purpose @ 任務(wù)池,主要是緩沖外部高并發(fā)任務(wù)數(shù),有manager負(fù)責(zé)調(diào)度任務(wù)
* 任務(wù)池可自動(dòng)銷(xiāo)毀長(zhǎng)時(shí)間空閑的Task對(duì)象
* 可通過(guò)CHECK_IDLE_TASK_INTERVAL設(shè)置檢查idle空閑進(jìn)程輪訓(xùn)等待時(shí)間
* TASK_DESTROY_INTERVAL 設(shè)置Task空閑時(shí)間,超過(guò)這個(gè)時(shí)間值將會(huì)被CheckIdleTask線(xiàn)程銷(xiāo)毀
* date @ 2013.12.23
* author @ haibin.wang
*/
#include <list>
#include <pthread.h>
#include "commondef.h"
//所有的用戶(hù)操作為一個(gè)task,
typedef void (*task_fun)(void *);
struct Task
{
task_fun fun; //任務(wù)處理函數(shù)
void* data; //任務(wù)處理數(shù)據(jù)
time_t last_time; //加入空閑隊(duì)列的時(shí)間,用于自動(dòng)銷(xiāo)毀
};
//任務(wù)池,所有任務(wù)會(huì)投遞到任務(wù)池中,管理線(xiàn)程負(fù)責(zé)將任務(wù)投遞給線(xiàn)程池
class TaskPool
{
public:
/* pur @ 初始化任務(wù)池,啟動(dòng)任務(wù)池空閑隊(duì)列自動(dòng)銷(xiāo)毀線(xiàn)程
* para @ maxSize 最大任務(wù)數(shù),大于0
*/
TaskPool(const int & poolMaxSize);
~TaskPool();
/* pur @ 添加任務(wù)到任務(wù)隊(duì)列的尾部
* para @ task, 具體任務(wù)
* return @ 0 添加成功,負(fù)數(shù) 添加失敗
*/
int AddTask(task_fun fun, void* arg);
/* pur @ 從任務(wù)列表的頭獲取一個(gè)任務(wù)
* return @ 如果列表中有任務(wù)則返回一個(gè)Task指針,否則返回一個(gè)NULL
*/
Task* GetTask();
/* pur @ 保存空閑任務(wù)到空閑隊(duì)列中
* para @ task 已被調(diào)用執(zhí)行的任務(wù)
* return @
*/
void SaveIdleTask(Task*task);
void StopPool();
public:
void LockIdle();
void UnlockIdle();
void CheckIdleWait();
int RemoveIdleTask();
bool GetStop();
private:
static void * CheckIdleTask(void *);
/* pur @ 獲取空閑的task
* para @
* para @
* return @ NULL說(shuō)明沒(méi)有空閑的,否則從m_idleList中獲取一個(gè)
*/
Task* GetIdleTask();
int GetTaskSize();
private:
int m_poolSize; //任務(wù)池大小
int m_taskListSize; // 統(tǒng)計(jì)taskList的大小,因?yàn)楫?dāng)List的大小會(huì)隨著數(shù)量的增多而耗時(shí)增加
bool m_bStop; //是否停止
std::list<Task*> m_taskList;//所有待處理任務(wù)列表
std::list<Task*> m_idleList;//所有空閑任務(wù)列表
pthread_mutex_t m_lock; //對(duì)任務(wù)列表進(jìn)行加鎖,保證每次只能取一個(gè)任務(wù)
pthread_mutex_t m_idleMutex; //空閑任務(wù)隊(duì)列鎖
pthread_cond_t m_idleCond; //空閑隊(duì)列等待條件
pthread_t m_idleId;;
};
#endif
threadpool.cpp
/* purpose @ 線(xiàn)程池類(lèi),負(fù)責(zé)線(xiàn)程的創(chuàng)建與銷(xiāo)毀,實(shí)現(xiàn)線(xiàn)程超時(shí)自動(dòng)退出功能(半駐留)
* date @ 2014.01.03
* author @ haibin.wang
*/
#include "threadpool.h"
#include <errno.h>
#include <string.h>
/*
#include <iostream>
#include <stdio.h>
*/
Thread::Thread(bool detach, ThreadPool * pool)
: m_pool(pool)
{
pthread_attr_init(&m_attr);
if(detach)
{
pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 讓線(xiàn)程獨(dú)立運(yùn)行
}
else
{
pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );
}
pthread_mutex_init(&m_mutex, NULL); //初始化互斥量
pthread_cond_init(&m_cond, NULL); //初始化條件變量
task.fun = 0;
task.data = NULL;
}
Thread::~Thread()
{
pthread_cond_destroy(&m_cond);
pthread_mutex_destroy(&m_mutex);
pthread_attr_destroy(&m_attr);
}
ThreadPool::ThreadPool()
: m_poolMax(0)
, m_idleNum(0)
, m_totalNum(0)
, m_bStop(false)
{
pthread_mutex_init(&m_mutex, NULL);
pthread_mutex_init(&m_runMutex,NULL);
pthread_mutex_init(&m_terminalMutex, NULL);
pthread_cond_init(&m_terminalCond, NULL);
pthread_cond_init(&m_emptyCond, NULL);
}
ThreadPool::~ThreadPool()
{
/*if(!m_threads.empty())
{
std::list<Thread*>::iterator it = m_threads.begin();
for(; it != m_threads.end(); ++it)
{
if(*it != NULL)
{
pthread_cond_destroy( &((*it)->m_cond) );
pthread_mutex_destroy( &((*it)->m_mutex) );
delete *it;
*it = NULL;
}
}
m_threads.clear();
}*/
pthread_mutex_destroy(&m_runMutex);
pthread_mutex_destroy(&m_terminalMutex);
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_terminalCond);
pthread_cond_destroy(&m_emptyCond);
}
int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
if(poolMax < poolPre
|| poolPre < 0
|| poolMax <= 0)
{
return -1;
}
m_poolMax = poolMax;
int iRet = 0;
for(int i=0; i<poolPre; ++i)
{
Thread * thread = CreateThread();
if(NULL == thread)
{
iRet = -2;
}
}
if(iRet < 0)
{
std::list<Thread*>::iterator it = m_threads.begin();
for(; it!= m_threads.end(); ++it)
{
if(NULL != (*it) )
{
delete *it;
*it = NULL;
}
}
m_threads.clear();
m_totalNum = 0;
}
return iRet;
}
void ThreadPool::GetThreadRun(task_fun fun, void* arg)
{
//從線(xiàn)程池中獲取一個(gè)線(xiàn)程
pthread_mutex_lock( &m_mutex);
if(m_threads.empty())
{
pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空閑線(xiàn)程
}
Thread * thread = m_threads.front();
m_threads.pop_front();
pthread_mutex_unlock( &m_mutex);
pthread_mutex_lock( &thread->m_mutex );
thread->task.fun = fun;
thread->task.data = arg;
pthread_cond_signal(&thread->m_cond); //觸發(fā)線(xiàn)程WapperFun循環(huán)執(zhí)行
pthread_mutex_unlock( &thread->m_mutex );
}
int ThreadPool::Run(task_fun fun, void * arg)
{
pthread_mutex_lock(&m_runMutex); //保證每次只能由一個(gè)線(xiàn)程執(zhí)行
int iRet = 0;
if(m_totalNum <m_poolMax) //
{
if(m_threads.empty() && (NULL == CreateThread()) )
{
iRet = -1;//can not create new thread!
}
else
{
GetThreadRun(fun, arg);
}
}
else
{
GetThreadRun(fun, arg);
}
pthread_mutex_unlock(&m_runMutex);
return iRet;
}
void ThreadPool::StopPool(bool bStop)
{
m_bStop = bStop;
if(bStop)
{
//啟動(dòng)監(jiān)控所有空閑線(xiàn)程是否退出的線(xiàn)程
Thread thread(false, this);
pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //啟動(dòng)監(jiān)控所有線(xiàn)程退出線(xiàn)程
//阻塞等待所有空閑線(xiàn)程退出
pthread_join(thread.m_threadId, NULL);
}
/*if(bStop)
{
pthread_mutex_lock(&m_terminalMutex);
//啟動(dòng)監(jiān)控所有空閑線(xiàn)程是否退出的線(xiàn)程
Thread thread(true, this);
pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //啟動(dòng)監(jiān)控所有線(xiàn)程退出線(xiàn)程
//阻塞等待所有空閑線(xiàn)程退出
pthread_cond_wait(&m_terminalCond, & m_terminalMutex);
pthread_mutex_unlock(&m_terminalMutex);
}*/
}
bool ThreadPool::GetStop()
{
return m_bStop;
}
Thread * ThreadPool::CreateThread()
{
Thread * thread = NULL;
thread = new Thread(true, this);
if(NULL != thread)
{
int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通過(guò)WapperFun將線(xiàn)程加入到空閑隊(duì)列中
if(0 != iret)
{
delete thread;
thread = NULL;
}
}
return thread;
}
void * ThreadPool::WapperFun(void*arg)
{
Thread * thread = (Thread*)arg;
if(NULL == thread || NULL == thread->m_pool)
{
return NULL;
}
ThreadPool * pool = thread->m_pool;
pool->IncreaseTotalNum();
struct timespec abstime;
memset(&abstime, 0, sizeof(abstime));
while(1)
{
if(0 != thread->task.fun)
{
thread->task.fun(thread->task.data);
}
if( true == pool->GetStop() )
{
break; //確定當(dāng)前任務(wù)執(zhí)行完畢后再判定是否退出線(xiàn)程
}
pthread_mutex_lock( &thread->m_mutex );
pool->SaveIdleThread(thread); //將線(xiàn)程加入到空閑隊(duì)列中
abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
abstime.tv_nsec = 0;
if(ETIMEDOUT == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待線(xiàn)程被喚醒 或超時(shí)自動(dòng)退出
{
pthread_mutex_unlock( &thread->m_mutex );
break;
}
pthread_mutex_unlock( &thread->m_mutex );
}
pool->LockMutex();
pool->DecreaseTotalNum();
if(thread != NULL)
{
pool->RemoveThread(thread);
delete thread;
thread = NULL;
}
pool->UnlockMutex();
return 0;
}
void ThreadPool::SaveIdleThread(Thread * thread )
{
if(thread)
{
thread->task.fun = 0;
thread->task.data = NULL;
LockMutex();
if(m_threads.empty())
{
pthread_cond_broadcast(&m_emptyCond); //發(fā)送不空的信號(hào),告訴run函數(shù)線(xiàn)程隊(duì)列已經(jīng)不空了
}
m_threads.push_front(thread);
UnlockMutex();
}
}
int ThreadPool::TotalThreads()
{
return m_totalNum;
}
void ThreadPool::SendSignal()
{
LockMutex();
std::list<Thread*>::iterator it = m_threads.begin();
for(; it!= m_threads.end(); ++it)
{
pthread_mutex_lock( &(*it)->m_mutex );
pthread_cond_signal(&((*it)->m_cond));
pthread_mutex_unlock( &(*it)->m_mutex );
}
UnlockMutex();
}
void * ThreadPool::TerminalCheck(void* arg)
{
Thread * thread = (Thread*)arg;
if(NULL == thread || NULL == thread->m_pool)
{
return NULL;
}
ThreadPool * pool = thread->m_pool;
while((false == pool->GetStop()) || pool->TotalThreads() >0 )
{
pool->SendSignal();
usleep(IDLE_CHECK_POLL_EMPTY);
}
//pool->TerminalCondSignal();
return 0;
}
void ThreadPool::TerminalCondSignal()
{
pthread_cond_signal(&m_terminalCond);
}
void ThreadPool::RemoveThread(Thread* thread)
{
m_threads.remove(thread);
}
void ThreadPool::LockMutex()
{
pthread_mutex_lock( &m_mutex);
}
void ThreadPool::UnlockMutex()
{
pthread_mutex_unlock( &m_mutex );
}
void ThreadPool::IncreaseTotalNum()
{
LockMutex();
m_totalNum++;
UnlockMutex();
}
void ThreadPool::DecreaseTotalNum()
{
m_totalNum--;
}
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
/* purpose @ 線(xiàn)程池類(lèi),負(fù)責(zé)線(xiàn)程的創(chuàng)建與銷(xiāo)毀,實(shí)現(xiàn)線(xiàn)程超時(shí)自動(dòng)退出功能(半駐留)a
* 當(dāng)線(xiàn)程池退出時(shí)創(chuàng)建TerminalCheck線(xiàn)程,負(fù)責(zé)監(jiān)測(cè)線(xiàn)程池所有線(xiàn)程退出
* date @ 2013.12.23
* author @ haibin.wang
*/
#include <list>
#include <string>
#include "taskpool.h"
//通過(guò)threadmanager來(lái)控制任務(wù)調(diào)度進(jìn)程
//threadpool的TerminalCheck線(xiàn)程負(fù)責(zé)監(jiān)測(cè)線(xiàn)程池所有線(xiàn)程退出
class ThreadPool;
class Thread
{
public:
Thread(bool detach, ThreadPool * pool);
~Thread();
pthread_t m_threadId; //線(xiàn)程id
pthread_mutex_t m_mutex; //互斥鎖
pthread_cond_t m_cond; //條件變量
pthread_attr_t m_attr; //線(xiàn)程屬性
Task task; //
ThreadPool * m_pool; //所屬線(xiàn)程池
};
//線(xiàn)程池,負(fù)責(zé)創(chuàng)建線(xiàn)程處理任務(wù),處理完畢后會(huì)將線(xiàn)程加入到空閑隊(duì)列中,從任務(wù)池中
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();
/* pur @ 初始化線(xiàn)程池
* para @ poolMax 線(xiàn)程池最大線(xiàn)程數(shù)
* para @ poolPre 預(yù)創(chuàng)建線(xiàn)程數(shù)
* return @ 0:成功
* -1: parameter error, must poolMax > poolPre >=0
* -2: 創(chuàng)建線(xiàn)程失敗
*/
int InitPool(const int & poolMax, const int & poolPre);
/* pur @ 執(zhí)行一個(gè)任務(wù)
* para @ task 任務(wù)指針
* return @ 0任務(wù)分配成功,負(fù)值 任務(wù)分配失敗,-1,創(chuàng)建新線(xiàn)程失敗
*/
int Run(task_fun fun, void* arg);
/* pur @ 設(shè)置是否停止線(xiàn)程池工作
* para @ bStop true停止,false不停止
*/
void StopPool(bool bStop);
public: //此公有函數(shù)主要用于靜態(tài)函數(shù)調(diào)用
/* pur @ 獲取進(jìn)程池的啟停狀態(tài)
* return @
*/
bool GetStop();
void SaveIdleThread(Thread * thread );
void LockMutex();
void UnlockMutex();
void DecreaseTotalNum();
void IncreaseTotalNum();
void RemoveThread(Thread* thread);
void TerminalCondSignal();
int TotalThreads();
void SendSignal();
private:
/* pur @ 創(chuàng)建線(xiàn)程
* return @ 非空 成功,NULL失敗,
*/
Thread * CreateThread();
/* pur @ 從線(xiàn)程池中獲取一個(gè)一個(gè)線(xiàn)程運(yùn)行任務(wù)
* para @ fun 函數(shù)指針
* para @ arg 函數(shù)參數(shù)
* return @
*/
void GetThreadRun(task_fun fun, void* arg);
static void * WapperFun(void*);
static void * TerminalCheck(void*);//循環(huán)監(jiān)測(cè)是否所有線(xiàn)程終止線(xiàn)程
private:
int m_poolMax;//線(xiàn)程池最大線(xiàn)程數(shù)
int m_idleNum; //空閑線(xiàn)程數(shù)
int m_totalNum; //當(dāng)前線(xiàn)程總數(shù) 小于最大線(xiàn)程數(shù)
bool m_bStop; //是否停止線(xiàn)程池
pthread_mutex_t m_mutex; //線(xiàn)程列表鎖
pthread_mutex_t m_runMutex; //run函數(shù)鎖
pthread_mutex_t m_terminalMutex; //終止所有線(xiàn)程互斥量
pthread_cond_t m_terminalCond; //終止所有線(xiàn)程條件變量
pthread_cond_t m_emptyCond; //空閑線(xiàn)程不空條件變量
std::list<Thread*> m_threads; // 線(xiàn)程列表
};
#endif
threadpoolmanager.cpp
#include "threadpoolmanager.h"
#include "threadpool.h"
#include "taskpool.h"
#include <errno.h>
#include <string.h>
/*#include <string.h>
#include <sys/time.h>
#include <stdio.h>*/
// struct timeval time_beg, time_end;
ThreadPoolManager::ThreadPoolManager()
: m_threadPool(NULL)
, m_taskPool(NULL)
, m_bStop(false)
{
pthread_mutex_init(&m_mutex_task,NULL);
pthread_cond_init(&m_cond_task, NULL);
/* memset(&time_beg, 0, sizeof(struct timeval));
memset(&time_end, 0, sizeof(struct timeval));
gettimeofday(&time_beg, NULL);*/
}
ThreadPoolManager::~ThreadPoolManager()
{
StopAll();
if(NULL != m_threadPool)
{
delete m_threadPool;
m_threadPool = NULL;
}
if(NULL != m_taskPool)
{
delete m_taskPool;
m_taskPool = NULL;
}
pthread_cond_destroy( &m_cond_task);
pthread_mutex_destroy( &m_mutex_task );
/*gettimeofday(&time_end, NULL);
long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
printf("manager total time = %d\n", total);
gettimeofday(&time_beg, NULL);*/
}
int ThreadPoolManager::Init(
const int &tastPoolSize,
const int &threadPoolMax,
const int &threadPoolPre)
{
m_threadPool = new ThreadPool();
if(NULL == m_threadPool)
{
return -1;
}
m_taskPool = new TaskPool(tastPoolSize);
if(NULL == m_taskPool)
{
return -2;
}
if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
{
return -3;
}
//啟動(dòng)線(xiàn)程池
//啟動(dòng)任務(wù)池
//啟動(dòng)任務(wù)獲取線(xiàn)程,從任務(wù)池中不斷拿任務(wù)到線(xiàn)程池中
pthread_attr_t attr;
pthread_attr_init( &attr );
pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
pthread_create(&m_taskThreadId, &attr, TaskThread, this); //創(chuàng)建獲取任務(wù)進(jìn)程
pthread_attr_destroy(&attr);
return 0;
}
void ThreadPoolManager::StopAll()
{
m_bStop = true;
LockTask();
pthread_cond_signal(&m_cond_task);
UnlockTask();
pthread_join(m_taskThreadId, NULL);
//等待當(dāng)前所有任務(wù)執(zhí)行完畢
m_taskPool->StopPool();
m_threadPool->StopPool(true); // 停止線(xiàn)程池工作
}
void ThreadPoolManager::LockTask()
{
pthread_mutex_lock(&m_mutex_task);
}
void ThreadPoolManager::UnlockTask()
{
pthread_mutex_unlock(&m_mutex_task);
}
void* ThreadPoolManager::TaskThread(void* arg)
{
ThreadPoolManager * manager = (ThreadPoolManager*)arg;
while(1)
{
manager->LockTask(); //防止任務(wù)沒(méi)有執(zhí)行完畢發(fā)送了停止信號(hào)
while(1) //將任務(wù)隊(duì)列中的任務(wù)執(zhí)行完再退出
{
Task * task = manager->GetTaskPool()->GetTask();
if(NULL == task)
{
break;
}
else
{
manager->GetThreadPool()->Run(task->fun, task->data);
manager->GetTaskPool()->SaveIdleTask(task);
}
}
if(manager->GetStop())
{
manager->UnlockTask();
break;
}
manager->TaskCondWait(); //等待有任務(wù)的時(shí)候執(zhí)行
manager->UnlockTask();
}
return 0;
}
ThreadPool * ThreadPoolManager::GetThreadPool()
{
return m_threadPool;
}
TaskPool * ThreadPoolManager::GetTaskPool()
{
return m_taskPool;
}
int ThreadPoolManager::Run(task_fun fun,void* arg)
{
if(0 == fun)
{
return 0;
}
if(!m_bStop)
{
int iRet = m_taskPool->AddTask(fun, arg);
if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )
{
pthread_cond_signal(&m_cond_task);
UnlockTask();
}
return iRet;
}
else
{
return -3;
}
}
bool ThreadPoolManager::GetStop()
{
return m_bStop;
}
void ThreadPoolManager::TaskCondWait()
{
struct timespec to;
memset(&to, 0, sizeof to);
to.tv_sec = time(0) + 60;
to.tv_nsec = 0;
pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超時(shí)
}
threadpoolmanager.h
#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
* 基本流程:
* 管理線(xiàn)程池和任務(wù)池,先將任務(wù)加入任務(wù)池,然后由TaskThread負(fù)責(zé)從任務(wù)池中將任務(wù)取出放入到線(xiàn)程池中
* 基本功能:
* 1、工作線(xiàn)程可以在業(yè)務(wù)不忙的時(shí)候自動(dòng)退出部分長(zhǎng)時(shí)間不使用的線(xiàn)程
* 2、任務(wù)池可以在業(yè)務(wù)不忙的時(shí)候自動(dòng)釋放長(zhǎng)時(shí)間不使用的資源(可通過(guò)commondef.h修改)
* 3、當(dāng)程序退時(shí)不再向任務(wù)池中添加任務(wù),當(dāng)任務(wù)池中所有任務(wù)執(zhí)行完畢后才退出相關(guān)程序(做到程序的安全退出)
* 線(xiàn)程資源:
* 如果不預(yù)分配任何處理線(xiàn)程的話(huà),ThreadPool只有當(dāng)有任務(wù)的時(shí)候才實(shí)際創(chuàng)建需要的線(xiàn)程,最大線(xiàn)程創(chuàng)建數(shù)為用戶(hù)指定
* 當(dāng)manager銷(xiāo)毀的時(shí)候,manager會(huì)創(chuàng)建一個(gè)監(jiān)控所有任務(wù)執(zhí)行完畢的監(jiān)控線(xiàn)程,只有當(dāng)所有任務(wù)執(zhí)行完畢后manager才銷(xiāo)毀
* 線(xiàn)程最大數(shù)為:1個(gè)TaskPool線(xiàn)程 + 1個(gè)manager任務(wù)調(diào)度線(xiàn)程 + ThreadPool最大線(xiàn)程數(shù) + 1個(gè)manager退出監(jiān)控線(xiàn)程 + 1線(xiàn)程池所有線(xiàn)程退出監(jiān)控線(xiàn)程
* 線(xiàn)程最小數(shù)為:1個(gè)TaskPool創(chuàng)建空閑任務(wù)資源銷(xiāo)毀監(jiān)控線(xiàn)程 + 1個(gè)manager創(chuàng)建任務(wù)調(diào)度線(xiàn)程
* 使用方法:
* ThreadPoolManager manager;
* manager.Init(100000, 50, 5);//初始化一個(gè)任務(wù)池為10000,線(xiàn)程池最大線(xiàn)程數(shù)50,預(yù)創(chuàng)建5個(gè)線(xiàn)程的管理器
* manager.run(fun, data); //添加執(zhí)行任務(wù)到manager中,fun為函數(shù)指針,data為fun需要傳入的參數(shù),data可以為NULL
*
* date @ 2013.12.23
* author @ haibin.wang
*
* 詳細(xì)參數(shù)控制可以修改commondef.h中的相關(guān)變量值
*/
#include <pthread.h>
typedef void (*task_fun)(void *);
class ThreadPool;
class TaskPool;
class ThreadPoolManager
{
public:
ThreadPoolManager();
~ThreadPoolManager();
/* pur @ 初始化線(xiàn)程池與任務(wù)池,threadPoolMax > threadPoolPre > threadPoolMin >= 0
* para @ tastPoolSize 任務(wù)池大小
* para @ threadPoolMax 線(xiàn)程池最大線(xiàn)程數(shù)
* para @ threadPoolPre 預(yù)創(chuàng)建線(xiàn)程數(shù)
* return @ 0:初始化成功,負(fù)數(shù) 初始化失敗
* -1:創(chuàng)建線(xiàn)程池失敗
* -2:創(chuàng)建任務(wù)池失敗
* -3:線(xiàn)程池初始化失敗
*/
int Init(const int &tastPoolSize,
const int &threadPoolMax,
const int &threadPoolPre);
/* pur @ 執(zhí)行一個(gè)任務(wù)
* para @ fun 需要執(zhí)行的函數(shù)指針
* para @ arg fun需要的參數(shù),默認(rèn)為NULL
* return @ 0 任務(wù)分配成功,負(fù)數(shù) 任務(wù)分配失敗
* -1:任務(wù)池滿(mǎn)
* -2:任務(wù)池new失敗
* -3:manager已經(jīng)發(fā)送停止信號(hào),不再接收新任務(wù)
*/
int Run(task_fun fun,void* arg=NULL);
public: //以下public函數(shù)主要用于靜態(tài)函數(shù)調(diào)用
bool GetStop();
void TaskCondWait();
TaskPool * GetTaskPool();
ThreadPool * GetThreadPool();
void LockTask();
void UnlockTask();
void LockFull();
private:
static void * TaskThread(void*); //任務(wù)處理線(xiàn)程
void StopAll();
private:
ThreadPool *m_threadPool; //線(xiàn)程池
TaskPool * m_taskPool; //任務(wù)池
bool m_bStop; // 是否終止管理器
pthread_t m_taskThreadId; // TaskThread線(xiàn)程id
pthread_mutex_t m_mutex_task;
pthread_cond_t m_cond_task;
};
#endif
main.cpp
#include <iostream>
#include <string>
#include "threadpoolmanager.h"
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
using namespace std;
int seq = 0;
int billNum =0;
int inter = 1;
pthread_mutex_t m_mutex;
void myFunc(void*arg)
{
pthread_mutex_lock(&m_mutex);
seq++;
if(seq%inter == 0 )
{
cout << "fun 1=" << seq << endl;
}
if(seq>=1000000000)
{
cout << "billion" << endl;
seq = 0;
billNum++;
}
pthread_mutex_unlock(&m_mutex);
//sleep();
}
int main(int argc, char** argv)
{
if(argc != 6)
{
cout << "必須有5個(gè)參數(shù) 任務(wù)執(zhí)行次數(shù) 任務(wù)池大小 線(xiàn)程池大小 預(yù)創(chuàng)建線(xiàn)程數(shù) 輸出間隔" << endl;
cout << "eg: ./test 999999 10000 100 10 20" << endl;
cout << "上例代表創(chuàng)建一個(gè)間隔20個(gè)任務(wù)輸出,任務(wù)池大小為10000,線(xiàn)程池大小為100,預(yù)創(chuàng)建10個(gè)線(xiàn)程,執(zhí)行任務(wù)次數(shù)為:999999" << endl;
return 0;
}
double loopSize = atof(argv[1]);
int taskSize = atoi(argv[2]);
int threadPoolSize = atoi(argv[3]);
int preSize = atoi(argv[4]);
inter = atoi(argv[5]);
pthread_mutex_init(&m_mutex,NULL);
ThreadPoolManager manager;
if(0>manager.Init(taskSize, threadPoolSize, preSize))
{
cout << "初始化失敗" << endl;
return 0;
}
cout << "*******************初始化完成*********************" << endl;
struct timeval time_beg, time_end;
memset(&time_beg, 0, sizeof(struct timeval));
memset(&time_end, 0, sizeof(struct timeval));
gettimeofday(&time_beg, NULL);
double i=0;
for(; i<loopSize; ++i)
{
while(0>manager.Run(myFunc,NULL))
{
usleep(100);
}
}
gettimeofday(&time_end, NULL);
long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
cout << "total time =" << total << endl;
cout << "total num =" << i << " billion num=" << billNum<< endl;
cout << __FILE__ << "將關(guān)閉所有線(xiàn)程" << endl;
//pthread_mutex_destroy(&m_mutex);
return 0;
}
相關(guān)文章
隊(duì)列的動(dòng)態(tài)鏈?zhǔn)酱鎯?chǔ)實(shí)現(xiàn)代碼分享
DynaLnkQueue.cpp - 動(dòng)態(tài)鏈?zhǔn)疥?duì)列,即隊(duì)列的動(dòng)態(tài)鏈?zhǔn)酱鎯?chǔ)實(shí)現(xiàn)2014-02-02深入c語(yǔ)言continue和break的區(qū)別詳解
本篇文章是對(duì)c語(yǔ)言中continue和break的區(qū)別進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05C語(yǔ)言實(shí)現(xiàn)投票系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了C語(yǔ)言實(shí)現(xiàn)投票系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-07-07Ubuntu 20.04 下安裝配置 VScode 的 C/C++ 開(kāi)發(fā)環(huán)境(圖文教程)
這篇文章主要介紹了Ubuntu 20.04 下安裝配置 VScode 的 C/C++ 開(kāi)發(fā)環(huán)境,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05C/C++連接MySQL數(shù)據(jù)庫(kù)詳細(xì)圖文教程
在實(shí)際開(kāi)發(fā)中我們經(jīng)常需要對(duì)數(shù)據(jù)庫(kù)進(jìn)行訪(fǎng)問(wèn),下面這篇文章主要介紹了C/C++連接MySQL數(shù)據(jù)庫(kù)的詳細(xì)圖文教程,文中通過(guò)代碼以及圖文介紹是非常詳細(xì),需要的朋友可以參考下2024-01-01C語(yǔ)言實(shí)現(xiàn)掃雷小游戲的全過(guò)程記錄
這篇文章主要給大家介紹了關(guān)于C語(yǔ)言實(shí)現(xiàn)掃雷小游戲的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04C語(yǔ)言鏈表實(shí)現(xiàn)簡(jiǎn)易通訊錄
這篇文章主要為大家詳細(xì)介紹了C語(yǔ)言鏈表實(shí)現(xiàn)簡(jiǎn)易通訊錄,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-05-05