一、概念
生產(chǎn)者與消費(fèi)者問(wèn)題是一個(gè)金典的多線程協(xié)作的問(wèn)題.生產(chǎn)者負(fù)責(zé)生產(chǎn)產(chǎn)品,并將產(chǎn)品存放到倉(cāng)庫(kù);消費(fèi)者從倉(cāng)庫(kù)中獲取產(chǎn)品并消費(fèi)。當(dāng)倉(cāng)庫(kù)滿時(shí),生產(chǎn)者必須停止生產(chǎn),直到倉(cāng)庫(kù)有位置存放產(chǎn)品;當(dāng)倉(cāng)庫(kù)空時(shí),消費(fèi)者必須停止消費(fèi),直到倉(cāng)庫(kù)中有產(chǎn)品。
解決生產(chǎn)者/消費(fèi)者問(wèn)題主要用到如下幾個(gè)技術(shù):1.用線程模擬生產(chǎn)者,在run方法中不斷地往倉(cāng)庫(kù)中存放產(chǎn)品。2.用線程模擬消費(fèi)者,在run方法中不斷地從倉(cāng)庫(kù)中獲取產(chǎn)品。3
. 倉(cāng)庫(kù)類保存產(chǎn)品,當(dāng)產(chǎn)品數(shù)量為0時(shí),調(diào)用wait方法,使得當(dāng)前消費(fèi)者線程進(jìn)入等待狀態(tài),當(dāng)有新產(chǎn)品存入時(shí),調(diào)用notify方法,喚醒等待的消費(fèi)者線程。當(dāng)倉(cāng)庫(kù)滿時(shí),調(diào)用wait方法,使得當(dāng)前生產(chǎn)者線程進(jìn)入等待狀態(tài),當(dāng)有消費(fèi)者獲取產(chǎn)品時(shí),調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
二、實(shí)例
package book.thread.product;public class Consumer extends Thread{ private Warehouse warehouse;//消費(fèi)者獲取產(chǎn)品的倉(cāng)庫(kù) private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位 public Consumer(Warehouse warehouse,String name){ super(name); this.warehouse = warehouse; } public void start(){ this.running = true; super.start(); } public void run(){ Product product; try { while(running){ //從倉(cāng)庫(kù)中獲取產(chǎn)品 product = warehouse.getProduct(); sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } //停止消費(fèi)者線程 public void stopConsumer(){ synchronized(warehouse){ this.running = false; warehouse.notifyAll();//通知等待倉(cāng)庫(kù)的線程 } } //消費(fèi)者線程是否在運(yùn)行 public boolean isRunning(){ return running; }} package book.thread.product;public class Producer extends Thread{ private Warehouse warehouse;//生產(chǎn)者存儲(chǔ)產(chǎn)品的倉(cāng)庫(kù) private static int produceName = 0;//產(chǎn)品的名字 private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位 public Producer(Warehouse warehouse,String name){ super(name); this.warehouse = warehouse; } public void start(){ this.running = true; super.start(); } public void run(){ Product product; //生產(chǎn)并存儲(chǔ)產(chǎn)品 try { while(running){ product = new Product((++produceName)+""); this.warehouse.storageProduct(product); sleep(300); } } catch (InterruptedException e) { e.printStackTrace(); } } //停止生產(chǎn)者線程 public void stopProducer(){ synchronized(warehouse){ this.running = false; //通知等待倉(cāng)庫(kù)的線程 warehouse.notifyAll(); } } //生產(chǎn)者線程是否在運(yùn)行 public boolean isRunning(){ return running; }} package book.thread.product;public class Product { private String name;//產(chǎn)品名 public Product(String name){ this.name = name; } public String toString(){ return "Product-"+name; }} package book.thread.product;//產(chǎn)品的倉(cāng)庫(kù)類,內(nèi)部采用數(shù)組來(lái)表示循環(huán)隊(duì)列,以存放產(chǎn)品public class Warehouse { private static int CAPACITY = 11;//倉(cāng)庫(kù)的容量 private Product[] products;//倉(cāng)庫(kù)里的產(chǎn)品 //[front,rear]區(qū)間的產(chǎn)品未被消費(fèi) private int front = 0;//當(dāng)前倉(cāng)庫(kù)中第一個(gè)未被消費(fèi)的產(chǎn)品的下標(biāo) private int rear = 0;//倉(cāng)庫(kù)中最后一個(gè)未被消費(fèi)的產(chǎn)品下標(biāo)加1 public Warehouse(){ this.products = new Product[CAPACITY]; } public Warehouse(int capacity){ this(); if(capacity > 0){ CAPACITY = capacity +1; this.products = new Product[CAPACITY]; } } //從倉(cāng)庫(kù)獲取一個(gè)產(chǎn)品 public Product getProduct() throws InterruptedException{ synchronized(this){ boolean consumerRunning = true;//標(biāo)志消費(fèi)者線程是否還在運(yùn)行 Thread currentThread = Thread.currentThread();//獲取當(dāng)前線程 if(currentThread instanceof Consumer){ consumerRunning = ((Consumer)currentThread).isRunning(); }else{ return null;//非消費(fèi)者不能獲取產(chǎn)品 } //若消費(fèi)者線程在運(yùn)行中,但倉(cāng)庫(kù)中沒(méi)有產(chǎn)品了,則消費(fèi)者線程繼續(xù)等待 while((front==rear) && consumerRunning){ wait(); consumerRunning = ((Consumer)currentThread).isRunning(); } //如果消費(fèi)者線程已經(jīng)停止運(yùn)行,則退出該方法,取消獲取產(chǎn)品 if(!consumerRunning){ return null; } //獲取當(dāng)前未被消費(fèi)的第一個(gè)產(chǎn)品 Product product = products[front]; System.out.println("Consumer[" + currentThread.getName()+"] getProduct:"+product); //將當(dāng)前未被消費(fèi)產(chǎn)品的下標(biāo)后移一位,如果到了數(shù)組末尾,則移動(dòng)到首部 front = (front+1+CAPACITY)%CAPACITY; System.out.println("倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:"+(rear+CAPACITY-front)%CAPACITY); //通知其他等待線程 notify(); return product; } } //向倉(cāng)庫(kù)存儲(chǔ)一個(gè)產(chǎn)品 public void storageProduct(Product product) throws InterruptedException{ synchronized(this){ boolean producerRunning = true;//標(biāo)志生產(chǎn)者線程是否在運(yùn)行 Thread currentThread = Thread.currentThread(); if(currentThread instanceof Producer){ producerRunning = ((Producer)currentThread).isRunning(); }else{ return; } //如果最后一個(gè)未被消費(fèi)的產(chǎn)品與第一個(gè)未被消費(fèi)的產(chǎn)品的下標(biāo)緊挨著,則說(shuō)明沒(méi)有存儲(chǔ)空間了。 //如果沒(méi)有存儲(chǔ)空間了,而生產(chǎn)者線程還在運(yùn)行,則生產(chǎn)者線程等待倉(cāng)庫(kù)釋放產(chǎn)品 while(((rear+1)%CAPACITY == front) && producerRunning){ wait(); producerRunning = ((Producer)currentThread).isRunning(); } //如果生產(chǎn)線程已經(jīng)停止運(yùn)行了,則停止產(chǎn)品的存儲(chǔ) if(!producerRunning){ return; } //保存產(chǎn)品到倉(cāng)庫(kù) products[rear] = product; System.out.println("Producer[" + Thread.currentThread().getName()+"] storageProduct:" + product); //將rear下標(biāo)循環(huán)后移一位 rear = (rear + 1)%CAPACITY; System.out.println("倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:"+(rear + CAPACITY -front)%CAPACITY); notify(); } }} package book.thread.product;public class TestProduct { public static void main(String[] args) { Warehouse warehouse = new Warehouse(10);//建立一個(gè)倉(cāng)庫(kù),容量為10 //建立生產(chǎn)者線程和消費(fèi)者 Producer producers1 = new Producer(warehouse,"producer-1"); Producer producers2 = new Producer(warehouse,"producer-2"); Producer producers3 = new Producer(warehouse,"producer-3"); Consumer consumer1 = new Consumer(warehouse,"consumer-1"); Consumer consumer2 = new Consumer(warehouse,"consumer-2"); Consumer consumer3 = new Consumer(warehouse,"consumer-3"); Consumer consumer4 = new Consumer(warehouse,"consumer-4"); //啟動(dòng)生產(chǎn)者線程和消費(fèi)者線程 producers1.start(); producers2.start(); consumer1.start(); producers3.start(); consumer2.start(); consumer3.start(); consumer4.start(); //讓生產(chǎn)者/消費(fèi)者程序運(yùn)行1600ms try { Thread.sleep(1600); } catch (InterruptedException e) { e.printStackTrace(); } //停止消費(fèi)者線程 producers1.stopProducer(); consumer1.stopConsumer(); producers2.stopProducer(); consumer2.stopConsumer(); producers3.stopProducer(); consumer3.stopConsumer(); consumer4.stopConsumer(); }}輸出結(jié)果:
Producer[producer-1] storageProduct:Product-1倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-2] getProduct:Product-1倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-3] storageProduct:Product-3倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-2] storageProduct:Product-2倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-3] getProduct:Product-3倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-1] getProduct:Product-2倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-1] storageProduct:Product-4倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-4] getProduct:Product-4倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-3] storageProduct:Product-6倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-2] storageProduct:Product-5倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-1] getProduct:Product-6倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-2] getProduct:Product-5倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-1] storageProduct:Product-7倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Consumer[consumer-3] getProduct:Product-7倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0Producer[producer-3] storageProduct:Product-8倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-2] storageProduct:Product-9倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-4] getProduct:Product-8倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-1] storageProduct:Product-10倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Producer[producer-3] storageProduct:Product-11倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Producer[producer-2] storageProduct:Product-12倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4Consumer[consumer-1] getProduct:Product-9倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Consumer[consumer-2] getProduct:Product-10倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-3] getProduct:Product-11倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-3] storageProduct:Product-13倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Producer[producer-1] storageProduct:Product-14倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Producer[producer-2] storageProduct:Product-15倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4Consumer[consumer-4] getProduct:Product-12倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Consumer[consumer-1] getProduct:Product-13倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Consumer[consumer-2] getProduct:Product-14倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1Producer[producer-1] storageProduct:Product-16倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2Producer[producer-3] storageProduct:Product-17倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3Producer[producer-2] storageProduct:Product-18倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4
分析:在main方法中建立了一個(gè)產(chǎn)品倉(cāng)庫(kù),并未該倉(cāng)庫(kù)關(guān)聯(lián)了3個(gè)生產(chǎn)者線程和4個(gè)消費(fèi)者線程,啟動(dòng)這些線程,使生產(chǎn) 者/消費(fèi)者模型運(yùn)作起來(lái),當(dāng)程序運(yùn)行1600ms時(shí),所有的生產(chǎn)者停止生產(chǎn)產(chǎn)品,消費(fèi)者停止消費(fèi)產(chǎn)品。
生產(chǎn)者線程Product在run方法中沒(méi)300ms便生產(chǎn)一個(gè)產(chǎn)品,并存入倉(cāng)庫(kù);消費(fèi)者線程Consumer在run方法中沒(méi)500ms便從倉(cāng)庫(kù)中取一個(gè)產(chǎn)品。
倉(cāng)庫(kù)類Warehouse負(fù)責(zé)存放產(chǎn)品和發(fā)放產(chǎn)品。storageProduct方法負(fù)責(zé)存儲(chǔ)產(chǎn)品,當(dāng)倉(cāng)庫(kù)滿時(shí),當(dāng)前線程進(jìn)入等待狀態(tài),即如果生產(chǎn)者線程A在調(diào)用storageProduct方法以存儲(chǔ)產(chǎn)品時(shí),發(fā)現(xiàn)倉(cāng)庫(kù)已滿,無(wú)法存儲(chǔ)時(shí),便會(huì)進(jìn)入等待狀態(tài)。當(dāng)存儲(chǔ)產(chǎn)品成功時(shí),調(diào)用notify方法,喚醒等待的消費(fèi)者線程。
getProduct方法負(fù)責(zé)提前產(chǎn)品,當(dāng)倉(cāng)庫(kù)空時(shí),當(dāng)前線程進(jìn)入等待狀態(tài),即如果消費(fèi)者線程B在調(diào)用getProduct方法以獲取產(chǎn)品時(shí),發(fā)現(xiàn)倉(cāng)庫(kù)空了,便會(huì)進(jìn)入等待狀態(tài)。當(dāng)提取產(chǎn)品成功時(shí),調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
以上這篇淺談java線程中生產(chǎn)者與消費(fèi)者的問(wèn)題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持武林網(wǎng)。
新聞熱點(diǎn)
疑難解答
圖片精選