文章目录
- 1. 什么是堵塞队列?
- 2. 堵塞队列的方法
- 3. 生产者消费者模型
- 4. 自己实现堵塞队列
1. 什么是堵塞队列?
堵塞队列也是队列,故遵循先进先出的原则。但堵塞队列是一种线程安全的数据结构,可以避免线程安全问题,当队列为空时,继续出队列会发生堵塞,直至其他线程有元素进队列。当队列满时,继续入队列会堵塞,直至其他线程有元素出队列。
生产者消费者模型就是最经典的堵塞队列模型之一。
2. 堵塞队列的方法
Java标准库里含有堵塞队列,我们使用时可以直接使用标准库即可。
- BlockingQueue是个接口,真正实现的类是:LinkedBlockingQueue。
- put方法入队列,take方法出队列,具有堵塞性。
- peek,offer,poll等方法也可以使用,但是不具有堵塞性。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
queue.put();
queue.take();
3. 生产者消费者模型
这是一个常见的并发编程模型,用于协调生产者和消费者之间的工作,在这个模型中,一个线程负责生产元素,一个线程负责消费元素。
它也是一个堵塞队列 ,所以具有堵塞队列的特性。
public class Test {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
//t1线程负责生产元素
Thread t1 = new Thread(() -> {
try{
Random random = new Random();
while (true){
int i = random.nextInt();
System.out.println("生产元素: " + i);
queue.put(i);
Thread.sleep(1000);
}
}catch (InterruptedException e){
e.printStackTrace();
}
});
//t2负责消费元素
Thread t2 = new Thread(() -> {
try {
while(true){
int i = queue.take();
System.out.println("消费元素: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
}
}
运行结果:
4. 自己实现堵塞队列
自己实现堵塞队列需要满足:
- 通过循环队列来实现;
- 使用synchronized实现加锁,进行同步;
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程);
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait) ;
class BlockingQueue{
//定义一个数组
private int[] arr;
//数组中元素个数
private int size = 0;
//记录数组头的位置
private int read = 0;
//记录尾的位置
private int tail = 0;
//锁
Object lock = new Object();
//构造方法,确定数组大小
public BlockingQueue(int i){
arr = new int[i];
}
//入队列
public void put(int value){
try{
//加锁,保证线程安全
synchronized (lock){
//使用while,不要使用if,否则notifyAll时,所有等待线程都被唤醒,造成线程安全
while(arr.length == size){
lock.wait();
}
arr[tail] = value;
//循环队列
tail = (tail + 1) % arr.length;
//添加一个元素,size加一次
size++;
//唤醒所有线程
lock.notifyAll();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
public int take() {
//返回值,不能写在try里面
int value = 0;
try {
//上锁
synchronized (lock) {
//队列为0,等待,使用while
while (size == 0) {
lock.wait();
}
//赋返回值
value = arr[read];
//循环
read = (read + 1) % arr.length;
//出队列一个,减一个
size--;
//唤醒
lock.notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回值
return value;
}
}
public class Test4 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new BlockingQueue(10);
Thread t1 = new Thread(() -> {
try{
Random random = new Random();
while (true){
int value = random.nextInt(100);
System.out.println("生产元素: " + value);
queue.put(value);
Thread.sleep(100);
}
}catch (InterruptedException e){
e.printStackTrace();
}
});
//t2负责消费元素
Thread t2 = new Thread(() -> {
while(true){
int value = queue.take();
System.out.println("消费元素: " + value);
}
});
t1.start();
Thread.sleep(2000);
t2.start();
}
}
1 因为t2 未启动,所以只能生产元素,当生产10个元素后,队列满了,进入堵塞,等待被唤醒;
2 因为t1生产时,每次都会休眠,线程执行非常迅速,所以队列一直出元素,直到队列为空,进入堵塞,等待被唤醒;
3生产一个元素消费一个元素。