需求:
创建消息队列时需要指定队列的容量上限,队列中没有消息时,消费者从队列中take元素会阻塞;队列中的消息数量达到容量上限时,生产者往队列中put元素会阻塞。要保证线程安全。
组成:
(1)消息(Message):
class Message{
//消息id
private int id;
//消息携带的真实数据
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public java.lang.String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
(2)消息队列(MessageQueue):
//消息队列
class MessageQueue{
//存储消息的容器,双向队列
LinkedList<Message> messages = new LinkedList<>();
//存储消息容器的容量
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//消费者线程获取消息的方法
public Message take() {
synchronized (messages) {
//先判断容器中是否有消息
while (messages.isEmpty()) {
try {
System.out.println("容器中没有消息,消费者线程陷入阻塞...");
//容器中没有消息,线程进入Wait Set等待消息和被唤醒
messages.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = messages.removeFirst();
System.out.println("消费者线程拿走了一条消息...");
//拿走后唤醒因容器消息到达上线而阻塞的生产者线程
messages.notifyAll();
return message;
}
}
//生产者线程产生消息的方法
public void produce(Message message){
synchronized (messages){
//判断容器的容量是否达到上限
while (messages.size() == capacity){
try {
System.out.println("容器中的消息达到容量上限,生产者线程陷入阻塞...");
//容器中的消息达到容量上限,则生产者线程进入Wait Set,待消费者线程拿走结果后唤醒生产者线程
messages.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//没有达到上限则往容器中添加结果
messages.addLast(message);
System.out.println("生产者线程往容器中添加了一条消息...");
//唤醒因容器中没有结果而陷入阻塞的消费者线程
messages.notifyAll();
}
}
}
测试:
(1)设置队列容量上限为3,4个生产者同时生产一条消息放入队列,2s后,1个消费者来消费消息,看生产者线程在消费者take消息前是否会阻塞。
public class ProducerConsumer {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(3);
AtomicInteger atomicInteger = new AtomicInteger(1);
//四个生产者线程生产四条消息
for (int i = 0; i <= 3; i++) {
new Thread(()->{
//调用消息队列中的produce方法生产消息
messageQueue.produce(new Message(atomicInteger.getAndIncrement(),UUID.randomUUID().toString() ));
},"生产者-"+i).start();
}
try {
//模拟过2s消费者来获取消息
Thread.sleep(2000);
new Thread(() -> {
Message msg = messageQueue.take();
System.out.println("消费者拿到了消息id为:" + msg.getId() +"的消息");
}).start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果:
(2)先启动消费者线程,过2s后再启动生产者线程,看消费者线程是否会在生产者put消息前一直阻塞。
public class ProducerConsumer {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(3);
AtomicInteger atomicInteger = new AtomicInteger(1);
new Thread(() -> {
System.out.println("消费者线程尝试从队列中获取消息");
long start = System.currentTimeMillis();
Message msg = messageQueue.take();
long end = System.currentTimeMillis();
System.out.println("经过:" + (end - start) +"ms后,消费者线程拿到了消息"+msg.getId());
},"Consumer Thread").start();
try {
//2s后,生产者生产消息
Thread.sleep(2000);
new Thread(() -> {
System.out.println("生产者线程将消息放入队列中");
messageQueue.produce(new Message(1,"消息" + atomicInteger.getAndIncrement()));
},"Producer Thread").start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果: