生产者与消费者模型介绍
定义: 生产者消费者模式是一个十分经典的多线程并发协作的模式。
意义:弄懂生产者消费者问题能够让我们对并发编程的理解加深。
介绍:所谓生产者 - 消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域。
共享的数据区域就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为。而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。
生产者与消费者实现
场景设计:
- 创建一个工厂类 ProductFactory,该类包含两个方法,produce 生产方法和 consume 消费方法;
- 对于 produce 方法,当没有库存或者库存达到 10 时,停止生产。为了更便于观察结果,每生产一个产品,sleep 3秒;
- 对于 consume 方法,只要有库存就进行消费。为了更便于观察结果,每消费一个产品,sleep 3秒;
- 库存使用 LinkedList 进行实现,此时 LinkedList 即共享数据内存;
- 创建一个 Producer 生产者类,用于调用 ProductFactory 的 produce 方法。生产过程中,要对每个产品从 0 开始进行编号;
- 创建一个 Consumer 消费者类,用于调用 ProductFactory 的 consume 方法;
- 创建一个测试类,main 函数中创建 2 个生产者和 3 个消费者,运行程序进行结果观察。
ProductFactory.class
package jvm.juc.proAndcom;
import java.util.LinkedList;
public class ProductFactory {
private volatile LinkedList<String> products; //根据需求定义库存,用 LinkedList 实现
private int capacity = 10; // 根据需求:定义最大库存 10
public ProductFactory() {
products = new LinkedList<String>();
}
// 根据需求:produce 方法创建
public synchronized void produce(String product) {
while (capacity == products.size()) { //根据需求:如果达到 10 库存,停止生产
try {
System.out.println("警告:线程("+Thread.currentThread().getName() + ")准备生产产品,但产品池已满");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(product); //如果没有到 10 库存,进行产品添加
try {
Thread.sleep(1000); //根据需求为了便于观察结果,每生产一个产品,sleep 5000 ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程("+Thread.currentThread().getName() + ")生产了一件产品:" + product+";当前剩余商品"+products.size()+"个");
notify(); //生产了产品,通知消费者线程从 wait 状态唤醒,进行消费
}
// 根据需求:consume 方法创建
public synchronized String consume() {
while (products.size()==0) { //根据需求:没有库存消费者进入wait状态
try {
System.out.println("警告:线程("+Thread.currentThread().getName() + ")准备消费产品,但当前没有产品");
wait(); //库存为 0 ,无法消费,进入 wait ,等待生产者线程唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String product = products.remove(0) ; //如果有库存则消费,并移除消费掉的产品
try {
Thread.sleep(1000);//根据需求为了便于观察结果,每消费一个产品,sleep 5000 ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程("+Thread.currentThread().getName() + ")消费了一件产品:" + product+";当前剩余商品"+products.size()+"个");
notify();// 通知生产者继续生产
return product;
}
}
Consumer.Class
package jvm.juc.proAndcom;
public class Consumer implements Runnable{
private ProductFactory factory;
public Consumer(ProductFactory productFactory) {
factory = productFactory;
}
@Override
public void run() {
while (true) {
String consume = factory.consume();
}
}
}
Producer.class
package jvm.juc.proAndcom;
public class Producer implements Runnable{
private ProductFactory factory;
public Producer(ProductFactory productFactory) {
this.factory = productFactory;
}
@Override
public void run() {
int i = 0;
while (true) {
factory.produce(String.valueOf(i));
i++;
}
}
}
Test.class
package jvm.juc.proAndcom;
import internet.c1.Factory;
public class Test extends Thread{
public static void main(String[] args) {
ProductFactory factory = new ProductFactory();
new Thread(new Producer(factory),"一号生产者").start();
new Thread(new Producer(factory),"二号生产者").start();
new Thread(new Consumer(factory),"一号消费者").start();
new Thread(new Consumer(factory),"二号消费者").start();
new Thread(new Consumer(factory),"三号消费者").start();
}
}