目录
阻塞队列
阻塞队列的使用
生产者消费者模型
模型的两个好处
1. 降低耦合
2. 削峰填谷
简单实现阻塞队列
阻塞队列
阻塞队列是在一般的队列上升级而来的。
对于队列为空时,如果还想取队列中的元素,此时阻塞队列就会进行阻塞。
对于队列为满时,如果还想往队列中放元素,此时阻塞队列就会进行阻塞。
阻塞队列的使用
阻塞队列的具体使用由这个接口 BlockingQueue<>实现。
可以看到,该接口继承了队列,所以它有队列的所有方法。不过在使用阻塞队列时,我们一般只使用以下两种方法
方法 | 说明 |
void put(value) | 该方法是往队列里面添加元素,满了就阻塞 |
Object take() | 该方法是取出队列的元素,空了就阻塞 |
其他的方法没有阻塞功能。
当我们实例化的时候可以看到,有以下三种实现方式。
简单使用一下阻塞队列。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadDemo21 {
public static void main(String[] args) throws InterruptedException {
// 这个队列的空间大小设置为 3
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
// 因为只有一个主线程
// 当往队列里添加第四个元素时, 这是队列就会进行阻塞
blockingQueue.put(4);
// 以下代码执行不到了
int ret = blockingQueue.take();
System.out.println(ret);
ret = blockingQueue.take();
System.out.println(ret);
ret = blockingQueue.take();
System.out.println(ret);
ret = blockingQueue.take();
System.out.println(ret);
}
}
生产者消费者模型
对于上图的模型,就是生产者消费者模型。
阻塞队列通常被应用到生产者消费者模型当中。阻塞队列充当缓冲区。
// 使用阻塞队列实现生产者消费者模型
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadDemo22 {
public static void main(String[] args) {
// 生产者产生的数字都放到阻塞队列中
// 消费者从阻塞队列中拿到数字
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(1000);
// t1线程是生产者
Thread t1 = new Thread(() -> {
int i = 1;
while (true) {
try {
blockingQueue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者: " + i);
i++;
// 生产者每隔100ms产生一个数字
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// t2线程是消费者
Thread t2 = new Thread(() -> {
while (true) {
try {
int ret = blockingQueue.take();
System.out.println("消费者: " + ret);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}
模型的两个好处
1. 降低耦合
为引入该模型前,耦合度高。
引入该模型后,耦合性降低。
2. 削峰填谷
前段时间鹅鸭杀游戏突然爆火,从日均几万人到四五十万人同时在线。此时大量用户数据就会到这个缓冲区当中,不至于一下子全部发送到服务器当中,导致服务器崩溃。这里的缓冲区就起到了削峰的作用;当用户数量下去之后,服务器就从缓冲区中继续读取数据,不至于在这个时间闲着没事干。
简单实现阻塞队列
具体代码的解释都卸载注释中了。
// 实现一个阻塞队列
// 不带泛型, 直接使用Integer类型
// 只实现 带有阻塞功能的 put 和 take 方法
class MyBlockQueue {
private Integer[] arr = new Integer[100];
private int head = 0;// 指向对头
private int tail = 0;// 指向队尾
private int size = 0;// 计数
public void put(int value) throws InterruptedException {
// 加锁只是针对当前的对象
// 如果实例化了其他对象, 并不会相互影响
synchronized(this) {
// 这里while相当于多次if
// 目的是为了判断唤醒之后是否数组是否还是满的, 满了继续阻塞
// 因为在多线程下, 有可能顺序会发生变化
while (size == arr.length) {
this.wait();
}
// 没有满, 在tail指向的位置添加元素, 添加完之后tail后移
arr[tail] = value;
tail++;
// tail如果到了末尾, 它就要从头开始
if (tail >= arr.length) {
tail = 0;
}
//计数器++
size++;
// 此时数组中有元素了, 可以唤醒在take方法中因为数组为空而阻塞的原因了
this.notify();
}
}
// 由于涉及到多线程, 所以要考虑线程安全问题
// head tail size都是会改变的变量, 所以考虑synchronized来保持其原子性
public Integer take() throws InterruptedException {
synchronized(this) {
// 如果数组为空, 就要阻塞等待
// 使用while的理由同上
while (size == 0) {
this.wait();
}
// 有元素, 开始从head指向的弹出
Integer ret = arr[head];
head++;
if (head >= arr.length) {
head = 0;
}
size--;
// 此时数组中有空的位置了, 可以唤醒在put方法中因为数组满而阻塞的原因了
this.notify();
return ret;
}
}
}
public class ThreadDemo24 {
public static void main(String[] args) {
// 使用生产者消费者模型测试一下实现的阻塞队列
// t1 线程为生产者
// t2 线程为消费者
MyBlockQueue myBlockQueue = new MyBlockQueue();
Thread t1 = new Thread(()-> {
int i = 1;
while (true) {
try {
myBlockQueue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者" + i);
i++;
// 让生产者每隔 100ms 生产出一个数字
// 如果消费者没有时间限制, 这样就会出现队列为空而阻塞的情况
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(()-> {
while (true) {
try {
int ret = myBlockQueue.take();
System.out.println("消费者" + ret);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}
有什么错误评论区指出。希望可以帮到你。