系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者进程每次从缓冲区中取出一个产品并使用。(注:这里的“产品”理解为某种数据)
生产者、消费者共享一个初始为空、大小为 n 的缓冲区。
只有缓冲区没满时,生产者才能把产品放入缓冲区,否则必须等待。
- 同步关系。缓冲区满时,生产者要等待消费者取走产品
只有缓冲区不空时,消费者才能从中取出产品,否则必须等待。
- 同步关系。缓冲区为空时,消费者要等待生产者放入产品
缓冲区是临界资源,各进程必须互斥地访问
- 若其他进程并发地向缓冲区放入数据,会导致之前的数据会被覆盖
如何用信号量机制(P
、V
操作)实现生产者、消费者进程的这些功能呢?
信号量机制可实现互斥、同步、对一类系统资源的申请和释放。
-
设置初值为 1 1 1 的互斥信号量
-
设置初值为 0 0 0 的同步信号量(实现 “一前一后”)
-
设置一个信号量,初始值即为资源的数量(本质上也属于“同步问题”,若无空闲资源,则申请资源的进程需要等待别的进程释放资源后才能继续往下执行)
1、PV 操作题目分析步骤
-
关系分析。找出题目中描述的各个进程,分析它们之间的同步
-
整理思路。根据各进程的操作流程确定
P
、V
操作的大致顺序。 -
设置信号量。设置需要的信号量,并根据题目条件确定信号量初值。
(互斥信号量初值一般为 1,同步信号量的初始值要看对应资源的初始值是多少)
生产者每次要消耗(P)一个空闲缓冲区,并生产(V)一个产品
消费者每次要消耗(P)一个产品,并且释放一个空闲缓冲区(V)
往缓冲区中放入/取走产品需要互斥
2、能否改变相邻 P、V 操作的顺序?
若此时缓冲区内已经放满产品,则 empty=0
,full=n
。
则生产者进程执行 ① 使 mutex
变为 0 ,再执行 ② ,由于已没有空闲缓冲区,因此生产者被阻塞。由于生产者阻塞,因此切换回消费者进程。消费者进程执行 ③ ,由于 mutex
为 0,即生产者还没释放对临界资源的“锁”,因此消费者也被阻塞。
这就造成了生产者等待消费者释放空闲缓冲区,而消费者又等待生产者释放临界区的情况,生产者和消费者循环等待被对方唤醒,出现 “死锁”。
同样的,若缓冲区中没有产品,即 full=0
,empty=n
。按③④①的顺序执行就会发生死锁。
因此, 实现互斥的 P 操作一定要在实现同步的 P 操作之后 \color{red}实现互斥的\texttt{P}操作一定要在实现同步的\texttt{P}操作之后 实现互斥的P操作一定要在实现同步的P操作之后。
V
操作不会导致进程阻塞,因此
两个V操作顺序可以交换
\color{red}两个\text{V}操作顺序可以交换
两个V操作顺序可以交换
若放到 PV
操作之间会导致并发度降低,消费者花费更多的时间去使用产品,此时还没有释放临界资源,会导致生产者不断地等待消费者。
- 例如:业务代码放入核心代码里
3、Java 案例
import java.util.Deque;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ProducerConsumerByLock {
//互斥资源
private static Deque<Integer> queue = new LinkedList<>();
//最大容量
private static int maxSize = 5;
private static Lock lock = new ReentrantLock(false);
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();
public static void main(String[] args) throws ExecutionException, InterruptedException {
for (int i = 0; i < 3; i++) {
CompletableFuture.runAsync(new Producer());
}
for (int i = 0; i < 3; i++) {
CompletableFuture.runAsync(new Consumer());
}
//CompletableFuture.runAsync(new Consumer()); 产生的进程默认是守护进程
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产者
public static class Producer implements Runnable {
public void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == maxSize) {
System.out.println("产品已满, 等待消费者进行消费, 当前生产者: " + Thread.currentThread().getName());
//阻塞生产者, 并释放当前线程的锁
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产产品
queue.offerLast(1);
//日志打印, 业务逻辑尽量放在锁外面 , 这里只是为了顺序打印到控制台
System.out.println("当前生产者生产产品 " + 1 + ", 当前生产者: " + Thread.currentThread().getName());
//唤醒其他所有的消费者与生产者
empty.signalAll();
full.signalAll();//并发, 让其他生产者也参与竞争
} finally {
lock.unlock();
}
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void run() {
produce();
}
}
//消费者
public static class Consumer implements Runnable {
//消费产品
public void consume() {
while (true) {
int x;
lock.lock();
try {
while (0 == queue.size()) {
System.out.println("产品为空, 等待生产者进行生产, 当前消费者: " + Thread.currentThread().getName());
//阻塞消费者者, 并释放当前线程的锁
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从缓冲区获取资源
x = queue.poll();
//日志打印, 业务逻辑尽量放在锁外面 , 这里只是为了顺序打印到控制台
System.out.println("消费者消费产品 " + x + ", 当前消费者: " + Thread.currentThread().getName());
//唤醒其他所有的消费者与生产者
empty.signalAll();//并发, 让其他消费者也参与竞争
full.signalAll();
} finally {
lock.unlock();
}
//防止一瞬间消费完成
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void run() {
consume();
}
}
}
stdout:
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-6
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-4
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-6
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-9
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-4
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-2
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-4
产品为空, 等待生产者进行生产, 当前消费者: ForkJoinPool.commonPool-worker-13
当前生产者生产产品 1, 当前生产者: ForkJoinPool.commonPool-worker-11
消费者消费产品 1, 当前消费者: ForkJoinPool.commonPool-worker-13
4、总结
PV
操作题目的解题思路
-
关系分析。找出题目中描述的各个进程,分析它们之间的同步
-
整理思路。根据各进程的操作流程确定
P
、V
操作的大致顺序。 -
设置信号量。设置需要的信号量,并根据题目条件确定信号量初值。
(互斥信号量初值一般为 1,同步信号量的初始值要看对应资源的初始值是多少)
生产者消费者问题是一个互斥、同步的综合问题。
对于初学者来说最难的是发现题目中隐含的两对同步关系。
有时候是消费者需要等待生产者生产,有时候是生产者要等待消费者消费,这是两个不同的 “一前一后问题”,因此也需要设置两个同步信号量。
易错点:实现互斥和实现同步的两个 P
操作的先后顺序
- 实现互斥的操作要在实现同步的操作之后,否则会产生 “死锁”