1、前言
学习JUC,就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型,用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中,生产者生产数据放入共享的缓冲区中,消费者从缓冲区中取出数据进行消费。在这个过程中,生产者和消费者之间需要保持同步,以避免数据出现错误或重复。今天我们就来说说生产者消费者模型,以及JUC中如何解决该模型的同步问题。
2、什么是生产者消费者问题
生产者消费者问题是一种经典的多线程问题,用于描述生产者和消费者之间的数据交换问题。其实本质上就是线程间通信问题,即线程等待唤醒和通知唤醒。
生产者消费者问题通常包含以下三个元素:
- 生产者:负责生产数据,并将其放入共享的缓冲区中。
- 消费者:负责从缓冲区中取出数据,并进行消费。
- 缓冲区:用于存放生产者生产的数据,消费者从中取出数据进行消费。
在实际应用中,生产者和消费者可能存在速度差异,导致缓冲区的数据量不断变化。如果缓冲区满了,生产者需要等待,直到消费者取走了一部分数据。同样,如果缓冲区为空,消费者需要等待,直到生产者生产了一些数据放入缓冲区中。
3、Synchronized解决方案
synchronized解决方案,一般采用wait()(等待唤醒)和notifyAll()(通知唤醒)进行线程的同步通信。
- wait()方法用于使当前线程等待,直到另一个线程调用相同对象上的notify()方法或notifyAll()方法来唤醒它。wait()方法必须在synchronized块或方法中调用,以确保线程获得对象的监视器锁。
- notify()方法用于通知等待在相同对象上的某个线程,告诉它们可以继续运行。
- notifyAll()方法则通知等待在相同对象上的所有线程。
调用wait()方法会释放锁,使当前线程进入等待状态,直到其他线程调用相同对象上的notify()方法或notifyAll()方法唤醒它。而notify()方法则会随机选择一个等待的线程唤醒,而notifyAll()则会唤醒所有等待的线程,让它们竞争锁。
public class ProducerConsumerExample {
public static void main(String[] args) {
NumberOper object = new NumberOper();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
object.add();
}
}, "thread-add-1").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
object.sub();
}
}, "thread-sub-1").start();
}
}
class NumberOper {
private int number = 0;
public synchronized void add() {
if(number != 0) {
try {
// 等待唤醒
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number);
// 通知其他唤醒
this.notifyAll();
}
public synchronized void sub() {
if(number == 0) {
try {
// 等待唤醒
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number);
// 通知其他唤醒
this.notifyAll();
}
}
执行结果:
不过需要注意的是,上面的代码没有考虑到多线程并发的情况,如果多个生产者和多个消费者同时访问缓冲区,就需要使用线程安全的数据结构或加锁来保证线程安全。也就是虚假唤醒问题。
虚假唤醒问题,请参考《wait(),notify()虚假唤醒》篇幅。
4、Lock解决方案
Synchronized解决方案,主要是依赖于wait()和notify()方法解决。相应的JUC中的Lock也是类似的解决手段。
- Synchronized:(注意wait()和notify()方法是Object的方法)
- wait():线程等待,直到其他线程将他唤醒
- notify():唤醒其他等待的线程
- notifyAll():换新所有等待的线程
- Lock:
- await():线程等待,直到其他线程将他唤醒
- signal():唤醒正在等待的线程
- signalAll():唤醒正在等待的线程
使用JUC Lock来解决生产者消费者问题,可以使用Condition(条件变量)来实现。
Condition是基于Lock来创建的,每个Condition对象都和一个Lock对象绑定。Condition对象提供了类似wait()和notify()的方法来控制线程的等待和唤醒。Condition对象可以通过Lock对象的newCondition()方法创建。
生产者消费者问题中,我们可以使用两个Condition对象来控制生产者和消费者的等待和唤醒。当缓冲区为空时,消费者线程等待,当缓冲区满时,生产者线程等待。
package com.github.fastdev.waitnotify;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * @author Shamee loop * @date 2023/4/9 */
public class ProducerConsumerExample {
public static void main(String[] args) {
NumberOper object = new NumberOper();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
object.add();
}
}, "thread-add-1").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
object.sub();
}
}, "thread-sub-1").start();
}
}
class NumberOper {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void add() {
lock.lock();
try {
if (number != 0) {
condition.await();
}
number++;
System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void sub() {
lock.lock();
try {
if (number == 0) {
condition.await();
}
number--;
System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
执行结果:
5、Condition
那么既然synchronized就能解决生产者消费者问题,为什么还需要JUC的Lock这种方式呢? 从代码量上看,Lock的方式明显比较繁琐。
当然,存在即合理。JUC实现了Lock的方式,且引入了Condition。肯定是具备了synchronized所没有的特性。
试想一个场景:
synchronized的notify()虽然唤醒了等待的线程。但是如果存在多个等待的线程呢?唤醒后获得执行权的需要取决于分配策略。那么有没有一种可能,我需要指定唤醒某个等待的线程?Condition就来了,他可以指定唤醒某个线程,也就是精准唤醒。
Condition 是 Java 中 Lock 的一个重要组件,可以用于实现更加灵活、高效的线程同步。它提供了类似于 Object.wait() 和 Object.notify() 的等待/通知机制,但相较于传统的 synchronized,它更加灵活,可以实现更多高级特性。
Condition 的主要作用是允许线程在等待某些条件的情况下暂停执行(即阻塞线程),并且当条件满足时,可以重新唤醒这些线程。Condition 与 Lock 一起使用,通常需要创建一个 Lock 对象,然后调用 Lock 的 newCondition() 方法来创建一个 Condition 对象。
Condition 接口中最常用的方法包括:
- await():当前线程等待,直到被通知或中断;
- awaitUninterruptibly():当前线程等待,直到被通知,但不会响应中断;
- signal():唤醒一个等待中的线程;
- signalAll():唤醒所有等待中的线程。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 10;
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public void produce() throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await();
}
int num = (int) (Math.random() * 100);
queue.add(num);
System.out.println("Produced " + num);
// 指定唤醒线程
notEmpty.signal();
} finally {
lock.unlock();
}
}
public void consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
int num = queue.remove();
System.out.println("Consumed " + num);
// 指定唤醒线程
notFull.signal();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ProducerConsumerExample example = new ProducerConsumerExample();
Thread producerThread1 = new Thread(() -> {
while (true) {
try {
example.produce();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread producerThread2 = new Thread(() -> {
while (true) {
try {
example.produce();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumerThread1 = new Thread(() -> {
while (true) {
try {
example.consume();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumerThread2 = new Thread(() -> {
while (true) {
try {
example.consume();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread1.start();
producerThread2.start();
consumerThread1.start();
consumerThread2.start();
}
}
6、小结
到此,我们学习了生产者和消费者模型,以及他的一些问题,以及如何解决。还接触了Locks中的另一个类Condition的使用。一天进步一点点,一起加油~