阻塞队列(BlockingQueue)
什么情况下我们会使用阻塞队列?
多线程并发处理,线程池!
使用队列
-
添加
-
移除
BlockingQueue四组API
方式 | 有返回值,抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
添加 | add() | offer() | put() | offer( , , ) |
移除 | remove() | poll() | take() | poll( , ) |
查看队列首元素 | element() | peek() | - | - |
有返回值,抛出异常
添加 add() 、移除 remove() 、查看队列首元素 element()
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
test();
}
//有返回值,抛出异常
public static void test(){
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// add()方法返回boolean值
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// add添加元素超过队列的长度会抛出异常java.lang.IllegalStateException: Queue full
// System.out.println(blockingQueue.add("d"));
System.out.println("=============");
// remove()返回本次移除的元素
System.out.println(blockingQueue.remove());
// 获得队首元素
System.out.println("队首:" + blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// 队列中没有元素仍继续移除元素会抛出异常java.util.NoSuchElementException
// System.out.println(blockingQueue.remove());
}
}
add添加元素超过队列的长度会抛出异常java.lang.IllegalStateException: Queue full
队列中没有元素仍继续移除元素会抛出异常java.util.NoSuchElementException
有返回值,不抛出异常
添加 offer() 、移除 poll() 、查看队列首元素 peek()
public class Test {
public static void main(String[] args) {
test();
}
//有返回值,不抛出异常
public static void test(){
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// offer返回boolean值
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// offer添加元素超过队列的长度会返回false
System.out.println(blockingQueue.offer("d"));
System.out.println("===========");
// poll()返回本次移除的元素
System.out.println(blockingQueue.poll());
// 获得队首元素
System.out.println("队首:" + blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 队列中没有元素仍继续移除元素会打印出null
System.out.println(blockingQueue.poll());
}
}
阻塞等待
添加 put() 、移除 take()
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
test();
}
//阻塞等待,一直等
public static void test(){
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
try {
// put没有返回值
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//队列没有位置了,一直阻塞
// blockingQueue.put("d");
// take()返回本次移除的元素
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//队列没有元素了,一直阻塞
// System.out.println(blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
添加元素时,若队列没有位置了,一直阻塞
移除元素时,若队列没有元素了,一直阻塞
超时等待
添加 offer(E e, long timeout, TimeUnit unit) 、移除poll(long timeout, TimeUnit unit)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
test();
}
//超时等待
public static void test() {
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
try {
// offer返回boolean值
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2, TimeUnit.SECONDS));
//等待超过2秒,offer添加元素超过队列的长度会返回false;并且等待指定时间后推出,向下执行
System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));
System.out.println("================");
// poll()返回本次移除的元素
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
//等待超过2秒,队列中没有元素仍继续移除元素会打印出null,等待指定之间后退出。
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
同步队列(SynchronousQueue)
进去一个元素,必须等待取出这个元素后,才能放下一个元素。put()、take()
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<Object> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + " put a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + " put b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + " put c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}