同步/条件队列
先上代码
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
private static final int CAPACITY = 5;
private final Queue<Integer> queue = new LinkedList<>();
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition bufferNotFull = lock.newCondition();
private final Condition bufferNotEmpty = lock.newCondition();
public void produce() throws InterruptedException {
int value = 0;
while (true) {
lock.lock();
try {
while (queue.size() == CAPACITY) {
System.out.println("Buffer is full, waiting...");
bufferNotFull.await(); // 等待,直到缓冲区不满
}
queue.add(value);
System.out.println("Produced: " + value);
value++;
bufferNotEmpty.signal(); // 通知消费者缓冲区有数据了
} finally {
lock.unlock();
}
Thread.sleep(1000); // 模拟生产时间
}
}
public void consume() throws InterruptedException {
while (true) {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Buffer is empty, waiting...");
bufferNotEmpty.await(); // 等待,直到缓冲区不为空
}
int value = queue.poll();
System.out.println("Consumed: " + value);
bufferNotFull.signal(); // 通知生产者缓冲区不满了
} finally {
lock.unlock();
}
Thread.sleep(1000); // 模拟消费时间
}
}
public static void main(String[] args) {
ProducerConsumerExample example = new ProducerConsumerExample();
Thread producerThread = new Thread(() -> {
try {
example.produce();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumerThread = new Thread(() -> {
try {
example.consume();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producerThread.start();
consumerThread.start();
}
}
生产者可以调用
bufferNotFull.await(); // 等待,直到缓冲区不满
将自己放入条件队列
也可以调用
bufferNotEmpty.signal(); // 通知消费者缓冲区有数据了
将消费者放入同步阻塞队列
消费者可以调用
bufferNotEmpty.await(); // 等待,直到缓冲区不为空
将自己放入条件队列
也可以调用
bufferNotFull.signal(); // 通知生产者缓冲区不满了
将生产者放入同步阻塞队列