首先介紹一下自己使用到的名詞:
工作線程(worker):創建線程池時,按照指定的線程數量,創建工作線程,等待從任務隊列中get任務;
任務(requests):即工作線程處理的任務,任務可能成千上萬個,但是工作線程只有少數。任務通過 makeRequests來創建
任務隊列(request_queue):存放任務的隊列,使用了queue實現的。工作線程從任務隊列中get任務進行處理;
任務處理函數(callable):工作線程get到任務后,通過調用任務的任務處理函數即(request.callable_)具體 的 處理任務,并返回處理結果;
任務結果隊列(result_queue):任務處理完成后,將返回的處理結果,放入到任務結果隊列中(包括異常);
任務異常處理函數或回調(exc_callback):從任務結果隊列中get結果,如果設置了異常,則需要調用異常回調處理異常;
任務結果回調(callback):從任務結果隊列中get結果,對result進行進一步處理;
上一節介紹了線程池threadpool的安裝和使用,本節將主要介紹線程池工作的主要流程:
(1)線程池的創建
(2)工作線程的啟動
(3)任務的創建
(4)任務的推送到線程池
(5)線程處理任務
(6)任務結束處理
(7)工作線程的退出
下面是threadpool的定義:
class ThreadPool: """A thread pool, distributing work requests and collecting results. See the module docstring for more information. """ def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): pass def createWorkers(self, num_workers, poll_timeout=5): pass def dismissWorkers(self, num_workers, do_join=False): pass def joinAllDismissedWorkers(self): pass def putRequest(self, request, block=True, timeout=None): pass def poll(self, block=False): pass def wait(self): pass
1、線程池的創建(ThreadPool(args))
task_pool=threadpool.ThreadPool(num_works)
task_pool=threadpool.ThreadPool(num_works) def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): """Set up the thread pool and start num_workers worker threads. ``num_workers`` is the number of worker threads to start initially. If ``q_size > 0`` the size of the work *request queue* is limited and the thread pool blocks when the queue is full and it tries to put more work requests in it (see ``putRequest`` method), unless you also use a positive ``timeout`` value for ``putRequest``. If ``resq_size > 0`` the size of the *results queue* is limited and the worker threads will block when the queue is full and they try to put new results in it. .. warning: If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is the possibilty of a deadlock, when the results queue is not pulled regularly and too many jobs are put in the work requests queue. To prevent this, always set ``timeout > 0`` when calling ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. """ self._requests_queue = Queue.Queue(q_size)#任務隊列,通過threadpool.makeReuests(args)創建的任務都會放到此隊列中 self._results_queue = Queue.Queue(resq_size)#字典,任務對應的任務執行結果</span> self.workers = []#工作線程list,通過self.createWorkers()函數內創建的工作線程會放到此工作線程list中 self.dismissedWorkers = []#被設置線程事件并且沒有被join的工作線程 self.workRequests = {}#字典,記錄任務被分配到哪個工作線程中</span> self.createWorkers(num_workers, poll_timeout)
新聞熱點
疑難解答