什么是阻塞队列
相比于一般的队列,有两个特点
1.线程安全
2.带有阻塞功能
1)队伍为空时,出队列就会出现阻塞,阻塞到其他线程入队列为止
2)队伍为满时,入队列就会出现阻塞,阻塞到其他线程出队列为止
常用于生产者消费者模型
作用:
1.解耦合
2.削峰填谷
使用阻塞队列
public class Test12 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("qqq");
String elem = queue.take();
System.out.println("elem = "+ elem);
elem = queue.take();
System.out.println("elem = " + elem);
}
}
运行结果
不会结束运行,一直在等待。
使用put和offer一样的都是入队列,但是put是带有阻塞功能,offer没有带阻塞功能(队满了就会返回结果)
take方法用来出队列,也是带有阻塞功能
实现阻塞队列
1)实现普通队列
class MyBlockingQueue{
private String[] elems = null;
private int size = 0;
private int head = 0;
private int tail = 0;
public MyBlockingQueue(int capacity){
elems = new String[capacity];
}
public void put(String elem) throws InterruptedException {
if (size > elems.length){
//阻塞功能
}
elems[tail] = elem;
tail++;
if (tail >= elems.length){
tail = 0;
}
size++;
}
public String take() throws InterruptedException {
if (size == 0){
//阻塞功能
}
String elem = null;
elem = elems[head];
head++;
if (head >= elems.length){
head = 0;
}
size--;
return elem;
}
}
}
2)加上线程安全
class MyBlockingQueue{
private String[] elems = null;
private int size = 0;
private int head = 0;
private int tail = 0;
private Object locker = new Object();
public MyBlockingQueue(int capacity){
elems = new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized(locker){
if (size > elems.length){
}
elems[tail] = elem;
tail++;
if (tail >= elems.length){
tail = 0;
}
size++;
}
}
public String take() throws InterruptedException {
synchronized(locker){
if (size == 0){
}
String elem = null;
elem = elems[head];
head++;
if (head >= elems.length){
head = 0;
}
size--;
return elem;
}
}
}
3)加上阻塞功能
class MyBlockingQueue{
private String[] elems = null;
private int size = 0;
private int head = 0;
private int tail = 0;
private Object locker = new Object();
public MyBlockingQueue(int capacity){
elems = new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized(locker){
while (size > elems.length){
locker.wait();
}
elems[tail] = elem;
tail++;
if (tail >= elems.length){
tail = 0;
}
size++;
locker.notify();
}
}
public String take() throws InterruptedException {
synchronized(locker){
while (size == 0){
locker.wait();
}
String elem = null;
elem = elems[head];
head++;
if (head >= elems.length){
head = 0;
}
size--;
locker.notify();
return elem;
}
}
}
代码解释:
最终代码将if改成了while,因为if只能判定一次,如果出现以下情况就会出bug(线程A,线程B都执行到了put中的wait,因为队列已满而停止运行,线程C出队列唤醒了线程A,线程A继续入队列,入队列后就会notify,导致唤醒了线程B,而此时队列已满,无法进行入队操作,就出现了bug),所以就使用while,wait之前判定一次,唤醒之后再进行一次判定,相当于多做一步确定操作
简单的生产者消费者模型
class MyBlockingQueue{
private String[] elems = null;
private int size = 0;
private int head = 0;
private int tail = 0;
private Object locker = new Object();
public MyBlockingQueue(int capacity){
elems = new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized(locker){
while (size > elems.length){
locker.wait();
}
elems[tail] = elem;
tail++;
if (tail >= elems.length){
tail = 0;
}
size++;
locker.notify();
}
}
public String take() throws InterruptedException {
synchronized(locker){
while (size == 0){
locker.wait();
}
String elem = null;
elem = elems[head];
head++;
if (head >= elems.length){
head = 0;
}
size--;
locker.notify();
return elem;
}
}
}
public class Test11 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(100);
Thread t1 = new Thread(()->{
int n = 1;
while(true){
try {
myBlockingQueue.put(n + "");
System.out.println("生产元素 "+n);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(()->{
while(true){
try {
String n = myBlockingQueue.take();
System.out.println("消费元素 " + n);
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}