http://lwn.net/Articles/403891/
A workqueue is another mechanism for deferring work. It hands the deferred work to a kernel thread, which means the bottom half runs in process context. Code executed through a workqueue therefore gets all the benefits of process context; most importantly, it is allowed to be rescheduled and even to sleep. So when should you use a workqueue and when a tasklet? If the deferred task needs to sleep, choose a workqueue; if it does not, choose a tasklet. A workqueue is also the right choice whenever you need a schedulable entity to run your bottom half: it is the only bottom-half mechanism that runs in process context, and therefore the only one that can sleep. That makes it useful when you need to allocate large amounts of memory, take a semaphore, or perform blocking I/O. If you do not need a kernel thread to defer the work, consider a tasklet instead.
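To make this concrete, here is a minimal sketch (not from the original article; the names demo_work and demo_work_fn are made up for illustration) of a module that defers work which sleeps. Because the handler runs on a kworker thread in process context, msleep() is legal here, which it would not be in a tasklet.

#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/delay.h>

/* Runs in process context on a kworker thread, so sleeping is allowed. */
static void demo_work_fn(struct work_struct *work)
{
    pr_info("demo_work_fn: running on a kworker, sleeping for 100 ms\n");
    msleep(100);                    /* would be illegal in a tasklet */
    pr_info("demo_work_fn: done\n");
}

static DECLARE_WORK(demo_work, demo_work_fn);

static int __init demo_init(void)
{
    schedule_work(&demo_work);      /* queue onto the default system workqueue */
    return 0;
}

static void __exit demo_exit(void)
{
    cancel_work_sync(&demo_work);   /* make sure the handler has finished */
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");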
Let's start with the worker_pool data structure. Each CPU manages two worker_pool structures, stored in the per-CPU variable cpu_worker_pools. For a given CPU, the two worker_pools have worker_pool->attrs->nice set to 0 and -20 respectively.
struct worker_pool {
    int               cpu;          /* I: the associated cpu */
    struct list_head  worklist;     /* L: list of pending works; schedule_work() eventually adds work items to this list */
    int               nr_workers;   /* L: total number of workers; see start_worker(): WORKER_STARTED is set, then nr_workers++ */
    int               nr_idle;      /* L: currently idle ones; nr_idle++ in worker_enter_idle() */
    struct list_head  idle_list;    /* X: list of idle workers; worker_enter_idle() does list_add(&worker->entry, &pool->idle_list) */
    struct timer_list idle_timer;   /* L: worker idle timeout */
    struct timer_list mayday_timer; /* L: SOS timer for workers */
    atomic_t          nr_running ____cacheline_aligned_in_smp;
};

In init_workqueues() -> create_and_start_worker(), a worker is created via create_worker(pool). create_worker() allocates a worker structure and associates it with the worker_pool:
    worker->pool = worker_pool;
    /* worker->task holds the kthread created for this worker */
    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                          "kworker/%s", id_buf);

Then, in start_worker():
    worker->flags |= WORKER_STARTED;
    worker->flags |= WORKER_IDLE;
    pool->nr_idle++;                            /* worker_pool->nr_idle++ */
    worker->pool->nr_workers++;                 /* worker_pool->nr_workers++ */
    worker->last_active = jiffies;              /* record the time the worker went idle */
    list_add(&worker->entry, &pool->idle_list); /* link the worker into pool->idle_list */
    wake_up_process(worker->task);              /* start the worker's task (kworker/0:0, kworker/0:0H, ...) */

For CPU 0, a worker is created for each of its two worker_pools, two tasks are created according to the pools' nice values, and each worker is linked to its worker_pool.
In create_and_start_worker() -> create_worker():

1) worker = alloc_worker();
2) id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);
   worker->id = id;
3) worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                          "kworker/%s", id_buf);
   set_user_nice(worker->task, pool->attrs->nice);
   set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

In create_and_start_worker() -> start_worker():

1) worker->flags |= WORKER_STARTED;
2) worker->pool->nr_workers++;
3) in worker_enter_idle():
   worker->flags |= WORKER_IDLE;
   pool->nr_idle++;
   worker->last_active = jiffies;
   list_add(&worker->entry, &pool->idle_list);

The following log shows CPU 0 creating its two tasks:

<6>[0.009321] [0:swapper/0:1] create_worker pool->cpu = 0, pool->attrs->nice = 0
<6>[0.009512] [0:swapper/0:1] create_worker id_buf = 0:0      // kworker/0:0
<6>[0.009781] [0:swapper/0:1] for_each_cpu_worker_pool 1 cpu = 0
<6>[0.009803] [0:swapper/0:1] create_worker pool->cpu = 0, pool->attrs->nice = -20
<6>[0.009864] [0:swapper/0:1] create_worker id_buf = 0:0H     // kworker/0:0H

At CPU_UP_PREPARE time, kworkers for the other CPUs are created in the same way as for CPU 0, based on the CPU number and the pool->attrs->nice of that CPU's pools. Every CPU gets two kworkers with nice values 0 and -20, named 1:0 and 1:0H, 2:0 and 2:0H, and so on.
<6>[0.046226] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =1 , pool->nr_workers = 0
<6>[0.046253] [0:swapper/0:1] create_worker pool->cpu = 1, pool->attrs->nice = 0
<6>[0.046326] [0:swapper/0:1] create_worker id_buf = 1:0
<6>[0.046629] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =1 , pool->nr_workers = 0
<6>[0.046653] [0:swapper/0:1] create_worker pool->cpu = 1, pool->attrs->nice = -20
<6>[0.046720] [0:swapper/0:1] create_worker id_buf = 1:0H
<6>[0.055634] [1:swapper/1:0] CPU1: thread -1, cpu 1, socket 0, mpidr 80000001
<6>[0.056937] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =2 , pool->nr_workers = 0
<6>[0.056970] [0:swapper/0:1] create_worker pool->cpu = 2, pool->attrs->nice = 0
<6>[0.057041] [0:swapper/0:1] create_worker id_buf = 2:0
<6>[0.057232] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =2 , pool->nr_workers = 0
<6>[0.057257] [0:swapper/0:1] create_worker pool->cpu = 2, pool->attrs->nice = -20
<6>[0.057402] [0:swapper/0:1] create_worker id_buf = 2:0H
<6>[0.066358] [2:swapper/2:0] CPU2: thread -1, cpu 2, socket 0, mpidr 80000002
<6>[0.067545] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =3 , pool->nr_workers = 0
<6>[0.067578] [0:swapper/0:1] create_worker pool->cpu = 3, pool->attrs->nice = 0
<6>[0.067652] [0:swapper/0:1] create_worker id_buf = 3:0
<6>[0.067838] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =3 , pool->nr_workers = 0
<6>[0.067862] [0:swapper/0:1] create_worker pool->cpu = 3, pool->attrs->nice = -20
<6>[0.067930] [0:swapper/0:1] create_worker id_buf = 3:0H
<6>[0.076797] [3:swapper/3:0] CPU3: thread -1, cpu 3, socket 0, mpidr 80000003

static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb,
                                               unsigned long action, void *hcpu)
{
    ...
    switch (action & ~CPU_TASKS_FROZEN) {
    case CPU_UP_PREPARE:
        for_each_cpu_worker_pool(pool, cpu) {
            pr_info("workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =%d , pool->nr_workers = %d\n",
                    pool->cpu, pool->nr_workers);
            if (pool->nr_workers)
                continue;
            if (create_and_start_worker(pool) < 0)
                return NOTIFY_BAD;
        }
        break;
    ...
}

It still needs to be confirmed under exactly which conditions get_unbound_pool() is called:
<6>[0.010539] [0:swapper/0:1] get_unbound_pool !!!!
<6>[0.010562] [0:swapper/0:1] create_worker pool->cpu = -1, pool->attrs->nice = 0
<6>[0.010623] [0:swapper/0:1] create_worker id_buf = u8:0
....
<6>[0.537739] [1:kworker/u8:0:6] create_worker pool->cpu = -1, pool->attrs->nice = 0
<6>[0.537784] [1:kworker/u8:0:6] create_worker id_buf = u8:1
....
<6>[0.539426] [0:swapper/0:1] get_unbound_pool !!!!
<6>[0.539450] [0:swapper/0:1] create_worker pool->cpu = -1, pool->attrs->nice = -20
<6>[0.539520] [0:swapper/0:1] create_worker id_buf = u9:0
<6>[0.540053] [1:kworker/u9:0: 42] create_worker pool->cpu = -1, pool->attrs->nice = -20
<6>[0.540095] [1:kworker/u9:0: 42] create_worker id_buf = u9:1

While each kworker runs, i.e. inside worker_thread(), a few conditions are checked to decide whether a new worker needs to be created.
static int worker_thread(void *__worker)
{
    ...
    /* remove this worker from pool->idle_list and do pool->nr_idle-- */
    worker_leave_idle(worker);
recheck:
    /* need_more_worker() is true only when pool->worklist is non-empty
     * and pool->nr_running is 0; otherwise go to sleep */
    if (!need_more_worker(pool))
        goto sleep;

    /* we get here when there is pending work and no worker is currently running.
     * may_start_working() checks whether pool->nr_idle is non-zero;
     * if there is no idle worker left, manage_workers() is called */
    /* do we need to manage? */
    if (unlikely(!may_start_working(pool)) && manage_workers(worker))
        goto recheck;

    /* ... the main loop that pulls work items off pool->worklist and runs them
     * is elided here ... */

sleep:
    /* we arrive here when pool->worklist is empty or pool->nr_running is non-zero;
     * management may still be needed, e.g. when need_to_create_worker() is true */
    if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
        goto recheck;
    /*
    static bool need_to_manage_workers(struct worker_pool *pool)
    {
        return need_to_create_worker(pool) ||
               (pool->flags & POOL_MANAGE_WORKERS);
    }

    // need_more_worker() returns true when pool->worklist is non-empty
    // and pool->nr_running is 0.
    // may_start_working() is false when pool->nr_idle == 0,
    // so need_to_create_worker() returns true when pool->worklist is non-empty,
    // pool->nr_running is 0 and pool->nr_idle == 0.
    static bool need_to_create_worker(struct worker_pool *pool)
    {
        return need_more_worker(pool) && !may_start_working(pool);
    }
    */
    worker_enter_idle(worker);
    __set_current_state(TASK_INTERRUPTIBLE);
    spin_unlock_irq(&pool->lock);
    schedule();
    goto woke_up;   /* the woke_up: label is in the elided code at the top of the function */
}

Based on create_worker() and the log statements added to it, here is a summary of what create_and_start_worker() does:

1) For each CPU's worker_pool, create a worker via create_worker(), storing the pool, the id returned by idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT), and so on in the worker structure:
    worker = alloc_worker();            /* allocate the worker */
    worker->pool = pool;
    /* allocate an id and store it in worker->id */
    id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);
    worker->id = id;
    /* create the task, store it, and set its nice value and CPU affinity */
    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                          "kworker/%s", id_buf);
    set_user_nice(worker->task, pool->attrs->nice);
    set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
    worker->task->flags |= PF_NO_SETAFFINITY;
    if (pool->flags & POOL_DISASSOCIATED)
        worker->flags |= WORKER_UNBOUND;

2) Start the worker created above by calling start_worker():
    worker->flags |= WORKER_STARTED;
    worker->pool->nr_workers++;
    worker->flags |= WORKER_IDLE;
    /* one more idle worker in this pool */
    pool->nr_idle++;
    /* record when the worker went idle */
    worker->last_active = jiffies;
    /* link the worker into pool->idle_list */
    list_add(&worker->entry, &pool->idle_list);

Allocating a workqueue_struct with alloc_workqueue() is relatively expensive, so at the end of init_workqueues() a few system-wide workqueues are allocated up front for general use:
    system_wq = alloc_workqueue("events", 0, 0);
    system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
    system_long_wq = alloc_workqueue("events_long", 0, 0);
    system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
                                        WQ_UNBOUND_MAX_ACTIVE);
    system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0);

The most commonly used one is the "events" workqueue: system_wq = alloc_workqueue("events", 0, 0). Let's now look at what alloc_workqueue() actually does.
1. Allocate a workqueue_struct, initialize its lists, set its name, and so on:
    wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);

    va_start(args, lock_name);
    vsnprintf(wq->name, sizeof(wq->name), fmt, args);
    va_end(args);

    max_active = max_active ?: WQ_DFL_ACTIVE;
    max_active = wq_clamp_max_active(max_active, flags, wq->name);

    /* init wq */
    wq->flags = flags;
    wq->saved_max_active = max_active;
    mutex_init(&wq->mutex);
    atomic_set(&wq->nr_pwqs_to_flush, 0);
    INIT_LIST_HEAD(&wq->pwqs);
    INIT_LIST_HEAD(&wq->flusher_queue);
    INIT_LIST_HEAD(&wq->flusher_overflow);
    INIT_LIST_HEAD(&wq->maydays);

    lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    INIT_LIST_HEAD(&wq->list);

2. Call alloc_and_link_pwqs(); here we follow the path where flags is 0.

1) Allocate wq->cpu_pwqs:
    wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);    /* a per-CPU pool_workqueue */

2) Associate each CPU's pool_workqueue with the worker_pool of the requested priority:
    /* Depending on the priority, i.e. the nice value, the matching worker_pool
     * of that CPU is assigned to pool_workqueue->pool. */
    pool_workqueue->pool = worker_pool[highpri ? 1 : 0];
    /* Every CPU's pool_workqueue can reach the workqueue_struct it belongs to. */
    pool_workqueue->wq = workqueue_struct;

Since workqueue_struct->cpu_pwqs stores the per-CPU pool_workqueue, a worker_pool can be reached starting from the workqueue_struct.
3) Call link_pwq(), which does list_add_rcu(&pwq->pwqs_node, &wq->pwqs):
    /* in link_pwq(), the pool_workqueue is linked into workqueue_struct->pwqs */
    list_add_rcu(&pwq->pwqs_node, &wq->pwqs);    /* workqueue_struct->pwqs holds the pool_workqueues */

static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
    bool highpri = wq->flags & WQ_HIGHPRI;
    int cpu, ret;

    if (!(wq->flags & WQ_UNBOUND)) {
        wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
        if (!wq->cpu_pwqs)
            return -ENOMEM;

        for_each_possible_cpu(cpu) {
            struct pool_workqueue *pwq =
                per_cpu_ptr(wq->cpu_pwqs, cpu);
            /* this CPU's pool_workqueue and this CPU's cpu_worker_pools */
            struct worker_pool *cpu_pools =
                per_cpu(cpu_worker_pools, cpu);

            /* wire the workqueue_struct and the chosen worker_pool into the
             * pool_workqueue: pool_workqueue->pool gets the worker_pool,
             * pool_workqueue->wq gets the workqueue_struct */
            init_pwq(pwq, wq, &cpu_pools[highpri]);

            mutex_lock(&wq->mutex);
            /* then link the pool_workqueue into workqueue_struct->pwqs
             * via pool_workqueue->pwqs_node */
            link_pwq(pwq);
            mutex_unlock(&wq->mutex);
        }
        return 0;
    } else if (wq->flags & __WQ_ORDERED) {
        ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
        /* there should only be single pwq for ordering guarantee */
        WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
                      wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
             "ordering guarantee broken for workqueue %s\n", wq->name);
        return ret;
    } else {
        return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
    }
}

So far we have covered how workers are initialized, but a worker is just a process that keeps running; the work items are the actual jobs to be done. Next, let's look at how a work item is initialized, how a worker is chosen, and how the work gets queued to it. Taking idletimer_tg_work() as an example, here is how a work item is initialized:
INIT_WORK(&info->timer->work, idletimer_tg_work);

#define INIT_WORK(_work, _func)                                 \
    do {                                                        \
        __INIT_WORK((_work), (_func), 0);                       \
    } while (0)

#define __INIT_WORK(_work, _func, _onstack)                     \
    do {                                                        \
        __init_work((_work), _onstack);                         \
        (_work)->data = (atomic_long_t) WORK_DATA_INIT();       \
        INIT_LIST_HEAD(&(_work)->entry);                        \
        PREPARE_WORK((_work), (_func));                         \
    } while (0)

/* the work_struct structure */
struct work_struct {
    atomic_long_t data;
    struct list_head entry;
    work_func_t func;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

When the work needs to run, schedule_work(&timer->work) is called to queue it on a workqueue_struct.
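The idletimer_tg code embeds its work_struct inside a larger private structure and recovers that structure in the handler with container_of(). The sketch below shows the same common pattern with the standard INIT_WORK() / schedule_work() calls; the names my_dev, my_dev_work_fn, my_dev_create and my_dev_kick are invented for illustration and are not from the original article.

#include <linux/workqueue.h>
#include <linux/slab.h>

struct my_dev {
    int pending_events;
    struct work_struct work;        /* embedded work item */
};

/* The handler gets back the enclosing structure from the work_struct pointer. */
static void my_dev_work_fn(struct work_struct *work)
{
    struct my_dev *dev = container_of(work, struct my_dev, work);

    pr_info("processing %d pending events\n", dev->pending_events);
}

static struct my_dev *my_dev_create(void)
{
    struct my_dev *dev = kzalloc(sizeof(*dev), GFP_KERNEL);

    if (!dev)
        return NULL;
    /* bind the handler to the embedded work item, like
     * INIT_WORK(&info->timer->work, idletimer_tg_work) above */
    INIT_WORK(&dev->work, my_dev_work_fn);
    return dev;
}

/* Later, e.g. from an interrupt handler or a timer callback: */
static void my_dev_kick(struct my_dev *dev)
{
    dev->pending_events++;
    schedule_work(&dev->work);      /* queue the work; the handler runs on a kworker */
}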
schedule_work() eventually ends up in the function below. It takes the CPU's pool_workqueue from wq->cpu_pwqs, goes through a few intermediate steps, checks pool_workqueue->nr_active, and finally calls insert_work() to add the work either to pool_workqueue->pool->worklist or to pool_workqueue->delayed_works.

static void __queue_work(int cpu, struct workqueue_struct *wq,
                         struct work_struct *work)
{
    if (!(wq->flags & WQ_UNBOUND))
        pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    else
        pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

    last_pool = get_work_pool(work);
    if (last_pool && last_pool != pwq->pool) {
        struct worker *worker;

        spin_lock(&last_pool->lock);

        worker = find_worker_executing_work(last_pool, work);

        if (worker && worker->current_pwq->wq == wq) {
            pwq = worker->current_pwq;
        } else {
            /* meh... not running there, queue here */
            spin_unlock(&last_pool->lock);
            spin_lock(&pwq->pool->lock);
        }
    } else {
        spin_lock(&pwq->pool->lock);
    }

    /* check this pool_workqueue's nr_active to pick the list the work goes on */
    if (likely(pwq->nr_active < pwq->max_active)) {
        trace_workqueue_activate_work(work);
        pwq->nr_active++;
        worklist = &pwq->pool->worklist;
    } else {
        work_flags |= WORK_STRUCT_DELAYED;
        worklist = &pwq->delayed_works;
    }

    /* link the work into the chosen list */
    insert_work(pwq, work, worklist, work_flags);

    spin_unlock(&pwq->pool->lock);
}

Finally, in worker_thread(), running in the kworker processes created earlier, a work item is taken from pool->worklist and executed:

    struct work_struct *work = list_first_entry(&pool->worklist,
                                                struct work_struct, entry);
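If the system workqueues are not suitable, a driver can allocate its own workqueue with alloc_workqueue() and queue work on it explicitly; queue_work_on() even lets the caller pick the CPU, which selects that CPU's pool_workqueue via per_cpu_ptr(wq->cpu_pwqs, cpu), exactly as in __queue_work() above. A hedged sketch follows; the names my_wq, my_work and my_work_fn are invented for illustration.

#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;

static void my_work_fn(struct work_struct *work)
{
    pr_info("my_work_fn: handled on a kworker of the high-priority pool\n");
}

static DECLARE_WORK(my_work, my_work_fn);

static int __init my_init(void)
{
    /* A dedicated high-priority queue: work queued here is served by the
     * nice -20 worker_pool, i.e. the kworker/N:NH threads discussed above. */
    my_wq = alloc_workqueue("my_highpri_wq", WQ_HIGHPRI, 0);
    if (!my_wq)
        return -ENOMEM;

    queue_work(my_wq, &my_work);            /* current CPU's pool_workqueue */
    /* queue_work_on(1, my_wq, &my_work) would target CPU 1's pool_workqueue instead */
    return 0;
}

static void __exit my_exit(void)
{
    destroy_workqueue(my_wq);    /* drains pending work and frees the workqueue */
}

module_init(my_init);
module_exit(my_exit);
MODULE_LICENSE("GPL");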