我們將使用Python線程來解決Python中的生產者—消費者問題。這個問題完全不像他們在學校中說的那么難。
如果你對生產者—消費者問題有了解,看這篇博客會更有意義。
為什么要關心生產者—消費者問題:
可以幫你更好地理解并發和不同概念的并發。 信息隊列中的實現中,一定程度上使用了生產者—消費者問題的概念,而你某些時候必然會用到消息隊列。當我們在使用線程時,你可以學習以下的線程概念:
Condition:線程中的條件。 wait():在條件實例中可用的wait()。 notify() :在條件實例中可用的notify()。我假設你已經有這些基本概念:線程、競態條件,以及如何解決靜態條件(例如使用lock)。否則的話,你建議你去看我上一篇文章basics of Threads。
引用維基百科:
生產者的工作是產生一塊數據,放到buffer中,如此循環。與此同時,消費者在消耗這些數據(例如從buffer中把它們移除),每次一塊。
這里的關鍵詞是“同時”。所以生產者和消費者是并發運行的,我們需要對生產者和消費者做線程分離。
from threading import Thread class ProducerThread(Thread): def run(self): pass class ConsumerThread(Thread): def run(self): pass
再次引用維基百科:
這個為描述了兩個共享固定大小緩沖隊列的進程,即生產者和消費者。
假設我們有一個全局變量,可以被生產者和消費者線程修改。生產者產生數據并把它加入到隊列。消費者消耗這些數據(例如把它移出)。
queue = []
在剛開始,我們不會設置固定大小的條件,而在實際運行時加入(指下述例子)。
一開始帶bug的程序:
from threading import Thread, Lockimport timeimport random queue = []lock = Lock() class ProducerThread(Thread): def run(self): nums = range(5) #Will create the list [0, 1, 2, 3, 4] global queue while True: num = random.choice(nums) #Selects a random number from list [0, 1, 2, 3, 4] lock.acquire() queue.append(num) print "Produced", num lock.release() time.sleep(random.random()) class ConsumerThread(Thread): def run(self): global queue while True: lock.acquire() if not queue: print "Nothing in queue, but consumer will try to consume" num = queue.pop(0) print "Consumed", num lock.release() time.sleep(random.random()) ProducerThread().start()ConsumerThread().start()
運行幾次并留意一下結果。如果程序在IndexError異常后并沒有自動結束,用Ctrl+Z結束運行。
樣例輸出:
Produced 3Consumed 3Produced 4Consumed 4Produced 1Consumed 1Nothing in queue, but consumer will try to consumeException in thread Thread-2:Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner self.run() File "producer_consumer.py", line 31, in run num = queue.pop(0)IndexError: pop from empty list
新聞熱點
疑難解答