目录
- 一、认识阻塞队列
- 1.1 什么是阻塞队列?
- 1.2 生产者消费者模型
- 1.3 标准库中的阻塞队列类
- 二、循环队列实现简单阻塞队列
- 2.1 实现循环队列
- 2.2 阻塞队列实现
一、认识阻塞队列
1.1 什么是阻塞队列?
阻塞队列:从名字可以看出,他也是队列的一种,那么他肯定是一个先进先出(FIFO)的数据结构。与普通队列不同的是,他支持两个附加操作,即阻塞添加和阻塞删除方法。
阻塞队列本质上还是一种队列, 和普通队列一样, 遵循先进先出, 后进后出的规则, 但阻塞队例相比于普通队列的特殊之处在于阻塞队列的阻塞功能, 主要基于多线程使用.
1.如果队列为空, 执行出队列操作, 就会使线程陷入阻塞, 阻塞到另一个线程往队列里添加元素(队列不空)为止.
2.如果队列满了,执行入队列操作, 也会使线程阻塞, 阻塞到另一个线程从队列取走元素位置(队列不满)为止.
1.2 生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
生产者消费者模型能够给程序带来两个非常重要的好处, 一是可以实现实现了发送方和接收方之间的 “解耦” , 二是可以 “削峰填谷” , 保证系统的稳定性, 具体理解如下:
1.3 标准库中的阻塞队列类
Java标准库也提供了阻塞队列的标准类, 常用的有下面几个:
- ArrayBlockingQueue : 基于数组实现界阻塞队列
- LinkedBlockingQueue : 基于链表实现的有界阻塞队列
- PriorityBlockingQueue : 带有优先级(堆)的无界阻塞队列
- BlockingQueue接口 : 上面的类实现了该接口
根据插入和取出两种类型的操作,具体分为下面一些类型:
- 抛出异常是指当队列满时,再次插入会抛出异常(如果队列未满,插入返回值未true);
- 返回布尔是指当队列满时,再次插入会返回false;
- 阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入。
- 超时是指当一个时限过后,才会插入或者取出。
public class Test { public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); //消费者线程 Thread customer = new Thread(() -> { while (true) { try { Integer result = blockingQueue.take(); System.out.println("消费元素: " + result); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); customer.start(); //生产者线程 Thread producer = new Thread(() -> { int count = 0; while (true) { try { blockingQueue.put(count); System.out.println("生产元素: " + count); count++; Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); producer.start(); } }
二、循环队列实现简单阻塞队列
2.1 实现循环队列
//普通的循环队列 class MyBlockingQueue { //存放元素的数数组 private int[] items = new int[1000]; //队头指针 private int head = 0; //队尾指针 private int tail = 0; //记录队列元素的个数 private int size = 0; //入队操作 public void put (int val) { if (size == items.length) { //队列满了 return; } items[tail++] = val; //等价于 tail %= items.length if (tail >= items.length) { tail = 0; } size++; } //出队操作 public Integer take() { int resulet = 0; if (size == 0) { //队列空了 return null; } resulet = items[head++]; //等价于 head %= elem.length if (head >= items.length) { head = 0; } size--; return resulet; } }
2.2 阻塞队列实现
考虑线程安全问题,循环队列中的take和put方法都有写操作,直接加锁即可。
//线程安全的循环队列 class MyBlockingQueue { //存放元素的数数组 private int[] items = new int[1000]; //队头指针 private int head = 0; //队尾指针 private int tail = 0; //记录队列元素的个数 private int size = 0; //入队操作 public void put (int val) { synchronized (this) { if (size == items.length) { //队列满了 return; } items[tail++] = val; //等价于 tail %= items.length if (tail >= items.length) { tail = 0; } size++; } } //出队操作 public Integer take() { int resulet = 0; synchronized (this) { if (size == 0) { //队列空了 return null; } resulet = items[head++]; //等价于 head %= elem.length if (head >= items.length) { head = 0; } size--; return resulet; } } }
实现阻塞效果,主要使用wait和notify实现线程的阻塞等待
入队时, 队列满了需要使用wait方法使线程阻塞, 直到有元素出队队列不满了再使用notify通知线程执行.
出队时, 队列为空也需要使用wait方法使线程阻塞, 直到有新元素入队再使用notify通知线程执行.class MyBlockingQueue { //存放元素的数数组 private int[] items = new int[1000]; //队头指针 private int head = 0; //队尾指针 private int tail = 0; //记录队列元素的个数 private int size = 0; //入队操作 public void put (int val) throws InterruptedException { synchronized (this) { if (size == items.length) { //队列满了,阻塞等待 this.wait(); } items[tail++] = val; //等价于 tail %= items.length if (tail >= items.length) { tail = 0; } size++; //唤醒因队列空造成的阻塞wait this.notify(); } } //出队操作 public Integer take() throws InterruptedException { int resulet = 0; synchronized (this) { if (size == 0) { //队列空了,阻塞等待 this.wait(); } resulet = items[head++]; //等价于 head %= elem.length if (head >= items.length) { head = 0; } size--; //唤醒因队列满造成的阻塞wait this.notify(); return resulet; } } }
思考:当代码中当wait被唤醒的时候,此时的if条件一定就不成立了吗?
为了稳妥起见,最好的办法就是wait唤醒之后再判断一下条件是否满足。
//出队部分 while (size == items.length) { //队列满了,阻塞等待 this.wait(); } //入队部分 while (size == 0) { //队列空了,阻塞等待 this.wait(); }
我们创建两个线程分别是消费者线程customer和生产者线程producer, 生产者生产数字, 消费者消费数字, 为了让执行结果中的阻塞效果明显一些, 我们可以使用sleep方法来控制一下生产者/消费者的生产/消费的频率, 这里我们让开始时生产的速度快一些, 消费的速度慢一些。
class MyBlockingQueue { //存放元素的数数组 private int[] items = new int[1000]; //队头指针 private int head = 0; //队尾指针 private int tail = 0; //记录队列元素的个数 private int size = 0; //入队操作 public void put (int val) throws InterruptedException { synchronized (this) { while (size == items.length) { //队列满了,阻塞等待 this.wait(); } items[tail++] = val; //等价于 tail %= items.length if (tail >= items.length) { tail = 0; } size++; //唤醒因队列空造成的阻塞wait this.notify(); } } //出队操作 public Integer take() throws InterruptedException { int resulet = 0; synchronized (this) { while (size == 0) { //队列空了,阻塞等待 this.wait(); } resulet = items[head++]; //等价于 head %= elem.length while (head >= items.length) { head = 0; } size--; //唤醒因队列满造成的阻塞wait this.notify(); return resulet; } } } public class Test { public static void main(String[] args) { //消费线程 MyBlockingQueue queue = new MyBlockingQueue(); Thread customer = new Thread(() -> { while(true) { try { int result = queue.take(); System.out.println("消费元素: " + result); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); customer.start(); //生产线程 Thread producer = new Thread(() -> { int count = 0; while (true) { try { queue.put(count); System.out.println("生产元素: " + count); count++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); producer.start(); } }