淺談java線程中生產(chǎn)者與消費者的問題
一、概念
生產(chǎn)者與消費者問題是一個金典的多線程協(xié)作的問題.生產(chǎn)者負責(zé)生產(chǎn)產(chǎn)品,并將產(chǎn)品存放到倉庫;消費者從倉庫中獲取產(chǎn)品并消費。當倉庫滿時,生產(chǎn)者必須停止生產(chǎn),直到倉庫有位置存放產(chǎn)品;當倉庫空時,消費者必須停止消費,直到倉庫中有產(chǎn)品。
解決生產(chǎn)者/消費者問題主要用到如下幾個技術(shù):1.用線程模擬生產(chǎn)者,在run方法中不斷地往倉庫中存放產(chǎn)品。2.用線程模擬消費者,在run方法中不斷地從倉庫中獲取產(chǎn)品。3
. 倉庫類保存產(chǎn)品,當產(chǎn)品數(shù)量為0時,調(diào)用wait方法,使得當前消費者線程進入等待狀態(tài),當有新產(chǎn)品存入時,調(diào)用notify方法,喚醒等待的消費者線程。當倉庫滿時,調(diào)用wait方法,使得當前生產(chǎn)者線程進入等待狀態(tài),當有消費者獲取產(chǎn)品時,調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
二、實例
package book.thread.product; public class Consumer extends Thread{ private Warehouse warehouse;//消費者獲取產(chǎn)品的倉庫 private boolean running = false;//是否需要結(jié)束線程的標志位 public Consumer(Warehouse warehouse,String name){ super(name); this.warehouse = warehouse; } public void start(){ this.running = true; super.start(); } public void run(){ Product product; try { while(running){ //從倉庫中獲取產(chǎn)品 product = warehouse.getProduct(); sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } //停止消費者線程 public void stopConsumer(){ synchronized(warehouse){ this.running = false; warehouse.notifyAll();//通知等待倉庫的線程 } } //消費者線程是否在運行 public boolean isRunning(){ return running; } } package book.thread.product; public class Producer extends Thread{ private Warehouse warehouse;//生產(chǎn)者存儲產(chǎn)品的倉庫 private static int produceName = 0;//產(chǎn)品的名字 private boolean running = false;//是否需要結(jié)束線程的標志位 public Producer(Warehouse warehouse,String name){ super(name); this.warehouse = warehouse; } public void start(){ this.running = true; super.start(); } public void run(){ Product product; //生產(chǎn)并存儲產(chǎn)品 try { while(running){ product = new Product((++produceName)+""); this.warehouse.storageProduct(product); sleep(300); } } catch (InterruptedException e) { e.printStackTrace(); } } //停止生產(chǎn)者線程 public void stopProducer(){ synchronized(warehouse){ this.running = false; //通知等待倉庫的線程 warehouse.notifyAll(); } } //生產(chǎn)者線程是否在運行 public boolean isRunning(){ return running; } } package book.thread.product; public class Product { private String name;//產(chǎn)品名 public Product(String name){ this.name = name; } public String toString(){ return "Product-"+name; } } package book.thread.product; //產(chǎn)品的倉庫類,內(nèi)部采用數(shù)組來表示循環(huán)隊列,以存放產(chǎn)品 public class Warehouse { private static int CAPACITY = 11;//倉庫的容量 private Product[] products;//倉庫里的產(chǎn)品 //[front,rear]區(qū)間的產(chǎn)品未被消費 private int front = 0;//當前倉庫中第一個未被消費的產(chǎn)品的下標 private int rear = 0;//倉庫中最后一個未被消費的產(chǎn)品下標加1 public Warehouse(){ this.products = new Product[CAPACITY]; } public Warehouse(int capacity){ this(); if(capacity > 0){ CAPACITY = capacity +1; this.products = new Product[CAPACITY]; } } //從倉庫獲取一個產(chǎn)品 public Product getProduct() throws InterruptedException{ synchronized(this){ boolean consumerRunning = true;//標志消費者線程是否還在運行 Thread currentThread = Thread.currentThread();//獲取當前線程 if(currentThread instanceof Consumer){ consumerRunning = ((Consumer)currentThread).isRunning(); }else{ return null;//非消費者不能獲取產(chǎn)品 } //若消費者線程在運行中,但倉庫中沒有產(chǎn)品了,則消費者線程繼續(xù)等待 while((front==rear) && consumerRunning){ wait(); consumerRunning = ((Consumer)currentThread).isRunning(); } //如果消費者線程已經(jīng)停止運行,則退出該方法,取消獲取產(chǎn)品 if(!consumerRunning){ return null; } //獲取當前未被消費的第一個產(chǎn)品 Product product = products[front]; System.out.println("Consumer[" + currentThread.getName()+"] getProduct:"+product); //將當前未被消費產(chǎn)品的下標后移一位,如果到了數(shù)組末尾,則移動到首部 front = (front+1+CAPACITY)%CAPACITY; System.out.println("倉庫中還沒有被消費的產(chǎn)品數(shù)量:"+(rear+CAPACITY-front)%CAPACITY); //通知其他等待線程 notify(); return product; } } //向倉庫存儲一個產(chǎn)品 public void storageProduct(Product product) throws InterruptedException{ synchronized(this){ boolean producerRunning = true;//標志生產(chǎn)者線程是否在運行 Thread currentThread = Thread.currentThread(); if(currentThread instanceof Producer){ producerRunning = ((Producer)currentThread).isRunning(); }else{ return; } //如果最后一個未被消費的產(chǎn)品與第一個未被消費的產(chǎn)品的下標緊挨著,則說明沒有存儲空間了。 //如果沒有存儲空間了,而生產(chǎn)者線程還在運行,則生產(chǎn)者線程等待倉庫釋放產(chǎn)品 while(((rear+1)%CAPACITY == front) && producerRunning){ wait(); producerRunning = ((Producer)currentThread).isRunning(); } //如果生產(chǎn)線程已經(jīng)停止運行了,則停止產(chǎn)品的存儲 if(!producerRunning){ return; } //保存產(chǎn)品到倉庫 products[rear] = product; System.out.println("Producer[" + Thread.currentThread().getName()+"] storageProduct:" + product); //將rear下標循環(huán)后移一位 rear = (rear + 1)%CAPACITY; System.out.println("倉庫中還沒有被消費的產(chǎn)品數(shù)量:"+(rear + CAPACITY -front)%CAPACITY); notify(); } } } package book.thread.product; public class TestProduct { public static void main(String[] args) { Warehouse warehouse = new Warehouse(10);//建立一個倉庫,容量為10 //建立生產(chǎn)者線程和消費者 Producer producers1 = new Producer(warehouse,"producer-1"); Producer producers2 = new Producer(warehouse,"producer-2"); Producer producers3 = new Producer(warehouse,"producer-3"); Consumer consumer1 = new Consumer(warehouse,"consumer-1"); Consumer consumer2 = new Consumer(warehouse,"consumer-2"); Consumer consumer3 = new Consumer(warehouse,"consumer-3"); Consumer consumer4 = new Consumer(warehouse,"consumer-4"); //啟動生產(chǎn)者線程和消費者線程 producers1.start(); producers2.start(); consumer1.start(); producers3.start(); consumer2.start(); consumer3.start(); consumer4.start(); //讓生產(chǎn)者/消費者程序運行1600ms try { Thread.sleep(1600); } catch (InterruptedException e) { e.printStackTrace(); } //停止消費者線程 producers1.stopProducer(); consumer1.stopConsumer(); producers2.stopProducer(); consumer2.stopConsumer(); producers3.stopProducer(); consumer3.stopConsumer(); consumer4.stopConsumer(); } }
輸出結(jié)果:
Producer[producer-1] storageProduct:Product-1 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-2] getProduct:Product-1 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-3 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-2 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-3] getProduct:Product-3 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-1] getProduct:Product-2 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-1] storageProduct:Product-4 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-4] getProduct:Product-4 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-6 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-5 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-1] getProduct:Product-6 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-2] getProduct:Product-5 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-1] storageProduct:Product-7 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-3] getProduct:Product-7 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-8 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-9 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-4] getProduct:Product-8 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-1] storageProduct:Product-10 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Producer[producer-3] storageProduct:Product-11 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-12 倉庫中還沒有被消費的產(chǎn)品數(shù)量:4 Consumer[consumer-1] getProduct:Product-9 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Consumer[consumer-2] getProduct:Product-10 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-3] getProduct:Product-11 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-3] storageProduct:Product-13 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Producer[producer-1] storageProduct:Product-14 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-15 倉庫中還沒有被消費的產(chǎn)品數(shù)量:4 Consumer[consumer-4] getProduct:Product-12 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Consumer[consumer-1] getProduct:Product-13 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-2] getProduct:Product-14 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-1] storageProduct:Product-16 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Producer[producer-3] storageProduct:Product-17 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-18 倉庫中還沒有被消費的產(chǎn)品數(shù)量:4
分析:在main方法中建立了一個產(chǎn)品倉庫,并未該倉庫關(guān)聯(lián)了3個生產(chǎn)者線程和4個消費者線程,啟動這些線程,使生產(chǎn) 者/消費者模型運作起來,當程序運行1600ms時,所有的生產(chǎn)者停止生產(chǎn)產(chǎn)品,消費者停止消費產(chǎn)品。
生產(chǎn)者線程Product在run方法中沒300ms便生產(chǎn)一個產(chǎn)品,并存入倉庫;消費者線程Consumer在run方法中沒500ms便從倉庫中取一個產(chǎn)品。
倉庫類Warehouse負責(zé)存放產(chǎn)品和發(fā)放產(chǎn)品。storageProduct方法負責(zé)存儲產(chǎn)品,當倉庫滿時,當前線程進入等待狀態(tài),即如果生產(chǎn)者線程A在調(diào)用storageProduct方法以存儲產(chǎn)品時,發(fā)現(xiàn)倉庫已滿,無法存儲時,便會進入等待狀態(tài)。當存儲產(chǎn)品成功時,調(diào)用notify方法,喚醒等待的消費者線程。
getProduct方法負責(zé)提前產(chǎn)品,當倉庫空時,當前線程進入等待狀態(tài),即如果消費者線程B在調(diào)用getProduct方法以獲取產(chǎn)品時,發(fā)現(xiàn)倉庫空了,便會進入等待狀態(tài)。當提取產(chǎn)品成功時,調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
以上這篇淺談java線程中生產(chǎn)者與消費者的問題就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
- Java多線程并發(fā)生產(chǎn)者消費者設(shè)計模式實例解析
- Java多線程生產(chǎn)者消費者模式實現(xiàn)過程解析
- Java多線程 生產(chǎn)者消費者模型實例詳解
- Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解
- Java多線程之線程通信生產(chǎn)者消費者模式及等待喚醒機制代碼詳解
- java 中多線程生產(chǎn)者消費者問題詳細介紹
- JAVA多線程實現(xiàn)生產(chǎn)者消費者的實例詳解
- java多線程解決生產(chǎn)者消費者問題
- JAVA生產(chǎn)者消費者(線程同步)代碼學(xué)習(xí)示例
- Java如何通過線程解決生產(chǎn)者/消費者問題
相關(guān)文章
Spring?boot?RedisTemplate?序列化服務(wù)化配置方式
這篇文章主要介紹了Springboot?RedisTemplate序列化服務(wù)化配置方式,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型方法步驟
Elasticsearch存儲數(shù)據(jù)之前需要先創(chuàng)建索引,類似于結(jié)構(gòu)型數(shù)據(jù)庫建庫建表,創(chuàng)建索引時定義了每個字段的索引方式和數(shù)據(jù)類型,這篇文章主要給大家介紹了關(guān)于ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型的方法步驟,需要的朋友可以參考下2023-09-09