1. 前言
本节内容主要是对 Java 锁机制之 Condition 接口进行讲解,Condition 接口是配合 Lock 接口使用的,我们已经学习过 Lock 接口的相关知识,那么接下来对 Condition 接口进行讲解。本节内容的知识点如下:
2. Condition 接口简介
定义:Condition 接口也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待 / 通知模式。Condition 可以看做是 Obejct 类的 wait ()、notify ()、notifyAll () 方法的替代品,与 Lock 配合使用。
public interface Condition {
void await() throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
等待机制方法简介:
- void await() throws InterruptedException:当前线程进入等待状态,直到被其它线程的唤醒继续执行或被中断;
- void awaitUninterruptibly():当前线程进入等待状态,直到被其它线程被唤醒;
- long awaitNanos(long nanosTimeout) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或被中断,或者指定的等待时间结束;nanosTimeout 为超时时间,返回值 = 超时时间 - 实际消耗时间;
- boolean await(long time, TimeUnit unit) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或被中断,或者指定的等待时间结束;与上个方法区别:可以自己设置时间单位,未超时被唤醒返回 true,超时则返回 false;
- boolean awaitUntil(Date deadline) throws InterruptedException:当前线程等待状态,直到被其他线程唤醒或被中断,或者指定的截止时间结束,截止时间结束前被唤醒,返回 true,否则返回 false。
通知机制方法简介:
- void signal():唤醒一个线程;
- void signalAll():唤醒所有线程。
2.1.Condition 对象的创建
Condition 对象是由 Lock 对象创建出来的 (Lock.newCondition),换句话说,Condition 是依赖 Lock 对象的。那么我们来看看如果创建 Condition 对象。
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
3.ReentrantLock 与 Condition 实现生产者与消费者
场景修改:
- 创建一个工厂类 ProductFactory,该类包含两个方法,produce 生产方法和 consume 消费方法(未改变);
- 对于 produce 方法,当没有库存或者库存达到 10 时,停止生产。为了更便于观察结果,每生产一个产品,sleep 3000 毫秒(5000 变 3000,调用地址也改变了,具体看代码);
- 对于 consume 方法,只要有库存就进行消费。为了更便于观察结果,每消费一个产品,sleep 5000 毫秒(sleep 调用地址改变了,具体看代码);
- 库存使用 LinkedList 进行实现,此时 LinkedList 即共享数据内存(未改变);
- 创建一个 Producer 生产者类,用于调用 ProductFactory 的 produce 方法。生产过程中,要对每个产品从 0 开始进行编号 (新增 sleep 3000ms);
- 创建一个 Consumer 消费者类,用于调用 ProductFactory 的 consume 方法 (新增 sleep 5000ms);
- 创建一个测试类,main 函数中创建 2 个生产者和 3 个消费者,运行程序进行结果观察(未改变)。
以下是所有代码。
public class DemoTest {
public static void main(String[] args) {
ProductFactory productFactory = new ProductFactory();
new Thread(new Producer(productFactory),"1号生产者"). start();
new Thread(new Producer(productFactory),"2号生产者"). start();
new Thread(new Consumer(productFactory),"1号消费者"). start();
new Thread(new Consumer(productFactory),"2号消费者"). start();
new Thread(new Consumer(productFactory),"3号消费者"). start();
}
}
class ProductFactory {
private LinkedList<String> products; //根据需求定义库存,用 LinkedList 实现
private int capacity = 10; // 根据需求:定义最大库存 10
private Lock lock = new ReentrantLock(false);
private Condition p = lock.newCondition();
private Condition c = lock.newCondition();
public ProductFactory() {
products = new LinkedList<String>();
}
// 根据需求:produce 方法创建
public void produce(String product) {
try {
lock.lock();
while (capacity == products.size()) { //根据需求:如果达到 10 库存,停止生产
try {
System.out.println("警告:线程("+Thread.currentThread().getName() + ")准备生产产品,但产品池已满");
p.await(); // 库存达到 10 ,生产线程进入 wait 状态
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(product); //如果没有到 10 库存,进行产品添加
System.out.println("线程("+Thread.currentThread().getName() + ")生产了一件产品:" + product+";当前剩余商品"+products.size()+"个");
c.signalAll(); //生产了产品,通知消费者线程从 wait 状态唤醒,进行消费
} finally {
lock.unlock();
}
}
// 根据需求:consume 方法创建
public String consume() {
try {
lock.lock();
while (products.size()==0) { //根据需求:没有库存消费者进入wait状态
try {
System.out.println("警告:线程("+Thread.currentThread().getName() + ")准备消费产品,但当前没有产品");
c.await(); //库存为 0 ,无法消费,进入 wait ,等待生产者线程唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String product = products.remove(0) ; //如果有库存则消费,并移除消费掉的产品
System.out.println("线程("+Thread.currentThread().getName() + ")消费了一件产品:" + product+";当前剩余商品"+products.size()+"个");
p.signalAll();// 通知生产者继续生产
return product;
} finally {
lock.unlock();
}
}
}
class Producer implements Runnable {
private ProductFactory productFactory; //关联工厂类,调用 produce 方法
public Producer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
int i = 0 ; // 根据需求,对产品进行编号
while (true) {
productFactory.produce(String.valueOf(i)); //根据需求 ,调用 productFactory 的 produce 方法
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
}
class Consumer implements Runnable {
private ProductFactory productFactory;
public Consumer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
while (true) {
productFactory.consume();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上述代码结果如下图所示