本文為大家分享了threadpool線程池中所有的操作,供大家參考,具體內(nèi)容如下
首先介紹一下自己使用到的名詞:
工作線程(worker):創(chuàng)建線程池時(shí),按照指定的線程數(shù)量,創(chuàng)建工作線程,等待從任務(wù)隊(duì)列中g(shù)et任務(wù);
任務(wù)(requests):即工作線程處理的任務(wù),任務(wù)可能成千上萬(wàn)個(gè),但是工作線程只有少數(shù)。任務(wù)通過(guò) makeRequests來(lái)創(chuàng)建
任務(wù)隊(duì)列(request_queue):存放任務(wù)的隊(duì)列,使用了queue實(shí)現(xiàn)的。工作線程從任務(wù)隊(duì)列中g(shù)et任務(wù)進(jìn)行處理;
任務(wù)處理函數(shù)(callable):工作線程get到任務(wù)后,通過(guò)調(diào)用任務(wù)的任務(wù)處理函數(shù)即(request.callable_)具體 的 處理任務(wù),并返回處理結(jié)果;
任務(wù)結(jié)果隊(duì)列(result_queue):任務(wù)處理完成后,將返回的處理結(jié)果,放入到任務(wù)結(jié)果隊(duì)列中(包括異常);
任務(wù)異常處理函數(shù)或回調(diào)(exc_callback):從任務(wù)結(jié)果隊(duì)列中g(shù)et結(jié)果,如果設(shè)置了異常,則需要調(diào)用異常回調(diào)處理異常;
任務(wù)結(jié)果回調(diào)(callback):從任務(wù)結(jié)果隊(duì)列中g(shù)et結(jié)果,對(duì)result進(jìn)行進(jìn)一步處理;
上一節(jié)介紹了線程池threadpool的安裝和使用,本節(jié)將主要介紹線程池工作的主要流程:
(1)線程池的創(chuàng)建
(2)工作線程的啟動(dòng)
(3)任務(wù)的創(chuàng)建
(4)任務(wù)的推送到線程池
(5)線程處理任務(wù)
(6)任務(wù)結(jié)束處理
(7)工作線程的退出
下面是threadpool的定義:
class ThreadPool: """A thread pool, distributing work requests and collecting results. See the module docstring for more information. """ def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): pass def createWorkers(self, num_workers, poll_timeout=5): pass def dismissWorkers(self, num_workers, do_join=False): pass def joinAllDismissedWorkers(self): pass def putRequest(self, request, block=True, timeout=None): pass def poll(self, block=False): pass def wait(self): pass
1、線程池的創(chuàng)建(ThreadPool(args))
task_pool=threadpool.ThreadPool(num_works)
task_pool=threadpool.ThreadPool(num_works) def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): """Set up the thread pool and start num_workers worker threads. ``num_workers`` is the number of worker threads to start initially. If ``q_size > 0`` the size of the work *request queue* is limited and the thread pool blocks when the queue is full and it tries to put more work requests in it (see ``putRequest`` method), unless you also use a positive ``timeout`` value for ``putRequest``. If ``resq_size > 0`` the size of the *results queue* is limited and the worker threads will block when the queue is full and they try to put new results in it. .. warning: If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is the possibilty of a deadlock, when the results queue is not pulled regularly and too many jobs are put in the work requests queue. To prevent this, always set ``timeout > 0`` when calling ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. """ self._requests_queue = Queue.Queue(q_size)#任務(wù)隊(duì)列,通過(guò)threadpool.makeReuests(args)創(chuàng)建的任務(wù)都會(huì)放到此隊(duì)列中 self._results_queue = Queue.Queue(resq_size)#字典,任務(wù)對(duì)應(yīng)的任務(wù)執(zhí)行結(jié)果</span> self.workers = []#工作線程list,通過(guò)self.createWorkers()函數(shù)內(nèi)創(chuàng)建的工作線程會(huì)放到此工作線程list中 self.dismissedWorkers = []#被設(shè)置線程事件并且沒(méi)有被join的工作線程 self.workRequests = {}#字典,記錄任務(wù)被分配到哪個(gè)工作線程中</span> self.createWorkers(num_workers, poll_timeout)
新聞熱點(diǎn)
疑難解答
圖片精選