国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 操作系統 > 正文

簡單實用可線上應用的線程池組件

2024-06-28 13:24:04
字體:
來源:轉載
供稿:網友
簡單實用可線上應用的線程池組件0 前言

線程池的組件網上很多,之前我自己也嘗試寫個一個demo,但這些組件一般都比較簡單,沒有完整的實現后臺線程池組件應用的功能。因此,這里我們實現一個可以用在線上環境的線程池組件,該線程池組件具備線程池應用的特性,如下所示:

1. 伸縮性:即線程池中線程的個數應該是動態變化的。繁忙的時候可以申請更多的線程;空閑的時候則注銷一部分線程。

2. 線程狀態:線程池中對線程的管理引入睡眠、喚醒機制。當線程沒有任務在運行時,使線程處于睡眠狀態。

3. 線程管理:對線程池中線程的申請和注銷,不是通過創建一個單獨線程來管理,而是線程池自動管理。

最終,我們實現的線程池局部一下幾個特點:

1. 線程池中線程的個數介于min和max之間;

2. 線程池中線程具有睡眠和喚醒機制;

3. 當線程池中的線程睡眠時間超過1秒鐘,則結束一個線程;當線程池中持續1秒沒有空閑線程時,則創建一個新線程。

1 關鍵數據結構

主要的數據結構包括兩個CThreadPool和CThreadWorker。

這里,我們先給出數據結構的定義,然后簡要描述他們的作用。

