一、什么是生产者/消费者模型
生产者-消费者模型(Producer-Consumer problem)是一个非常经典的多线程并发协作的模型。
比如某个模块负责生产数据,而另一个模块负责处理数据。产生数据的模块就形象地被称为生产者;而处理数据的模块,则被称为消费者。
生产者和消费者在同一段时间内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
如图所示:
二、生产者-消费者模式的优点
1、解耦
由于有缓冲区的存在,生产者和消费者之间不直接依赖,耦合度降低。
2、支持并发
由于生产者与消费者是两个独立的并发体,它们之间是通过缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区中拿数据接口,这样就不会因为彼此的处理速度而发生阻塞。(通过使用多个生产者和消费者线程,可以实现并发处理,提高系统的吞吐量和响应性)
3、支持忙闲不均
缓冲区还有另一个好处:当数据生产过快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等消费者处理掉其他数据时,再从缓存区中取数据来处理。(通过使用缓冲区可以平衡生产者与消费者之间的速度差异,以及处理能力的不匹配)
三、生产者-消费者模式所遵循的规则
- 生产者仅仅在缓冲区未满时生产,缓冲区满则停止生产。
- 消费者仅仅在缓冲区有产品时才能消费,缓冲区为空则停止消费。
- 当消费者发现缓冲区没有可消费的产品时会通知生产者。
- 当生产者生产出可消费的产品时,应该通知等待的消费者去消费。
四、生产者-消费者模型的实现
1、通过阻塞队列方式实现
public class ProducerConsumerDemo1 {
/**
* 缓冲队列
*/
private final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
/**
* 生产者
*/
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.offer(i);
}
}
}
/**
* 消费者
*/
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.poll();
}
}
}
public Producer getProducer() {
return new Producer();
}
public Consumer getConsumer() {
return new Consumer();
}
public static void main(String[] args) {
ProducerConsumerDemo1 producerConsumerDemo1 = new ProducerConsumerDemo1();
new Thread(producerConsumerDemo1.getProducer()).start();
new Thread(producerConsumerDemo1.getConsumer()).start();
}
}
2、通过wait和notifyAll来实现
/**
* 缓冲区(仓库)
*/
private final List<Object> list = new ArrayList<>();
private final int bufferCount = 10;
public final Object lock = new Object();
/**
* 生产者
*/
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
while (list.size() >= bufferCount) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//仓库未满,继续生产产品
list.add(new Object());
//唤醒消费者去消费产品
lock.notifyAll();
}
}
}
}
/**
* 消费者
*/
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
while (list.size() == 0) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//仓库不是空,继续消费
list.remove(0);
//唤醒生产者去生产产品
lock.notifyAll();
}
}
}
}
public Producer getProducer(){
return new Producer();
}
public Consumer getConsumer(){
return new Consumer();
}
public static void main(String[] args) {
ProducerConsumerDemo2 producerConsumerDemo2=new ProducerConsumerDemo2();
new Thread(producerConsumerDemo2.getProducer()).start();
new Thread(producerConsumerDemo2.getConsumer()).start();
}
}
3、通过ReentrantLock和Condition来实现
public class ProducerConsumerDemo3 {
/**
* 缓冲区(仓库)
*/
private final List<Object> list = new ArrayList<>();
/**
* 缓冲区大小
*/
private final int bufferCount = 10;
public ReentrantLock lock = new ReentrantLock();
//创建两个条件变量
private final Condition condition1 = lock.newCondition();
private final Condition condition2 = lock.newCondition();
/**
* 生产者
*/
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);//模拟生产操作
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
lock.lock();
while (list.size() >= bufferCount) {
condition1.await();//当仓库数据数量超过缓冲区设定的最大数量,则让生产线程进入等待状态
}
list.add(new Object());
System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + list.size());
condition2.signal();//唤醒消费线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
lock.lock();
while (list.size() == 0) {
condition2.await();//当仓库中数据为空时,则让消费线程进入等待状态
}
list.remove(0);
System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:" + list.size());
condition1.signal();//唤醒生产线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
public Producer getProducer() {
return new Producer();
}
public Consumer getConsumer() {
return new Consumer();
}
public static void main(String[] args) {
ProducerConsumerDemo3 producerConsumerDemo3 = new ProducerConsumerDemo3();
new Thread(producerConsumerDemo3.getProducer()).start();
new Thread(producerConsumerDemo3.getConsumer()).start();
}
}
五、使用两个线程轮流打印字符串A和字符串B(A和B交替打印,各打印10次。)
1、通过wait()和notifyAll()实现:
public class PrintDemo2 {
private int num = 10;
private boolean flag;
private final Object obj = new Object();
public static void main(String[] args) {
PrintDemo2 printDemo2 = new PrintDemo2();
printDemo2.printMethod();
}
public void printMethod() {
Thread thread1 = new Thread("Thread One") {
@Override
public void run() {
super.run();
for (int i = 0; i < num; i++) {
synchronized (obj) {
while (flag) {
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "=========A");
flag = true;
obj.notifyAll();
}
}
}
};
Thread thread2 = new Thread("Thread Two") {
@Override
public void run() {
super.run();
for (int i = 0; i < num; i++) {
synchronized (obj) {
while (!flag) {
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "=========B");
flag = false;
obj.notifyAll();
}
}
}
};
thread1.start();
thread2.start();
}
}
打印结果如下:
小结:关键字synchronized与wait()、notify()/notifyAll()方法相结合可以实现wait/notify模式,ReentrantLock同样可以实现同样的功能,但需要借助于Condition对象。 下面我们来看一下使用ReentrantLock+Condition来实现的方式。
2、方式二:使用ReentrantLock和Condition实现:
public class PrintDemo {
private int num = 10;
private boolean flag;
public static void main(String[] args) {
PrintDemo printDemo = new PrintDemo();
printDemo.printMethod();
}
public void printMethod() {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread thread1 = new Thread("Thread One") {
@Override
public void run() {
super.run();
for (int i = 0; i < num; i++) {
try {
lock.lock();
while (flag) {
condition.await();
}
System.out.println(Thread.currentThread().getName() + "=========A");
flag = true;
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
};
Thread thread2 = new Thread("Thread Two") {
@Override
public void run() {
super.run();
for (int i = 0; i < num; i++) {
try {
lock.lock();
while (!flag) {
condition.await();
}
System.out.println(Thread.currentThread().getName() + "=========B");
flag = false;
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
};
thread1.start();
thread2.start();
}
}
打印结果如下:
六、等待通知机制
是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。
- wait():表示让当前线程进入等待状态,并且释放对象上的锁及CPU资源。然后无限期等待,直到被唤醒为止(注意:调用wait()方法后会释放对象的锁)
- nofity():通知一个正在等待的线程(是从等待中的线程中随机通知一个),使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入WAITING状态。
- notifyAll():与notify方法相同,唯一不同的是,notifyAll是通知所有等待的线程,而notify是随机地通知等待中的线程中的一个。
- wait(long):超时等待一段时间,这里的参数时间是毫秒,也就是等待n毫秒。线程将等待指定的时间长度后自动苏醒,或者直到其他线程通过调用相同对象上的notify()或者notifyAll()来唤醒它。如果超过了指定时间没有被唤醒,则线程会自动苏醒。
- wait(long ,int):同上述wait(long),只是对于超时的时间更细粒度的控制,可以达到纳秒。
注意:在调用wait()、notify()、notifyAll()方法之前,线程必须要获得对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法、notify()、notifyAll()方法。
1、等待和通知的标准范式
等待方遵循如下原则:
(1)获取对象的锁
(2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件
(3)条件满足则执行对应的逻辑
通知方遵循如下原则:
(1)获得对象的锁
(2)改变条件
(3)通知所有等待在对象上的线程。
2、notify和notifyAll应该用哪一个
尽可能用notifyAll(),谨慎用notify(),因为notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。
3、为什么wait方法必须在synchronized保护的同步代码块中使用?
先来看看wait方法的源码注释:
“wait method should always be used in a loop:
synchronized (obj) {
while (condition does not hold)
obj.wait();
... // Perform action appropriate to condition
}
This method should only be called by a thread tah is the owner of this object's monitors"
意思就是说,在使用wait方法时,必须把wait方法写在synchronized保护的while代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行wait方法,而在执行wait方法之前,必须先持有锁对象的monitor对象。
4、wait/notify 和 sleep方法的异同?
相同点:
(1)它们都可以让线程阻塞
(2)它们都可以响应interrupt中断:在执行的过程中,如果收到中断信号都可以响应,并抛出InterruptedException异常
不同点:
(1)wait方法必须在synchronized保护的代码中使用,而sleep方法并没有这个要求
(2)在同步代码中执行sleep方法时,并不会释放monitor锁,但执行wait方法时会主动释放monitor锁
(3)sleep方法中要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的wait方法而言,意味着永久等待,直到被中断或唤醒才能恢复,它并不会主动恢复。
(4)wait/notify 是Object 类的方法,而sleep是Thread类的方法。
5、Java多线程中wait()为什么要放在while循环中
将wait()方法放在while循环中是为了避免虚假唤醒(spurious wakeups)。
虚假唤醒是指当一个线程被唤醒时,尽管没有收到对应的通知信号,但它还是会继续执行。这种情况可能发生在某些特定的条件下,例如操作系统级别的中断、调度器的行为等。
为了防止虚假唤醒导致逻辑或数据不一致性问题,在使用wait()方法进行线程等待时,我们通常会将其放在一个while循环内,并且与需要满足的条件进行检查。只有当满足特定的条件才会继续执行后面的代码。如果不满足条件,则线程会再次进入等待状态。以下是一个示例:
synchronized (lock) {
while (!condition) {
lock.wait();
}
// 执行其他操作
}
通过使用while(condition)来检查条件是否满足,即使发生虚假唤醒也能够安全地重新检查和等待。因此,在多线程编程中建议将wait()方法置于while循环内部以保证正确性。
6、为什么wait/notify/notifyAll 被定义在Object类中,而sleep被定义在Thread类中?
wait(),notify()和notifyAll()方法被定义在Object类中而不是Thread类中的原因是因为它们操作的是对象的锁(monitor)。
这些方法用于实现线程之间的协作与通信机制,即等待其他线程满足特定条件后再继续执行。由于每个对象都有一个关联的锁,因此将这些方法定义在所有对象共享的父类Object中更加合适。通过调用这些方法来控制同步代码块或同步方法内部对锁资源进行管理。
另一方面,sleep()方法并定义在Thread类中,它使当前正在执行的线程暂停指定时间段,并且不会释放持有的任何锁。该方法属于线程级别的操作,在调用时直接影响当前运行状态下的线程自身。
总结起来:
wait(),notify()和notifyAll()是基于对象级别的操作,用于多个线程之间进行协作和通信。
由于每个Java对象都具备监视器(monitor)功能(即互斥锁),所以这些方法需要定义在所有Java对象共享的父类Object中。
而sleep()方法则是一个与当前正在运行状态下的Thread相关联,独立存在并影响其自身行为状态。
7、ReentrantLock和Condition
Condition类是JDK5的技术,具有更好的灵活性,例如,可以实现多路通知功能,也就是在一个Lock对象中可以创建多个Condition实例,线程对象注册在指定的Condition中,从而可以有选择性地进行线程通知,在调度线程上更加灵活。
在使用notify()/notifyAll()方法进行通知时,被通知的线程由JVM进行选择,而方法notifyAll()会通知所有的waiting线程,没有选择权,会出现相当大的效率问题,但使用ReentrantLock结合Condition类可以实现”选择性通知“,这个功能是Condition类默认提供的。
Condition对象的作用是控制并处理线程的状态,它可以使线程呈现wait状态,也可以使线程继续运行。
Condition的await()方法的作用是使当前线程在接到通知或被中断之前一直处于等待wait状态。它和wait()方法的作用一样。
condition.await()方法调用之前必须先调用lock.lock()方法获得锁,否则会抛出java.lang.IIIleagalMonitorStateException:current thread is not owner。如下所示:
public class PrintDemo3 {
public static void main(String[] args) {
PrintDemo3 printDemo3=new PrintDemo3();
printDemo3.printMethod();
}
public void printMethod() {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread thread1 = new Thread() {
@Override
public void run() {
super.run();
try {
System.out.println("MM");
condition.wait();
System.out.println("NN");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
thread1.start();
}
}
控制台的输出:
- Object类中的wait()方法相当于Condition类中的await()方法。
- Object类中的wait(long timeout)方法相当于Condition类中的await(long time,TimeUnit unit)方法。
- Object类中的notify()方法相当于Condition类中的signal()方法。
- Object类中的notifyAll()方法相当于Condition类中的signalAll()方法。