C++笔记之不同buffer数量下的生产者-消费者机制
文章目录
- C++笔记之不同buffer数量下的生产者-消费者机制
- 0.在不同的缓冲区数量下,生产者-消费者机制的实现方式和行为的区别
- 1.最简单的生产者-消费者实现:抄自 https://mp.weixin.qq.com/s/G1lHNcbYU1lUlfugXnNhZg
- 2.1个生产者,1个消费者,操作1个buffer,使用环形队列
- 3.3个生产者,3个消费者,操作1个buffer
- 4.3个生产者,3个消费者,操作2个buffer,双buffer中存在生产者和消费者交替操作同一个buffer逻辑
- 5.1个生产者,1个消费者,操作3个buffer
- 6.n个生产者,n个消费者,操作n个buffer
- 7.在生产者-消费者中使用信号量
0.在不同的缓冲区数量下,生产者-消费者机制的实现方式和行为的区别
-
单个缓冲区:
- 在单个缓冲区的情况下,生产者和消费者共享一个有限大小的缓冲区,生产者将数据放入缓冲区,而消费者从中取出数据。
- 如果缓冲区已满,生产者必须等待,直到有空间可用。如果缓冲区为空,消费者必须等待,直到有数据可用。
- 这种情况下,通常需要使用互斥锁来保护共享缓冲区,以确保多个线程或进程不会同时访问缓冲区,从而避免竞态条件。
-
多个缓冲区:
- 在多个缓冲区的情况下,通常有多个生产者和多个消费者,每个生产者-消费者对共享一个缓冲区。这些缓冲区可以是独立的,也可以按照某种策略连接在一起。
- 这种情况下,不同的生产者可以同时往不同的缓冲区放置数据,不同的消费者可以同时从不同的缓冲区取出数据,从而提高了并发性能。
- 但是,管理多个缓冲区可能需要更复杂的同步和管理机制,以确保生产者和消费者之间的正确协作。
总的来说,单个缓冲区和多个缓冲区的主要区别在于资源的共享方式和并发性能。单个缓冲区通常更简单,但并发性能可能较低,因为所有的生产者和消费者都要竞争同一个缓冲区。多个缓冲区可以提高并发性能,但需要更复杂的管理和同步机制来维护多个缓冲区的状态。选择哪种方式取决于具体的应用需求和性能要求。
下面将比较这三种情况:一个缓冲区、两个缓冲区和三个缓冲区下的生产者-消费者机制的实现。
-
一个缓冲区:
- 在这种情况下,你只需要一个共享的缓冲区,可以使用一个队列(如std::queue)来表示。
- 生产者将数据放入队列的尾部,消费者从队列的头部取出数据。
- 你需要使用互斥锁来保护队列,以确保生产者和消费者不会同时访问它。
- 生产者在队列满时会阻塞,消费者在队列为空时会阻塞,可以使用条件变量来实现这种等待。
-
两个缓冲区:
- 在这种情况下,你可以使用两个独立的缓冲区,每个缓冲区都有自己的队列。
- 生产者可以交替将数据放入不同的队列,消费者也可以交替从不同的队列取出数据。
- 这种方式可以提高并发性能,因为生产者和消费者可以并行操作不同的缓冲区。
- 你需要使用互斥锁和条件变量来保护每个队列。
-
三个缓冲区:
- 在这种情况下,你可以类似地使用三个独立的缓冲区,每个缓冲区有自己的队列。
- 生产者和消费者之间的协作方式和两个缓冲区的情况类似,只是有更多的缓冲区可供使用。
- 这可以进一步提高并发性能,但需要更复杂的管理和同步机制。
1.最简单的生产者-消费者实现:抄自 https://mp.weixin.qq.com/s/G1lHNcbYU1lUlfugXnNhZg
在 C++ 中可以使用 std::condition_variable 来实现生产者和消费者模式:生产者在缓冲区未满时不断添加数据,并唤醒消费者进行数据读取;消费者在缓存区为空时阻塞等待生产者的唤醒,并在读取数据后唤醒等待的生产者可以继续添加数据。
运行
代码
#include "condition_variable"
#include "iostream"
#include "queue"
#include "thread"
#include <mutex>
using namespace std;
class ProducerAndConsumerDemo {
public:
void producerNumber(); // 生产数据
void consumerNumber(); // 消费数据
private:
const int dataSize = 1000; // 总数据量
queue<int> buffer; // 数据缓存区
const int bufferSize = 10; // 缓存区大小
condition_variable bufferNotEmpty; // 信号量--缓存区有数据了
condition_variable bufferNotFull; // 信号量--缓存区不满了
mutex m_mutex; // 互斥量
};
void ProducerAndConsumerDemo::producerNumber() {
for (int i = 0; i < dataSize; ++i) {
{
unique_lock<mutex> locker(m_mutex);
bufferNotFull.wait(locker, [&]() { return buffer.size() < bufferSize; }); // 缓存区满了则阻塞
buffer.push(i);
cout << "生产者---生产了数字:" << i << ",当前 bufferSize:" << buffer.size() << endl;
} // 解锁互斥量
bufferNotEmpty.notify_one();
this_thread::sleep_for(chrono::milliseconds(1000)); // 模拟生产耗时
}
}
void ProducerAndConsumerDemo::consumerNumber() {
while (true) {
{
unique_lock<mutex> locker(m_mutex);
bufferNotEmpty.wait(locker, [&]() { return buffer.size() > 0; }); // 缓冲区为空则阻塞
int i = buffer.front();
buffer.pop();
cout << "消费者---消费了数字:" << i << ",当前 bufferSize:" << buffer.size() << endl;
} // 解锁互斥量
bufferNotFull.notify_one();
this_thread::sleep_for(chrono::milliseconds(2000)); // 模拟消费耗时
}
}
int main() {
ProducerAndConsumerDemo pcDemo;
thread consumerThread(&ProducerAndConsumerDemo::producerNumber, &pcDemo);
thread producerThread(&ProducerAndConsumerDemo::consumerNumber, &pcDemo);
producerThread.join();
consumerThread.join();
system("pause");
return 0;
}
2.1个生产者,1个消费者,操作1个buffer,使用环形队列
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
const int bufferSize = 5; // 缓冲区大小
class CircularQueue {
public:
CircularQueue() : buffer(bufferSize), front(0), rear(0), count(0) {}
// 生产者线程使用的enqueue函数,将数据添加到队列
void enqueue(int item) {
std::unique_lock<std::mutex> lock(mutex);
// 检查队列是否已满
if (count < bufferSize) {
buffer[rear] = item;
rear = (rear + 1) % bufferSize;
count++;
std::cout << "Produced: " << item << std::endl;
// 通知等待中的消费者线程有新数据可用
cv.notify_all();
}
}
// 消费者线程使用的dequeue函数,从队列中取出数据
int dequeue() {
std::unique_lock<std::mutex> lock(mutex);
// 如果队列为空,等待生产者生产数据
while (count == 0) {
cv.wait(lock);
}
// 从队列中取出数据
int item = buffer[front];
front = (front + 1) % bufferSize;
count--;
std::cout << "Consumed: " << item << std::endl;
return item;
}
private:
std::vector<int> buffer; // 缓冲区,用于存储数据
int front; // 队列前部索引
int rear; // 队列后部索引
int count; // 当前队列中的元素数量
std::mutex mutex; // 用于线程同步的互斥锁
std::condition_variable cv; // 条件变量,用于线程等待和通知
};
// 生产者线程函数,负责向队列中添加数据
void producer(CircularQueue &queue) {
for (int i = 1; i <= 10; ++i) {
queue.enqueue(i);
// 模拟生产耗时
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
// 消费者线程函数,负责从队列中取出数据
void consumer(CircularQueue &queue) {
for (int i = 0; i < 10; ++i) {
int item = queue.dequeue();
// 模拟消费耗时
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
int main() {
CircularQueue queue;
// 创建生产者线程和消费者线程
std::thread producerThread(producer, std::ref(queue));
std::thread consumerThread(consumer, std::ref(queue));
// 等待线程结束
producerThread.join();
consumerThread.join();
return 0;
}
3.3个生产者,3个消费者,操作1个buffer
这个示例中,有3个生产者线程和3个消费者线程,它们并行地生产和消费元素,使用互斥锁保护共享的队列,并使用条件变量来通知生产者和消费者缓冲区的状态。生产者线程生成随机整数并将其放入缓冲区,而消费者线程从缓冲区中取出元素并将其打印出来。每个生产者和消费者线程各执行5次操作。
运行
代码
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
const int buffer_size = 3; // 缓冲区大小
std::queue<int> buffer; // 环形队列
std::mutex mtx; // 互斥锁,用于保护缓冲区的访问
std::condition_variable not_full; // 条件变量,表示缓冲区不满
std::condition_variable not_empty; // 条件变量,表示缓冲区不空
void producer(int id) {
for (int i = 0; i < 5; ++i) {
std::unique_lock<std::mutex> lock(mtx);
while (buffer.size() >= buffer_size) {
not_full.wait(lock); // 如果缓冲区已满,等待直到不满
}
int item = rand() % 100;
buffer.push(item);
std::cout << "Producer " << id << " produced: " << item << std::endl;
not_empty.notify_all(); // 通知消费者缓冲区不空
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产过程
}
}
void consumer(int id) {
for (int i = 0; i < 5; ++i) {
std::unique_lock<std::mutex> lock(mtx);
while (buffer.empty()) {
not_empty.wait(lock); // 如果缓冲区为空,等待直到不空
}
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed: " << item << std::endl;
not_full.notify_all(); // 通知生产者缓冲区不满
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟消费过程
}
}
int main() {
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; ++i) {
producers.emplace_back(producer, i);
consumers.emplace_back(consumer, i);
}
for (auto& producer_thread : producers) {
producer_thread.join();
}
for (auto& consumer_thread : consumers) {
consumer_thread.join();
}
return 0;
}
4.3个生产者,3个消费者,操作2个buffer,双buffer中存在生产者和消费者交替操作同一个buffer逻辑
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
const int BUFFER_SIZE = 5; // 缓冲区大小
std::mutex mtx; // 互斥锁,用于保护共享资源
std::condition_variable producer_cv, consumer_cv; // 条件变量,用于线程同步
std::vector<int> buffer1, buffer2; // 两个缓冲区,用于生产者和消费者之间的数据交换
int current_buffer = 1; // 标志,表示当前使用哪个缓冲区
// 生产者函数
void producer() {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产时间
std::unique_lock<std::mutex> lock(mtx); // 获取互斥锁,保护共享资源
if ((current_buffer == 1 && buffer1.size() >= BUFFER_SIZE) ||
(current_buffer == 2 && buffer2.size() >= BUFFER_SIZE)) {
// 缓冲区已满,生产者等待
std::cout << "Buffer " << current_buffer << " 已满,生产者等待...\n";
producer_cv.wait(lock, [] { return (current_buffer == 1 && buffer1.size() < BUFFER_SIZE) ||
(current_buffer == 2 && buffer2.size() < BUFFER_SIZE); });
}
int item = i;
if (current_buffer == 1) {
buffer1.push_back(item);
std::cout << "生产到缓冲区 1: " << item << std::endl;
} else {
buffer2.push_back(item);
std::cout << "生产到缓冲区 2: " << item << std::endl;
}
lock.unlock(); // 释放互斥锁
consumer_cv.notify_one(); // 通知一个等待的消费者线程
// 切换使用的缓冲区
current_buffer = (current_buffer == 1) ? 2 : 1;
}
}
// 消费者函数
void consumer() {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟消费时间
std::unique_lock<std::mutex> lock(mtx); // 获取互斥锁,保护共享资源
if ((current_buffer == 1 && buffer1.empty()) ||
(current_buffer == 2 && buffer2.empty())) {
// 缓冲区为空,消费者等待
std::cout << "消费者在用Buffer " << current_buffer << ",Buffer " << current_buffer << " 为空,消费者等待...\n";
consumer_cv.wait(lock, [] { return (current_buffer == 1 && !buffer1.empty()) ||
(current_buffer == 2 && !buffer2.empty()); });
}
int item;
if (current_buffer == 1) {
item = buffer1.front();
buffer1.erase(buffer1.begin());
std::cout << "从缓冲区 1 消费: " << item << std::endl;
} else {
item = buffer2.front();
buffer2.erase(buffer2.begin());
std::cout << "从缓冲区 2 消费: " << item << std::endl;
}
lock.unlock(); // 释放互斥锁
producer_cv.notify_one(); // 通知一个等待的生产者线程
// 切换使用的缓冲区
current_buffer = (current_buffer == 1) ? 2 : 1;
}
}
int main() {
std::thread producer_thread(producer); // 创建生产者线程
std::thread consumer_thread(consumer); // 创建消费者线程
producer_thread.join(); // 等待生产者线程结束
consumer_thread.join(); // 等待消费者线程结束
return 0;
}
5.1个生产者,1个消费者,操作3个buffer
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
const int BUFFER_COUNT = 3;
std::mutex mtx;
std::condition_variable cv_producer, cv_consumer;
std::vector<std::queue<int>> buffers(BUFFER_COUNT);
int current_buffer = 0;
void producer() {
for (int i = 1; i <= 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// 检查当前缓冲区是否已满
while (!buffers[current_buffer].empty()) {
cv_producer.wait(lock);
}
buffers[current_buffer].push(i);
std::cout << "生产者生产了: " << i << " 到缓冲区: " << current_buffer << std::endl;
current_buffer = (current_buffer + 1) % BUFFER_COUNT;
// 唤醒消费者
cv_consumer.notify_all();
}
}
void consumer() {
for (int i = 1; i <= 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// 检查当前缓冲区是否为空
while (buffers[current_buffer].empty()) {
cv_consumer.wait(lock);
}
int item = buffers[current_buffer].front();
buffers[current_buffer].pop();
std::cout << "消费者消费了: " << item << " 从缓冲区: " << current_buffer << std::endl;
current_buffer = (current_buffer + 1) % BUFFER_COUNT;
// 唤醒生产者
cv_producer.notify_all();
}
}
int main() {
std::thread producer_thread(producer);
std::thread consumer_thread(consumer);
producer_thread.join();
consumer_thread.join();
return 0;
}
6.n个生产者,n个消费者,操作n个buffer
运行:偶尔会core dump
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
const int num_buffers = 3; // 缓冲区的数量
const int buffer_size = 5; // 每个缓冲区的大小
const int producers_and_consumers_num = 3; // 生产者和消费者的数量
std::vector<int> buffers[num_buffers]; // 多个缓冲区,每个缓冲区是一个整数向量
std::mutex buffer_mutexes[num_buffers]; // 与每个缓冲区相关联的互斥锁
std::condition_variable buffer_cv[num_buffers]; // 与每个缓冲区相关联的条件变量
// 生产者线程函数
void producer(int id) {
for (int i = 0; i < 10; ++i) {
// 查找具有最多可用空间的缓冲区
int max_space = -1;
int selected_buffer = -1;
for (int j = 0; j < num_buffers; ++j) {
std::unique_lock<std::mutex> lock(buffer_mutexes[j]);
int space = buffer_size - buffers[j].size();
if (space > max_space) {
max_space = space;
selected_buffer = j;
}
}
// 等待直到有足够的空间可以生产
while (buffers[selected_buffer].size() >= buffer_size) {
std::unique_lock<std::mutex> lock(buffer_mutexes[selected_buffer]);
buffer_cv[selected_buffer].wait(lock);
}
// 生产数据并将其放入缓冲区
buffers[selected_buffer].push_back(id * 100 + i);
std::cout << "生产者 " << id << " 生产了 " << id * 100 + i << " 放入缓冲区 " << selected_buffer << std::endl;
// 通知等待的消费者线程
buffer_cv[selected_buffer].notify_one();
}
}
// 消费者线程函数
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
// 查找具有最少可用空间的缓冲区,这表示有数据等待消费
int min_space = buffer_size + 1;
int selected_buffer = -1;
for (int j = 0; j < num_buffers; ++j) {
std::unique_lock<std::mutex> lock(buffer_mutexes[j]);
int space = buffer_size - buffers[j].size();
if (space < min_space) {
min_space = space;
selected_buffer = j;
}
}
// 等待直到有数据可以消费
while (buffers[selected_buffer].empty()) {
std::unique_lock<std::mutex> lock(buffer_mutexes[selected_buffer]);
buffer_cv[selected_buffer].wait(lock);
}
// 检查缓冲区是否为空,然后再消费数据
if (!buffers[selected_buffer].empty()) {
int data = buffers[selected_buffer].back();
buffers[selected_buffer].pop_back();
std::cout << "消费者 " << id << " 消费了 " << data << " 从缓冲区 " << selected_buffer << std::endl;
}
// 通知等待的生产者线程
buffer_cv[selected_buffer].notify_one();
}
}
int main() {
std::thread producers[producers_and_consumers_num];
std::thread consumers[producers_and_consumers_num];
// 创建生产者和消费者线程
for (int i = 0; i < producers_and_consumers_num; ++i) {
producers[i] = std::thread(producer, i);
consumers[i] = std::thread(consumer, i);
}
// 等待所有线程完成
for (int i = 0; i < producers_and_consumers_num; ++i) {
producers[i].join();
consumers[i].join();
}
return 0;
}
7.在生产者-消费者中使用信号量
这个示例模拟了一个生产者-消费者问题,其中多个生产者线程和消费者线程共享一个有界缓冲区,信号量用于控制对缓冲区的并发访问。
在此示例中,有三个生产者线程和三个消费者线程,它们共享一个有界缓冲区。Semaphore类用于控制缓冲区的空闲和满状态。生产者线程生成随机项目并将它们放入缓冲区,然后通知消费者线程。消费者线程从缓冲区中取出项目并通知生产者线程。信号量确保缓冲区在多线程环境中得到正确的访问和同步。
这个示例有助于理解信号量在多线程环境中的应用,尤其是在生产者-消费者问题中的作用。通过信号量,可以控制多个线程之间的并发访问,以避免数据竞态和确保正确的协调。
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
const int BUFFER_SIZE = 5;
class Semaphore {
public:
Semaphore(int count = 0) : count_(count) {}
void notify() {
std::unique_lock<std::mutex> lock(mutex_);
count_++;
cv_.notify_one();
}
void wait() {
std::unique_lock<std::mutex> lock(mutex_);
while (count_ == 0) {
cv_.wait(lock);
}
count_--;
}
private:
std::mutex mutex_;
std::condition_variable cv_;
int count_;
};
Semaphore empty(BUFFER_SIZE); // 空缓冲区的信号量
Semaphore full(0); // 满缓冲区的信号量
std::mutex bufferMutex; // 缓冲区互斥量
std::queue<int> buffer; // 共享缓冲区
void producer(int id) {
for (int i = 0; i < 10; ++i) {
int item = rand() % 100; // 随机生成一个项目
empty.wait(); // 等待空缓冲区
bufferMutex.lock(); // 锁定缓冲区
buffer.push(item); // 将项目放入缓冲区
std::cout << "Producer " << id << " produced: " << item << std::endl;
bufferMutex.unlock(); // 解锁缓冲区
full.notify(); // 通知缓冲区已满
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
full.wait(); // 等待满缓冲区
bufferMutex.lock(); // 锁定缓冲区
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed: " << item << std::endl;
bufferMutex.unlock(); // 解锁缓冲区
empty.notify(); // 通知缓冲区已空
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
}
int main() {
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; ++i) {
producers.emplace_back(producer, i);
consumers.emplace_back(consumer, i);
}
for (auto &producerThread : producers) {
producerThread.join();
}
for (auto &consumerThread : consumers) {
consumerThread.join();
}
return 0;
}