隊列queue 多應用在多線程應用中,多線程訪問共享變量。對于多線程而言,訪問共享變量時,隊列queue是線程安全的。從queue隊列的具體實現中,可以看出queue使用了1個線程互斥鎖(pthread.Lock()),以及3個條件標量(pthread.condition()),來保證了線程安全。
queue隊列的互斥鎖和條件變量,可以參考另一篇文章:python線程中同步鎖
queue的用法如下:
import Queque a=[1,2,3] device_que=Queque.queue() device_que.put(a) device=device_que.get()
先看看它的初始化函數__init__(self,maxsize=0):
def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) # mutex must be held whenever the queue is mutating. All methods # that acquire mutex must release it before returning. mutex # is shared between the three conditions, so acquiring and # releasing the conditions also acquires and releases mutex. self.mutex = _threading.Lock() # Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. self.not_empty = _threading.Condition(self.mutex) # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = _threading.Condition(self.mutex) # Notify all_tasks_done whenever the number of unfinished tasks # drops to zero; thread waiting to join() is notified to resume self.all_tasks_done = _threading.Condition(self.mutex) self.unfinished_tasks = 0
定義隊列時有一個默認的參數maxsize, 如果不指定隊列的長度,即manxsize=0,那么隊列的長度為無限長,如果定義了大于0的值,那么隊列的長度就是maxsize。
self._init(maxsize):使用了python自帶的雙端隊列deque,來存儲元素。
self.mutex互斥鎖:任何獲取隊列的狀態(empty(),qsize()等),或者修改隊列的內容的操作(get,put等)都必須持有該互斥鎖。共有兩種操作require獲取鎖,release釋放鎖。同時該互斥鎖被三個共享變量同時享有,即操作conditiond時的require和release操作也就是操作了該互斥鎖。
self.not_full條件變量:當隊列中有元素添加后,會通知notify其他等待添加元素的線程,喚醒等待require互斥鎖,或者有線程從隊列中取出一個元素后,通知其它線程喚醒以等待require互斥鎖。
self.not empty條件變量:線程添加數據到隊列中后,會調用self.not_empty.notify()通知其它線程,喚醒等待require互斥鎖后,讀取隊列。
self.all_tasks_done條件變量:消費者線程從隊列中get到任務后,任務處理完成,當所有的隊列中的任務處理完成后,會使調用queue.join()的線程返回,表示隊列中任務以處理完畢。
queue.put(self, item, block=True, timeout=None)函數:
申請獲得互斥鎖,獲得后,如果隊列未滿,則向隊列中添加數據,并通知notify其它阻塞的某個線程,喚醒等待獲取require互斥鎖。如果隊列已滿,則會wait等待。最后處理完成后釋放互斥鎖。其中還有阻塞block以及非阻塞,超時等邏輯,可以自己看一下:
新聞熱點
疑難解答