C語(yǔ)言實(shí)現(xiàn)一個(gè)多線程委托模型的示例詳解
C語(yǔ)言實(shí)現(xiàn)一個(gè)多線程委托模型
多線程委托模型將線程分為boss線程(主線程)和worker線程(工作線程)。先從一個(gè)主線程開始運(yùn)行,主線程根據(jù)情況完成工作線程的創(chuàng)建,將創(chuàng)建好的工作線程放入隊(duì)列中,有工作時(shí),主線程喚醒工作參與工作。如果工作線程產(chǎn)生異常,主線程可以關(guān)閉工作線程并開啟新的工作線程。
以下是使用C語(yǔ)言實(shí)現(xiàn)多線程委托模型的代碼,其中包含boss線程和worker線程,boss線程用于創(chuàng)建worker線程并將其放入工作隊(duì)列中,有任務(wù)時(shí)喚醒worker線程:
#include <pthread.h> #include <stdio.h> #include <stdlib.h> typedef struct { void *(*task)(void *arg); void *arg; } Task; Task *task_create(void *(*task)(void *arg), void *arg); void task_destroy(Task *task); typedef struct { int thread_count; int task_count; int head; int tail; Task **tasks; pthread_mutex_t mutex; pthread_cond_t done; } ThreadPool; ThreadPool *threadpool_create(int thread_count, int task_count); void threadpool_destroy(ThreadPool *pool); void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg); Task *threadpool_get_task(ThreadPool *pool); void *worker_thread(void *arg); Task *task_create(void *(*task)(void *arg), void *arg) { Task *t = (Task*) malloc(sizeof(Task)); t->task = task; t->arg = arg; return t; } void task_destroy(Task *task) { free(task); } ThreadPool *threadpool_create(int thread_count, int task_count) { ThreadPool *pool = (ThreadPool*) malloc(sizeof(ThreadPool)); pool->thread_count = thread_count; pool->task_count = task_count; pool->head = pool->tail = 0; pool->tasks = (Task**) malloc(sizeof(Task*) * task_count); pthread_mutex_init(&pool->mutex, NULL); pthread_cond_init(&pool->done, NULL); int i; for (i = 0; i < pool->thread_count; i++) { pthread_t thread; pthread_create(&thread, NULL, worker_thread, pool); pthread_detach(thread); } return pool; } void threadpool_destroy(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); int i; for (i = 0; i < pool->tail; i++) { task_destroy(pool->tasks[i]); } free(pool->tasks); free(pool); pthread_mutex_unlock(&pool->mutex); pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->done); } void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg) { pthread_mutex_lock(&pool->mutex); Task *t = task_create(task, arg); if (pool->tail == pool->task_count) { pool->task_count *= 2; pool->tasks = (Task**) realloc(pool->tasks, sizeof(Task*) * pool->task_count); } pool->tasks[pool->tail++] = t; pthread_cond_signal(&pool->done); pthread_mutex_unlock(&pool->mutex); } Task *threadpool_get_task(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); while (pool->head == pool->tail) { pthread_cond_wait(&pool->done, &pool->mutex); } Task *t = pool->tasks[pool->head++]; pthread_mutex_unlock(&pool->mutex); return t; } void *worker_thread(void *arg) { ThreadPool *pool = (ThreadPool*) arg; for (;;) { Task *t = threadpool_get_task(pool); (*(t->task))(t->arg); task_destroy(t); } return NULL; } void * boss_task(void *arg) { ThreadPool *pool = (ThreadPool*) arg; // 在boss線程中添加任務(wù) int i; for (i = 0; i < 10; i++) { threadpool_add_task(pool, worker_task, NULL); } return NULL; } void * worker_task(void *arg) { printf("Worker thread running\n"); return NULL; } int main(int argc, char *argv[]) { ThreadPool *pool = threadpool_create(4, 10); threadpool_add_task(pool, boss_task, pool); sleep(10); threadpool_destroy(pool); return 0; }
在這個(gè)示例中,我們定義了一個(gè)ThreadPool
結(jié)構(gòu)體,其中包括一個(gè)任務(wù)數(shù)組、一個(gè)鎖和一個(gè)條件變量。worker_thread
函數(shù)是用于執(zhí)行任務(wù)的線程函數(shù),而threadpool_create
、threadpool_add_task
和threadpool_get_task
函數(shù)用于創(chuàng)建、管理和調(diào)度任務(wù)。
在main
函數(shù)中,我們創(chuàng)建了一個(gè)包含4個(gè)線程、最大任務(wù)數(shù)量為10的線程池,并在其中添加了一個(gè)boss線程,用于向線程池中添加worker線程任務(wù)。在每個(gè)worker任務(wù)中,我們只輸出一條消息,表示線程正在運(yùn)行。
這就是一個(gè)使用C語(yǔ)言實(shí)現(xiàn)的多線程委托模型,其中包含了boss線程和worker線程。在實(shí)際使用時(shí),應(yīng)根據(jù)具體應(yīng)用場(chǎng)景進(jìn)行更進(jìn)一步的修改和擴(kuò)展。
如果工作線程產(chǎn)生異常,主線程可以關(guān)閉工作線程并開啟新的工作線程
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <unistd.h> #include <errno.h> typedef struct { void *(*task)(void *arg); void *arg; } Task; Task *task_create(void *(*task)(void *arg), void *arg); void task_destroy(Task *task); typedef struct { int thread_count; int task_count; int head; int tail; Task **tasks; pthread_mutex_t mutex; pthread_cond_t done; } ThreadPool; ThreadPool *threadpool_create(int thread_count, int task_count); void threadpool_destroy(ThreadPool *pool); void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg); Task *threadpool_get_task(ThreadPool *pool); void *worker_thread(void *arg); Task *task_create(void *(*task)(void *arg), void *arg) { Task *t = (Task*) malloc(sizeof(Task)); t->task = task; t->arg = arg; return t; } void task_destroy(Task *task) { free(task); } ThreadPool *threadpool_create(int thread_count, int task_count) { ThreadPool *pool = (ThreadPool*) malloc(sizeof(ThreadPool)); pool->thread_count = thread_count; pool->task_count = task_count; pool->head = pool->tail = 0; pool->tasks = (Task**) malloc(sizeof(Task*) * task_count); pthread_mutex_init(&pool->mutex, NULL); pthread_cond_init(&pool->done, NULL); int i; for (i = 0; i < pool->thread_count; i++) { pthread_t thread; pthread_create(&thread, NULL, worker_thread, pool); pthread_detach(thread); } return pool; } void threadpool_destroy(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); int i; for (i = 0; i < pool->tail; i++) { task_destroy(pool->tasks[i]); } free(pool->tasks); free(pool); pthread_mutex_unlock(&pool->mutex); pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->done); } void threadpool_add_task(ThreadPool *pool, void *(*task)(void *arg), void *arg) { pthread_mutex_lock(&pool->mutex); Task *t = task_create(task, arg); if (pool->tail == pool->task_count) { pool->task_count *= 2; pool->tasks = (Task**) realloc(pool->tasks, sizeof(Task*) * pool->task_count); } pool->tasks[pool->tail++] = t; pthread_cond_signal(&pool->done); pthread_mutex_unlock(&pool->mutex); } Task *threadpool_get_task(ThreadPool *pool) { pthread_mutex_lock(&pool->mutex); while (pool->head == pool->tail) { pthread_cond_wait(&pool->done, &pool->mutex); } Task *t = pool->tasks[pool->head++]; pthread_mutex_unlock(&pool->mutex); return t; } void *worker_thread(void *arg) { ThreadPool *pool = (ThreadPool*) arg; for (;;) { Task *t = threadpool_get_task(pool); int ret = 0; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); ret = (*(t->task))(t->arg); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); task_destroy(t); if (ret != 0) { pthread_mutex_lock(&pool->mutex); printf("Worker thread exited with error: %d\n", ret); pool->thread_count -= 1; pthread_mutex_unlock(&pool->mutex); pthread_exit(NULL); } } return NULL; } void signal_handler(int signum) { // 忽略這里的信號(hào)處理函數(shù) } void * boss_task(void *arg) { ThreadPool *pool = (ThreadPool*) arg; // 安裝一個(gè)信號(hào)處理函數(shù),方便關(guān)閉工作線程 struct sigaction act; act.sa_handler = signal_handler; sigaction(SIGUSR1, &act, NULL); // 在boss線程中添加任務(wù) int i; for (i = 0; i < 10; i++) { threadpool_add_task(pool, worker_task, NULL); } return NULL; } void * worker_task(void *arg) { int i; for (i = 0; i < 10; i++) { printf("Worker thread running: %d\n", i); sleep(1); // 模擬工作線程異常 if (i == 5) { printf("Worker thread encountered an error\n"); // 發(fā)送信號(hào)關(guān)閉工作線程 pthread_kill(pthread_self(), SIGUSR1); return (void*) 1; } } return NULL; } int main(int argc, char *argv[]) { ThreadPool *pool = threadpool_create(4, 10); threadpool_add_task(pool, boss_task, pool); // 運(yùn)行10秒后退出 sleep(10); // 關(guān)閉所有工作線程 int i; for (i = 0; i < pool->thread_count; i++) { pthread_cancel(0); } threadpool_destroy(pool); return 0; }
在這個(gè)示例中,我們基本上沿用了前面的代碼,只是添加了處理工作線程異常的代碼。我們?cè)趙orker_thread函數(shù)中,通過(guò)調(diào)用pthread_setcancelstate函數(shù)禁止了線程被取消,然后執(zhí)行工作任務(wù),最后恢復(fù)線程的取消狀態(tài)。如果線程執(zhí)行任務(wù)時(shí)出現(xiàn)異常,我們?cè)谥骶€程中通過(guò)發(fā)送信號(hào)SIGUSR1來(lái)關(guān)閉工作線程。
在boss_task中添加的任務(wù)只是簡(jiǎn)單地輸出一條消息,模擬了一些隨機(jī)的操作。這里我們安裝了一個(gè)信號(hào)處理函數(shù),方便在工作線程內(nèi)部發(fā)生異常時(shí)正確關(guān)閉線程。在main函數(shù)中,我們運(yùn)行了10秒鐘,然后通過(guò)pthread_cancel函數(shù)關(guān)閉了所有工作線程。
這就是一個(gè)使用C語(yǔ)言實(shí)現(xiàn)多線程委托模型的例子,其中包含boss線程和worker線程,可以處理工作線程的異常情況。從這個(gè)示例中,我們可以學(xué)到如何創(chuàng)建線程池,如何向線程池中添加任務(wù),如何安全地關(guān)閉線程池,以及如何正確處理線程異常等知識(shí)。
到此這篇關(guān)于C語(yǔ)言實(shí)現(xiàn)一個(gè)多線程委托模型的文章就介紹到這了,更多相關(guān)C語(yǔ)言多線程委托模型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C++11 std::shared_ptr總結(jié)與使用示例代碼詳解
這篇文章主要介紹了C++11 std::shared_ptr總結(jié)與使用,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06C++命名空間using?namespace?std是什么意思
namespace中文意思是命名空間或者叫名字空間,傳統(tǒng)的C++只有一個(gè)全局的namespace,下面這篇文章主要給大家介紹了關(guān)于C++命名空間using?namespace?std是什么意思的相關(guān)資料,需要的朋友可以參考下2023-01-01用C語(yǔ)言簡(jiǎn)單實(shí)現(xiàn)掃雷小游戲
這篇文章主要為大家詳細(xì)介紹了用C語(yǔ)言簡(jiǎn)單實(shí)現(xiàn)掃雷小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08epoll多路復(fù)用的一個(gè)實(shí)例程序(C實(shí)現(xiàn))
這篇文章主要為大家詳細(xì)介紹了epoll多路復(fù)用的一個(gè)實(shí)例程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08C語(yǔ)言實(shí)現(xiàn)飛機(jī)大戰(zhàn)
這篇文章主要為大家詳細(xì)介紹了C語(yǔ)言實(shí)現(xiàn)飛機(jī)大戰(zhàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-06-06C語(yǔ)言動(dòng)態(tài)規(guī)劃多種背包問(wèn)題分析講解
背包問(wèn)題(Knapsack problem)是一種組合優(yōu)化的NP完全問(wèn)題。問(wèn)題可以描述為:給定一組物品,每種物品都有自己的重量和價(jià)格,在限定的總重量?jī)?nèi),我們?nèi)绾芜x擇,才能使得物品的總價(jià)格最高2022-04-04