文章目录
- 多线程案例二
- 二、阻塞式队列
大家好,我是晓星航。今天为大家带来的是 多线程案例二 相关的讲解!😀
多线程案例二
二、阻塞式队列
阻塞队列是什么
阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
1)阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力. (削峰填谷)
比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.
2)阻塞队列也能使生产者和消费者之间 解耦.
解耦:降低耦合的过程。
耦合:两个东西关联程度高不高,高叫做高耦合,低叫做低耦合。
比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺 子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人 也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超 市买的).
使用生产者消费者模型前:
使用了生产者消费者模型之后:
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
生产者消费者模型
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生产元素: " + num);
blockingQueue.put(num);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
阻塞队列用法举例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
//阻塞队列的使用
public class ThreadDemo21 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.put("hello");
String res = blockingQueue.take();
System.out.println(res);
res = blockingQueue.take();
System.out.println(res);
}
}
这里我们的res取出了hello之后,我们的阻塞队列就变为空了,因此我们第一次打印res就输出结果hello,而第二次打印我们程序就回因为阻塞队列无值而进入wait阻塞状态。
阻塞队列实现
- 通过 “循环队列” 的方式来实现.
- 使用 synchronized 进行加锁控制.
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一 定队列就不满了, 因为同时可能是唤醒了多个线程).
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
public class BlockingQueue {
private int[] items = new int[1000];
private volatile int size = 0;
private int head = 0;
private int tail = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
// 此处最好使用 while.
// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
// 就只能继续等待
while (size == items.length) {
wait();
}
items[tail] = value;
tail = (tail + 1) % items.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = items[head];
head = (head + 1) % items.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
// 测试代码
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new BlockingQueue();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
blockingQueue.put(random.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
}
自己实现阻塞队列:ThreadDemo23
//自己写的阻塞队列
//此处不考虑泛型
class MyBlockingQueue {
private final int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
//入队列
public void put (int value) throws InterruptedException {
synchronized (this) {
//这里的 while 是为了循环判断这里的队列到底是否已满 如果不满足就一直阻塞
while (size == items.length) {
//数组已满 ,此时要产生阻塞
//return;
this.wait();
}
items[tail] = value;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
//这个notify 唤醒 take 中的wait
this.notify();
}
}
//出队列 - 先进先出
public Integer take() throws InterruptedException {
int result = 0;
synchronized (this) {
//这里的 while 是为了循环判断这里的队列到底是否为空 如果不满足就一直阻塞
while (size == 0) {
//队列为空 无法出元素
//return null;
this.wait();
}
result = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
//唤醒 put 中的 wait
this.notify();
}
return result;
}
}
public class ThreadDemo23 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue = new MyBlockingQueue();
queue.put(1);
queue.put(2);
queue.put(3);
queue.put(4);
queue.put(5);
int result = 0;
result = queue.take();
System.out.println("result: " + result);
result = queue.take();
System.out.println("result: " + result);
result = queue.take();
System.out.println("result: " + result);
result = queue.take();
System.out.println("result: " + result);
result = queue.take();
System.out.println("result: " + result);
}
}
运行结果:
这里我们 put 和 take 中的wait和notify操作是相互唤醒的。
那么这两个线程中的 wait 是否可能同时触发???(如果同时触发了,显然就不能正确相互唤醒了)
答:不会,针对同一个队列,不可能即使满又是空。
感谢各位读者的阅读,本文章有任何错误都可以在评论区发表你们的意见,我会对文章进行改正的。如果本文章对你有帮助请动一动你们敏捷的小手点一点赞,你的每一次鼓励都是作者创作的动力哦!😘