双缓冲的本质是 通过空间换时间,通过冗余的缓冲区解决生产者和消费者的速度差异问题,同时提升系统的并发性和稳定性。
双缓冲的核心优势
优势 | 具体表现 |
---|---|
解耦生产与消费 | 生产者和消费者可以独立工作,无需直接同步。 |
提高并发性 | 生产者和消费者可以同时操作不同的缓冲区,减少等待时间。 |
避免数据竞争 | 通过锁和条件变量确保读写操作的原子性。 |
应对突发流量 | 缓冲区作为临时存储,吸收流量峰值,避免系统过载。 |
双缓冲的潜在问题
内存占用翻倍:需要维护两个缓冲区,内存消耗增加。
切换开销:缓冲区切换时需要加锁和同步,可能引入短暂延迟。
数据一致性:如果切换时机不当,可能导致数据丢失或重复处理。
public class DoubleBufferSystem {
// 主缓冲区和备缓冲区(环形数组)
private Buffer primaryBuffer;
private Buffer backupBuffer;
// 锁和条件变量
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
// 异步处理线程池
private final ExecutorService processor = Executors.newSingleThreadExecutor();
// 状态标志
private volatile boolean isPrimaryActive = true;
public DoubleBufferSystem(int bufferSize) {
this.primaryBuffer = new Buffer(bufferSize);
this.backupBuffer = new Buffer(bufferSize);
}
/**
* 将数据写入缓冲区,生产者调用此方法
*
* @param data 要写入缓冲区的数据
* @throws InterruptedException 如果线程被中断
*/
public void produce(int data) throws InterruptedException {
// 获取锁
lock.lock();
try {
// 当前活跃缓冲区写满时触发切换
// 假如换成if,线程被唤醒后缓冲区仍然是满的,线程会直接往下走写入而不是等待,导致数据丢失或异常
while (isPrimaryActive && primaryBuffer.isFull()) {
// 当前线程会被挂起(进入等待队列),不会占用 CPU 资源
notFull.await();
}
// 写入当前活跃缓冲区
Buffer activeBuffer = isPrimaryActive ? primaryBuffer : backupBuffer;
activeBuffer.add(data);
// 通知所有等待缓冲区非空的消费者
notEmpty.signalAll();
} finally {
// 释放锁
lock.unlock();
}
}
// 异步处理数据,一个无限循环的任务到线程池中执行,直接调用
public void startProcessing() {
processor.submit(() -> {
while (true) {
lock.lock();
try {
// 等待数据就绪
// 如果当前活跃缓冲区不为空,则当前线程会被挂起,直到有数据被消费掉(即缓冲区变为空)
while (isPrimaryActive && !primaryBuffer.isEmpty()) {
notEmpty.await();
}
// 当前活跃缓冲区为空且处于活动状态时,切换缓冲区
if (isPrimaryActive) {
isPrimaryActive = false;
swapBuffers();
// 唤醒所有因缓冲区满而阻塞的生产者线程,可以开始写入了
notFull.signalAll();
}
// 处理备份缓冲区中的数据
processBuffer(backupBuffer);
} finally {
lock.unlock();
}
}
});
}
// 缓冲区切换核心逻辑
private void swapBuffers() {
Buffer temp = primaryBuffer;
primaryBuffer = backupBuffer;
backupBuffer = temp;
backupBuffer.clear(); // 清空原备缓冲区
}
// 模拟磁盘写入(异步)
private void processBuffer(Buffer buffer) {
while (!buffer.isEmpty()) {
int data = buffer.poll();
// 批量写入提升IO效率
System.out.println("Processing data: " + data);
}
}
// 环形缓冲区实现
private static class Buffer {
private final int[] data;
private int head = 0;
private int tail = 0;
private final int capacity;
Buffer(int capacity) {
this.capacity = capacity + 1; // 环形缓冲区需要额外一个位置区分满/空
this.data = new int[capacity + 1];
}
public boolean isFull() {
// [1, 2, 3, _]
// head = 0, tail = 3
// isFull() => (3 + 1) % 4 == 0 => true
// 生产者线程尝试写入第四个元素时,会发现 primaryBuffer.isFull() 为 true
return (tail + 1) % capacity == head;
}
public boolean isEmpty() {
return head == tail;
}
public void add(int value) {
data[tail] = value;
tail = (tail + 1) % capacity;
}
public int poll() {
int value = data[head];
head = (head + 1) % capacity;
return value;
}
public void clear() {
head = 0;
tail = 0;
}
}
}
优化点:
减少锁持有时间:消费者处理数据时释放锁,避免阻塞生产者。
// 处理数据时无需持有锁
processBatch(batchData);
添加优雅终止机制:通过标志位控制消费者线程退出。
public class DoubleBufferSystem {
private final Buffer primaryBuffer;
private final Buffer backupBuffer;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final ExecutorService processor = Executors.newSingleThreadExecutor();
private volatile boolean isPrimaryActive = true;
private volatile boolean isRunning = true;
public DoubleBufferSystem(int bufferSize) {
this.primaryBuffer = new Buffer(bufferSize);
this.backupBuffer = new Buffer(bufferSize);
}
public void produce(int data) throws InterruptedException {
lock.lock();
try {
while (isPrimaryActive ? primaryBuffer.isFull() : backupBuffer.isFull()) {
notFull.await();
}
Buffer activeBuffer = isPrimaryActive ? primaryBuffer : backupBuffer;
activeBuffer.add(data);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public void startProcessing() {
processor.submit(() -> {
while (isRunning) {
List<Integer> batchData = new ArrayList<>();
lock.lock();
try {
while (isPrimaryActive && primaryBuffer.isEmpty()) {
notEmpty.await();
}
// 批量取出数据
while (!primaryBuffer.isEmpty()) {
batchData.add(primaryBuffer.poll());
}
// 切换缓冲区
isPrimaryActive = !isPrimaryActive;
swapBuffers();
notFull.signalAll();
} finally {
lock.unlock();
}
// 异步处理数据 不用加锁
processBatch(batchData);
}
});
}
private void processBatch(List<Integer> batchData) {
for (int data : batchData) {
System.out.println("Processing data: " + data);
}
}
private void swapBuffers() {
Buffer temp = primaryBuffer;
primaryBuffer.clear();
primaryBuffer = backupBuffer;
backupBuffer = temp;
}
public void shutdown() {
isRunning = false;
processor.shutdown();
}
}