国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 網(wǎng)站 > Nginx > 正文

nginx源碼分析線程池詳解

2024-08-30 12:28:56
字體:
供稿:網(wǎng)友

nginx源碼分析線程池詳解

一、前言

     nginx是采用多進(jìn)程模型,master和worker之間主要通過pipe管道的方式進(jìn)行通信,多進(jìn)程的優(yōu)勢就在于各個進(jìn)程互不影響。但是經(jīng)常會有人問道,nginx為什么不采用多線程模型(這個除了之前一篇文章講到的情況,別的只有去問作者了,HAHA)。其實,nginx代碼中提供了一個thread_pool(線程池)的核心模塊來處理多任務(wù)的。下面就本人對該thread_pool這個模塊的理解來跟大家做些分享(文中錯誤、不足還請大家指出,謝謝) 

二、thread_pool線程池模塊介紹

     nginx的主要功能都是由一個個模塊構(gòu)成的,thread_pool也不例外。線程池主要用于讀取、發(fā)送文件等IO操作,避免慢速IO影響worker的正常運行。先引用一段官方的配置示例

Syntax: thread_pool name threads=number [max_queue=number];Default: thread_pool default threads=32 max_queue=65536;Context: main

     根據(jù)上述的配置說明,thread_pool是有名字的,上面的線程數(shù)目以及隊列大小都是指每個worker進(jìn)程中的線程,而不是所有worker中線程的總數(shù)。一個線程池中所有的線程共享一個隊列,隊列中的最大人數(shù)數(shù)量為上面定義的max_queue,如果隊列滿了的話,再往隊列中添加任務(wù)就會報錯。 

     根據(jù)之前講到過的模塊初始化流程(在master啟動worker之前) create_conf--> command_set函數(shù)-->init_conf,下面就按照這個流程看看thread_pool模塊的初始化

/******************* nginx/src/core/ngx_thread_pool.c ************************///創(chuàng)建線程池所需的基礎(chǔ)結(jié)構(gòu)static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle){  ngx_thread_pool_conf_t *tcf;   //從cycle->pool指向的內(nèi)存池中申請一塊內(nèi)存  tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));  if (tcf == NULL) {    return NULL;  }      //先申請包含4個ngx_thread_pool_t指針類型元素的數(shù)組   //ngx_thread_pool_t結(jié)構(gòu)體中保存了一個線程池相關(guān)的信息  if (ngx_array_init(&tcf->pools, cycle->pool, 4,            sizeof(ngx_thread_pool_t *))    != NGX_OK)  {    return NULL;  }   return tcf;} //解析處理配置文件中thread_pool的配置,并將相關(guān)信息保存的ngx_thread_pool_t中static char * ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf){  ngx_str_t     *value;  ngx_uint_t     i;  ngx_thread_pool_t *tp;   value = cf->args->elts;   //根據(jù)thread_pool配置中的name作為線程池的唯一標(biāo)識(如果重名,只有第一個有效)  //申請ngx_thread_pool_t結(jié)構(gòu)保存線程池的相關(guān)信息  //由此可見,nginx支持配置多個name不同的線程池  tp = ngx_thread_pool_add(cf, &value[1]);  .......  //處理thread_pool配置行的所有元素  for (i = 2; i < cf->args->nelts; i++) {    //檢查配置的線程數(shù)    if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {     .......    }         //檢查配置的最大隊列長度    if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {     .......    }  }  ......} //判斷包含多個線程池的數(shù)組中的各個線程池的配置是否正確static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf){  ....  ngx_thread_pool_t **tpp;   tpp = tcf->pools.elts;  //遍歷數(shù)組中所有的線程池配置,并檢查其正確性  for (i = 0; i < tcf->pools.nelts; i++) {    .....  }   return NGX_CONF_OK;}

     在上述的流程走完之后,nginx的master就保存了一份所有線程池的配置(tcf->pools),這份配置在創(chuàng)建worker時也會被繼承。然后每個worker中都調(diào)用各個核心模塊的init_process函數(shù)(如果有的話)。