相關數據結構定義如下:

 1 class CThreadWorker 2 { 3     CThreadWorker *next_;    // 線程池中的線程是單鏈表結構保存 4      5     int wake_up_;            // 喚醒標志 6     pthread_cond_t wake_;     7     pthread_mutex_t mutex_; 8      9     pthread_t tid_;10     void (*func)(void *arg); // 函數執行體11     void *arg;               // 函數執行體參數12     time_t sleep_when_;      // 線程睡眠時間13 };14 15 class CThreadPool16 {17     CThreadWorker *head_;18     19     unsigned int min_;     // 最少線程數20     unsigned int cur_;     // 當前線程數21     unsigned int max_;     // 最大線程數22     23     time_t last_empty_;    // 最后一次線程池中沒有線程的時間24     pthread_mutex_t mutex_;25 };

其中,CThreadPool用來管理線程池,記錄線程池當前線程個數、最小線程個數、最大線程個數等。

CThreadWorker是具體保存線程記錄的實體,它里面保存了線程的執行函數體,函數參數、睡眠時間等等。還有一個信號量,用來讓線程睡眠或者喚醒。

當我們創建一個線程池時,即默認創建了min個CThreadWorker,每個線程實體默認都是睡眠的,阻塞在pthread_cond_wait處,然后我們具體執行用戶函數時,從線程池中獲取線程,更改該線程的func,arg等參數,然后喚醒該線程。

中間,線程池會對線程做一些管理,保證線程池的伸縮性。

2 源碼2.1 頭文件
 1 #ifndef _THREAD_POOL_H_ 2 #define _THREAD_POOL_H_ 3  4 #include <pthread.h> 5 #include <time.h> 6  7 struct CThreadWorker 8 { 9     CThreadWorker *next_;    // 線程池中的線程是單鏈表結構保存10     11     int wake_up_;            // 喚醒標志12     pthread_cond_t wake_;    13     pthread_mutex_t mutex_;14     15     pthread_t tid_;16     void (*func)(void *arg); // 函數執行體17     void *arg;               // 函數執行體參數18     time_t sleep_when_;      // 線程睡眠時間19 };20 21 struct CThreadPool22 {23     CThreadWorker *head_;24     25     unsigned int min_;     // 最少線程數26     unsigned int cur_;     // 當前線程數27     unsigned int max_;     // 最大線程數28     29     time_t last_empty_;    // 最后一次線程池中沒有線程的時間30     pthread_mutex_t mutex_;31 };32 33 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max);34 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg);35 #endif
2.2 實現文件

實現文件主要完成.h中的CreateThreadPool和StartWork。我們先給個流程圖大概描述一下。

2.2.1 CreateThreadPool流程描述

如下圖所示為創建線程池的流程,其中最關鍵的步驟是DoPRocess,當執行到DoProcess時,我們默認創建了min個線程,所有線程都因為信號而睡眠。StartWork里面會獲取一個線程,然后修改該線程的func,arg參數,最后喚醒線程執行我們的任務。當一個線程執行完的時候,我們需要判斷線程池當前的狀態,是需要新創建一個線程,還是把該線程重新加入到線程池,還是注銷該線程。具體的這些邏輯圖示不好描述,我們在下面代碼里給出注釋。

2.2.2 StartWork流程描述

如下圖是啟動一個任務的流程,2.2.1我們已經描述到,啟動任務時,我們會從線程池中拿出一個線程,修改該線程的func/arg屬性,然后喚醒該線程。當線程執行完以后,是需要重新加入線程池,還是注銷,則是在2.2.1的DoProcess中處理。

2.3 實現文件
  1 #include <stdlib.h>  2 #include "pthread_pool.h"  3   4 // 線程池持續1秒沒有空閑線程  5 #define WaitWorkerTimeout(pool)     ((time(NULL) - pool->last_empty_) > 1)  6 // 線程池中沒有線程,所有的線程已經pop出去執行具體的任務去了  7 #define NoThreadInPool(pool)        (pool->head_ == NULL)  8 #define CanCreateThread(pool)       (pool->cur_ < pool->max_)  9  10  11 static int CreateOneThread(CThreadPool *pool); 12 static void *DoProcess(void *arg); 13 static void PushWork(CThreadPool *pool, CThreadWorker *worker); 14 static void PopWork(CThreadPool *pool, CThreadWorker *worker); 15 static void InitWorker(CThreadWorker *worker); 16 static int WorkerIdleTimeout(CThreadPool *pool); 17 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg); 18 static void WakeupWorkerThread(CThreadWorker *worker); 19  20 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg); 21 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max); 22  23 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max) 24 { 25     CThreadPool *poo; 26      27     pool = (CThreadPool *)malloc(sizeof(CThreadPool)); 28      29     if (pool == NULL) { 30         return NULL; 31     } 32      33     pool->head_ = NULL; 34     pool->min_  = min; 35     pool->cur_  = 0; 36     pool->max_  = max; 37     pool->last_empty_ = time(NULL); 38     pthread_mutex_init(&pool->mutex_, NULL); 39      40     int ret = 0; 41     while (min--) { 42         ret = CreateOneThread(pool); 43         if (ret != 0) { 44             exit(0); 45         } 46     } 47     return pool; 48 } 49  50 static int CreateOneThread(CThreadPool *pool) 51 { 52     pthread_t tid; 53     return pthread_create(&tid, NULL, DoProcess, pool); 54 } 55  56 static void *DoProcess(void *arg) 57 { 58     CThreadPool *pool = (CThreadPool *)arg; 59      60     CThreadWorker worker; 61      62     InitWorker(&worker); 63      64     pthread_mutex_lock(&pool->mutex_); 65     pool->cur_ += 1; 66      67     for (;;) { 68         PushWork(pool, &worker); 69         worker.sleep_when_ = time(NULL); 70         pthread_mutex_unlock(&pool->mutex_); 71          72         pthread_mutex_lock(&worker.mutex_); 73         while (worker.wake_up_ != 1) { 74             pthread_cond_wait(&worker.wake_, &worker.mutex_); 75         } 76         // worker線程已被喚醒,準備開始執行任務,修改wake_up_標志。 77         worker.wake_up_ = 0; 78         pthread_mutex_unlock(&worker.mutex_); 79          80         // 執行我們的任務,執行完畢之后,修改worker.func為NULL。 81         worker.func(arg); 82         worker.func = NULL; 83          84         // 任務執行完以后,線程池需要根據當前的線程池狀態來判斷是要把該線程重新加入線程池,還是要創建一個新的線程。 85         pthread_mutex_lock(&pool->mutex_); 86         if (WaitWorkerTimeout(pool) && NoThreadInPool(pool) && CanCreateThread(pool)) { 87             // 在我們執行這個任務的時候,其他任務等待空閑線程的時間超過了1秒,而且線程池中沒有線程,且線程池當前線程數沒有超過最大允許創建線程數 88             CreateOneThread(pool); 89         } 90          91         // 線程池中沒有線程了,重新把該線程加入線程池 92         if (NoThreadInPool(pool)) { 93             continue; 94         } 95          96         // 線程池中線程數低于最低閾值,重新把該線程加入線程池 97         if (pool->curr <= pool->min) { 98             continue; 99         }100         101         // 線程中睡眠的線程時間超過了1秒,說明線程池不是很繁忙,不需要把該線程重新加回線程池102         if (WorkerIdleTimeout(pool)) {103             break;104         }105     }106     107     pool->cur -= 1;108     pthread_mutex_unlock(&pool->mutex);109     110     pthread_cond_destroy(&worker.wake_);111     pthread_mutex_destroy(&worker.mutex_);112 113     pthread_exit(NULL);114 }115 116 static void InitWorker(CThreadWorker *worker)117 {118     worker->next_ = NULL;119     worker->wake_up_ = 0;120     pthread_cond_init(&worker->wake_, NULL);121     pthread_mutex_init(&worker->mutex_, NULL);122     worker->tid_ = pthread_self();123     worker->func = NULL;124     worker->arg = NULL;125     worker->sleep_when_ = 0;126 }127 128 static void PushWork(CThreadPool *pool, CThreadWorker *worker)129 {130     worker->next_ = pool->head_;131     pool->next_ = worker;132 }133 134 static int WorkerIdleTimeout(CThreadPool *pool)135 {136     CThreadWorker *worker;137     138     if (NoThreadInPool(pool)) {139         return 0;140     }141     worker = pool->head_;142     return (time(NULL) > worker->sleep_when_ + 1)? 1 : 0;143 }144 145 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg)146 {147     if (func == NULL) {148         return -1;149     }150     151     CThreadWorker *worker;152     pthread_mutex_lock(&pool->mutex_);153     worker = GetWorker(pool, func, arg);154     pthread_mutex_unlock(&pool->mutex_);155     156     if (worker == NULL) {157         return -2;158     }159     160     WakeupWorkerThread(worker);161     return 0;162 }163 164 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg)165 {166     CThreadWorker *worker;167     168     if (NoThreadInPool(pool)) {169         return NULL;170     }171     172     worker = pool->head_;173     PopWork(pool, worker);174     175     if (NoThreadInPool(pool)) {176         pool->last_empty_ = time(NULL);177     }178 179     worker->func = func;180     worker->arg = arg;181 182     return worker;183 }184 185 static void PopWork(CThreadPool *pool, CThreadWorker *worker)186 {187     pool->head_ = worker->next_;188     worker->next_ = NULL;189 }190 191 static void WakeupWorkerThread(CThreadWorker *worker)192 {193     pthread_mutex_lock(&worker->mutex_);194     worker->wake_up_ = 1;195     pthread_mutex_unlock(&worker->mutex_);196 197     pthread_cond_signal(&worker->wake_);198 }
3 總結

該線程池模型實現了我們線上環境實際的一些應用場景,沒有考慮的問題有這么幾點:

1. 沒有考慮任務類的概念,寫一個任務基類,然后具體的任務類繼承這個基類,實現自己的功能,具體執行時候,只需要add_task,把任務加進線程池即可,這樣做的目的是想著可以給每個任務發信號,是否要終止任務的執行。當前的線程池做不到這點,也做不到判斷任務執行時間,是否超時。

2. 每個線程只執行一個任務,而不是搞一個任務隊列,去執行任務隊列里的任務。關于這點,我也不清楚,搞任務隊列的優勢是什么。

個人能想到的就是這些,不知道諸位平時工作中,具體應用線程池的時候需要考慮一些什么場景,我這里想到的就是1、可以靈活的終止任務;2、可以判斷任務是否超時;3、可以對需要執行的任務做到負載均衡(覺得這一點,在這里的線程池里不是問題,極端的情況是來了一堆任務,線程池中線程不夠了,那確實會有這個問題)。

關于這幾個問題,第3個我個人能解決,可以把上面的線程池稍微修改一下,每個線程結果ThreadWorker里面再加個任務隊列,當沒有任務時候線程阻塞,否則就取出任務來執行。

關于第一點,怎么修改這個框架,達到可以給任務發信號呢。

還有第二點,判斷任務是否超時呢?

歡迎大家一起討論!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 红安县| 佛山市| 镇坪县| 本溪市| 永济市| 孟州市| 舒兰市| 皮山县| 镇巴县| 岳阳市| 奉化市| 黔西县| 凌海市| 寿阳县| 缙云县| 博客| 文化| 尖扎县| 花莲县| 抚顺市| 黎川县| 枞阳县| 和平区| 托克逊县| 湘西| 响水县| 镇原县| 永兴县| 阳原县| 怀集县| 宜兴市| 景洪市| 民县| 独山县| 洛浦县| 特克斯县| 海伦市| 满洲里市| 盐城市| 那曲县| 神农架林区|