淺談java線程中生產(chǎn)者與消費者的問題
一、概念
生產(chǎn)者與消費者問題是一個金典的多線程協(xié)作的問題.生產(chǎn)者負(fù)責(zé)生產(chǎn)產(chǎn)品,并將產(chǎn)品存放到倉庫;消費者從倉庫中獲取產(chǎn)品并消費。當(dāng)倉庫滿時,生產(chǎn)者必須停止生產(chǎn),直到倉庫有位置存放產(chǎn)品;當(dāng)倉庫空時,消費者必須停止消費,直到倉庫中有產(chǎn)品。
解決生產(chǎn)者/消費者問題主要用到如下幾個技術(shù):1.用線程模擬生產(chǎn)者,在run方法中不斷地往倉庫中存放產(chǎn)品。2.用線程模擬消費者,在run方法中不斷地從倉庫中獲取產(chǎn)品。3
. 倉庫類保存產(chǎn)品,當(dāng)產(chǎn)品數(shù)量為0時,調(diào)用wait方法,使得當(dāng)前消費者線程進(jìn)入等待狀態(tài),當(dāng)有新產(chǎn)品存入時,調(diào)用notify方法,喚醒等待的消費者線程。當(dāng)倉庫滿時,調(diào)用wait方法,使得當(dāng)前生產(chǎn)者線程進(jìn)入等待狀態(tài),當(dāng)有消費者獲取產(chǎn)品時,調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
二、實例
package book.thread.product;
public class Consumer extends Thread{
private Warehouse warehouse;//消費者獲取產(chǎn)品的倉庫
private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位
public Consumer(Warehouse warehouse,String name){
super(name);
this.warehouse = warehouse;
}
public void start(){
this.running = true;
super.start();
}
public void run(){
Product product;
try {
while(running){
//從倉庫中獲取產(chǎn)品
product = warehouse.getProduct();
sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//停止消費者線程
public void stopConsumer(){
synchronized(warehouse){
this.running = false;
warehouse.notifyAll();//通知等待倉庫的線程
}
}
//消費者線程是否在運行
public boolean isRunning(){
return running;
}
}
package book.thread.product;
public class Producer extends Thread{
private Warehouse warehouse;//生產(chǎn)者存儲產(chǎn)品的倉庫
private static int produceName = 0;//產(chǎn)品的名字
private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位
public Producer(Warehouse warehouse,String name){
super(name);
this.warehouse = warehouse;
}
public void start(){
this.running = true;
super.start();
}
public void run(){
Product product;
//生產(chǎn)并存儲產(chǎn)品
try {
while(running){
product = new Product((++produceName)+"");
this.warehouse.storageProduct(product);
sleep(300);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//停止生產(chǎn)者線程
public void stopProducer(){
synchronized(warehouse){
this.running = false;
//通知等待倉庫的線程
warehouse.notifyAll();
}
}
//生產(chǎn)者線程是否在運行
public boolean isRunning(){
return running;
}
}
package book.thread.product;
public class Product {
private String name;//產(chǎn)品名
public Product(String name){
this.name = name;
}
public String toString(){
return "Product-"+name;
}
}
package book.thread.product;
//產(chǎn)品的倉庫類,內(nèi)部采用數(shù)組來表示循環(huán)隊列,以存放產(chǎn)品
public class Warehouse {
private static int CAPACITY = 11;//倉庫的容量
private Product[] products;//倉庫里的產(chǎn)品
//[front,rear]區(qū)間的產(chǎn)品未被消費
private int front = 0;//當(dāng)前倉庫中第一個未被消費的產(chǎn)品的下標(biāo)
private int rear = 0;//倉庫中最后一個未被消費的產(chǎn)品下標(biāo)加1
public Warehouse(){
this.products = new Product[CAPACITY];
}
public Warehouse(int capacity){
this();
if(capacity > 0){
CAPACITY = capacity +1;
this.products = new Product[CAPACITY];
}
}
//從倉庫獲取一個產(chǎn)品
public Product getProduct() throws InterruptedException{
synchronized(this){
boolean consumerRunning = true;//標(biāo)志消費者線程是否還在運行
Thread currentThread = Thread.currentThread();//獲取當(dāng)前線程
if(currentThread instanceof Consumer){
consumerRunning = ((Consumer)currentThread).isRunning();
}else{
return null;//非消費者不能獲取產(chǎn)品
}
//若消費者線程在運行中,但倉庫中沒有產(chǎn)品了,則消費者線程繼續(xù)等待
while((front==rear) && consumerRunning){
wait();
consumerRunning = ((Consumer)currentThread).isRunning();
}
//如果消費者線程已經(jīng)停止運行,則退出該方法,取消獲取產(chǎn)品
if(!consumerRunning){
return null;
}
//獲取當(dāng)前未被消費的第一個產(chǎn)品
Product product = products[front];
System.out.println("Consumer[" + currentThread.getName()+"] getProduct:"+product);
//將當(dāng)前未被消費產(chǎn)品的下標(biāo)后移一位,如果到了數(shù)組末尾,則移動到首部
front = (front+1+CAPACITY)%CAPACITY;
System.out.println("倉庫中還沒有被消費的產(chǎn)品數(shù)量:"+(rear+CAPACITY-front)%CAPACITY);
//通知其他等待線程
notify();
return product;
}
}
//向倉庫存儲一個產(chǎn)品
public void storageProduct(Product product) throws InterruptedException{
synchronized(this){
boolean producerRunning = true;//標(biāo)志生產(chǎn)者線程是否在運行
Thread currentThread = Thread.currentThread();
if(currentThread instanceof Producer){
producerRunning = ((Producer)currentThread).isRunning();
}else{
return;
}
//如果最后一個未被消費的產(chǎn)品與第一個未被消費的產(chǎn)品的下標(biāo)緊挨著,則說明沒有存儲空間了。
//如果沒有存儲空間了,而生產(chǎn)者線程還在運行,則生產(chǎn)者線程等待倉庫釋放產(chǎn)品
while(((rear+1)%CAPACITY == front) && producerRunning){
wait();
producerRunning = ((Producer)currentThread).isRunning();
}
//如果生產(chǎn)線程已經(jīng)停止運行了,則停止產(chǎn)品的存儲
if(!producerRunning){
return;
}
//保存產(chǎn)品到倉庫
products[rear] = product;
System.out.println("Producer[" + Thread.currentThread().getName()+"] storageProduct:" + product);
//將rear下標(biāo)循環(huán)后移一位
rear = (rear + 1)%CAPACITY;
System.out.println("倉庫中還沒有被消費的產(chǎn)品數(shù)量:"+(rear + CAPACITY -front)%CAPACITY);
notify();
}
}
}
package book.thread.product;
public class TestProduct {
public static void main(String[] args) {
Warehouse warehouse = new Warehouse(10);//建立一個倉庫,容量為10
//建立生產(chǎn)者線程和消費者
Producer producers1 = new Producer(warehouse,"producer-1");
Producer producers2 = new Producer(warehouse,"producer-2");
Producer producers3 = new Producer(warehouse,"producer-3");
Consumer consumer1 = new Consumer(warehouse,"consumer-1");
Consumer consumer2 = new Consumer(warehouse,"consumer-2");
Consumer consumer3 = new Consumer(warehouse,"consumer-3");
Consumer consumer4 = new Consumer(warehouse,"consumer-4");
//啟動生產(chǎn)者線程和消費者線程
producers1.start();
producers2.start();
consumer1.start();
producers3.start();
consumer2.start();
consumer3.start();
consumer4.start();
//讓生產(chǎn)者/消費者程序運行1600ms
try {
Thread.sleep(1600);
} catch (InterruptedException e) {
e.printStackTrace();
}
//停止消費者線程
producers1.stopProducer();
consumer1.stopConsumer();
producers2.stopProducer();
consumer2.stopConsumer();
producers3.stopProducer();
consumer3.stopConsumer();
consumer4.stopConsumer();
}
}
輸出結(jié)果:
Producer[producer-1] storageProduct:Product-1 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-2] getProduct:Product-1 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-3 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-2 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-3] getProduct:Product-3 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-1] getProduct:Product-2 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-1] storageProduct:Product-4 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-4] getProduct:Product-4 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-6 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-5 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-1] getProduct:Product-6 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-2] getProduct:Product-5 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-1] storageProduct:Product-7 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Consumer[consumer-3] getProduct:Product-7 倉庫中還沒有被消費的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-8 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-9 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-4] getProduct:Product-8 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-1] storageProduct:Product-10 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Producer[producer-3] storageProduct:Product-11 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-12 倉庫中還沒有被消費的產(chǎn)品數(shù)量:4 Consumer[consumer-1] getProduct:Product-9 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Consumer[consumer-2] getProduct:Product-10 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-3] getProduct:Product-11 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-3] storageProduct:Product-13 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Producer[producer-1] storageProduct:Product-14 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-15 倉庫中還沒有被消費的產(chǎn)品數(shù)量:4 Consumer[consumer-4] getProduct:Product-12 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Consumer[consumer-1] getProduct:Product-13 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Consumer[consumer-2] getProduct:Product-14 倉庫中還沒有被消費的產(chǎn)品數(shù)量:1 Producer[producer-1] storageProduct:Product-16 倉庫中還沒有被消費的產(chǎn)品數(shù)量:2 Producer[producer-3] storageProduct:Product-17 倉庫中還沒有被消費的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-18 倉庫中還沒有被消費的產(chǎn)品數(shù)量:4
分析:在main方法中建立了一個產(chǎn)品倉庫,并未該倉庫關(guān)聯(lián)了3個生產(chǎn)者線程和4個消費者線程,啟動這些線程,使生產(chǎn) 者/消費者模型運作起來,當(dāng)程序運行1600ms時,所有的生產(chǎn)者停止生產(chǎn)產(chǎn)品,消費者停止消費產(chǎn)品。
生產(chǎn)者線程Product在run方法中沒300ms便生產(chǎn)一個產(chǎn)品,并存入倉庫;消費者線程Consumer在run方法中沒500ms便從倉庫中取一個產(chǎn)品。
倉庫類Warehouse負(fù)責(zé)存放產(chǎn)品和發(fā)放產(chǎn)品。storageProduct方法負(fù)責(zé)存儲產(chǎn)品,當(dāng)倉庫滿時,當(dāng)前線程進(jìn)入等待狀態(tài),即如果生產(chǎn)者線程A在調(diào)用storageProduct方法以存儲產(chǎn)品時,發(fā)現(xiàn)倉庫已滿,無法存儲時,便會進(jìn)入等待狀態(tài)。當(dāng)存儲產(chǎn)品成功時,調(diào)用notify方法,喚醒等待的消費者線程。
getProduct方法負(fù)責(zé)提前產(chǎn)品,當(dāng)倉庫空時,當(dāng)前線程進(jìn)入等待狀態(tài),即如果消費者線程B在調(diào)用getProduct方法以獲取產(chǎn)品時,發(fā)現(xiàn)倉庫空了,便會進(jìn)入等待狀態(tài)。當(dāng)提取產(chǎn)品成功時,調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
以上這篇淺談java線程中生產(chǎn)者與消費者的問題就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
- Java多線程并發(fā)生產(chǎn)者消費者設(shè)計模式實例解析
- Java多線程生產(chǎn)者消費者模式實現(xiàn)過程解析
- Java多線程 生產(chǎn)者消費者模型實例詳解
- Java多線程 BlockingQueue實現(xiàn)生產(chǎn)者消費者模型詳解
- Java多線程之線程通信生產(chǎn)者消費者模式及等待喚醒機(jī)制代碼詳解
- java 中多線程生產(chǎn)者消費者問題詳細(xì)介紹
- JAVA多線程實現(xiàn)生產(chǎn)者消費者的實例詳解
- java多線程解決生產(chǎn)者消費者問題
- JAVA生產(chǎn)者消費者(線程同步)代碼學(xué)習(xí)示例
- Java如何通過線程解決生產(chǎn)者/消費者問題
相關(guān)文章
Spring?boot?RedisTemplate?序列化服務(wù)化配置方式
這篇文章主要介紹了Springboot?RedisTemplate序列化服務(wù)化配置方式,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07
ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型方法步驟
Elasticsearch存儲數(shù)據(jù)之前需要先創(chuàng)建索引,類似于結(jié)構(gòu)型數(shù)據(jù)庫建庫建表,創(chuàng)建索引時定義了每個字段的索引方式和數(shù)據(jù)類型,這篇文章主要給大家介紹了關(guān)于ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型的方法步驟,需要的朋友可以參考下2023-09-09