/******************* nginx/src/core/ngx_thread_pool.c ************************///創(chuàng)建線程池所需的基礎(chǔ)結(jié)構(gòu)static ngx_int_tngx_thread_pool_init_worker(ngx_cycle_t *cycle){  ngx_uint_t        i;  ngx_thread_pool_t    **tpp;  ngx_thread_pool_conf_t  *tcf;  //如果不是worker或者只有一個worker就不起用線程池  if (ngx_process != NGX_PROCESS_WORKER    && ngx_process != NGX_PROCESS_SINGLE)  {    return NGX_OK;  }     //初始化任務(wù)隊列  ngx_thread_pool_queue_init(&ngx_thread_pool_done);   tpp = tcf->pools.elts;  for (i = 0; i < tcf->pools.nelts; i++) {    //初始化各個線程池    if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {      return NGX_ERROR;    }  }   return NGX_OK;} //線程池初始化static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool){  .....  //初始化任務(wù)隊列  ngx_thread_pool_queue_init(&tp->queue);   //創(chuàng)建線程鎖  if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {    return NGX_ERROR;  }   //創(chuàng)建線程條件變量  if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {    (void) ngx_thread_mutex_destroy(&tp->mtx, log);    return NGX_ERROR;  }  ......  for (n = 0; n < tp->threads; n++) {    //創(chuàng)建線程池中的每個線程    err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);    if (err) {      ngx_log_error(NGX_LOG_ALERT, log, err,             "pthread_create() failed");      return NGX_ERROR;    }  }  ......} //線程池中線程處理主函數(shù)static void *ngx_thread_pool_cycle(void *data){   ......   for ( ;; ) {    //阻塞的方式獲取線程鎖    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {      return NULL;    }     /* the number may become negative */    tp->waiting--;     //如果任務(wù)隊列為空,就cond_wait阻塞等待有新任務(wù)時調(diào)用cond_signal/broadcast觸發(fā)    while (tp->queue.first == NULL) {      if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)        != NGX_OK)      {        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);        return NULL;      }    }    //從任務(wù)隊列中獲取task,并將其從隊列中移除    task = tp->queue.first;    tp->queue.first = task->next;     if (tp->queue.first == NULL) {      tp->queue.last = &tp->queue.first;    }     if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {      return NULL;    }    ......    //task的處理函數(shù)    task->handler(task->ctx, tp->log);    .....     ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);     //將經(jīng)過預(yù)處理的任務(wù)添加到done隊列中等待調(diào)用event的回調(diào)函數(shù)繼續(xù)處理    *ngx_thread_pool_done.last = task;    ngx_thread_pool_done.last = &task->next;         //防止編譯器優(yōu)化,保證解鎖操作是在上述語句執(zhí)行完畢后再去執(zhí)行的    ngx_memory_barrier();     ngx_unlock(&ngx_thread_pool_done_lock);         (void) ngx_notify(ngx_thread_pool_handler);  }} //處理pool_done隊列上task中包含的每個event事件static void ngx_thread_pool_handler(ngx_event_t *ev){  .....  ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);   //獲取任務(wù)鏈表的頭部  task = ngx_thread_pool_done.first;  ngx_thread_pool_done.first = NULL;  ngx_thread_pool_done.last = &ngx_thread_pool_done.first;   ngx_memory_barrier();   ngx_unlock(&ngx_thread_pool_done_lock);   while (task) {    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,            "run completion handler for task #%ui", task->id);    //遍歷隊列中的所有任務(wù)事件    event = &task->event;    task = task->next;     event->complete = 1;    event->active = 0;     //調(diào)用event對應(yīng)的處理函數(shù)有針對性的進(jìn)行處理    event->handler(event);  }} 

三、thread_pool線程池使用示例

     根據(jù)之前所講到的,nginx中的線程池主要是用于操作文件的IO操作。所以,在nginx中自帶的模塊ngx_http_file_cache.c文件中看到了線程池的使用。

/*********************** nginx/src/os/unix/ngx_files.c **********************///file_cache模塊的處理函數(shù)(涉及到了線程池)static ssize_t ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c){  .......#if (NGX_THREADS)   if (clcf->aio == NGX_HTTP_AIO_THREADS) {    c->file.thread_task = c->thread_task;    //這里注冊的函數(shù)在下面語句中的ngx_thread_read函數(shù)中被調(diào)用    c->file.thread_handler = ngx_http_cache_thread_handler;    c->file.thread_ctx = r;    //根據(jù)任務(wù)的屬性,選擇正確的線程池,并初始化task結(jié)構(gòu)體中的各個成員        n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0, r->pool);     c->thread_task = c->file.thread_task;    c->reading = (n == NGX_AGAIN);     return n;  }#endif   return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0);}  //task任務(wù)的處理函數(shù)static ngx_int_t ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file){  .......  tp = clcf->thread_pool;  .......     task->event.data = r;  //注冊thread_event_handler函數(shù),該函數(shù)在處理pool_done隊列中event事件時被調(diào)用  task->event.handler = ngx_http_cache_thread_event_handler;   //將任務(wù)放到線程池的任務(wù)隊列中  if (ngx_thread_task_post(tp, task) != NGX_OK) {    return NGX_ERROR;  }  ......} /*********************** nginx/src/core/ngx_thread_pool.c **********************///添加任務(wù)到隊列中ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task){  //如果當(dāng)前的任務(wù)正在處理就退出  if (task->event.active) {    ngx_log_error(NGX_LOG_ALERT, tp->log, 0,           "task #%ui already active", task->id);    return NGX_ERROR;  }   if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {    return NGX_ERROR;  }     //判斷當(dāng)前線程池等待的任務(wù)數(shù)量與最大隊列長度的關(guān)系  if (tp->waiting >= tp->max_queue) {    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);     ngx_log_error(NGX_LOG_ERR, tp->log, 0,           "thread pool /"%V/" queue overflow: %i tasks waiting",           &tp->name, tp->waiting);    return NGX_ERROR;  }  //激活任務(wù)  task->event.active = 1;   task->id = ngx_thread_pool_task_id++;  task->next = NULL;     //通知阻塞的線程有新事件加入,可以解除阻塞  if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);    return NGX_ERROR;  }   *tp->queue.last = task;  tp->queue.last = &task->next;   tp->waiting++;   (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);   ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,          "task #%ui added to thread pool /"%V/"",          task->id, &tp->name);   return NGX_OK;}

    上面示例基本展示了nginx目前對線程池的使用方法,采用線程池來處理IO這類慢速操作可以提升worker的主線程的執(zhí)行效率。當(dāng)然,用戶自己在開發(fā)模塊時,也可以參照file_cache模塊中使用線程池的方法來調(diào)用多線程提升程序性能。(歡迎大家多多批評指正)

感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 保山市| 鄂州市| 加查县| 广昌县| 浠水县| 丰原市| 定日县| 泰宁县| 库伦旗| 浦江县| 邮箱| 霍城县| 当涂县| 大荔县| 禄劝| 乐山市| 香河县| 吉水县| 宣威市| 鄂州市| 吕梁市| 思南县| 恩施市| 东山县| 定日县| 大宁县| 焉耆| 安乡县| 肇东市| 清丰县| 绍兴市| 永城市| 内乡县| 钟祥市| 丹棱县| 鄂伦春自治旗| 长泰县| 耒阳市| 永德县| 车致| 尤溪县|