文章目录
- 九、多线程
- 7. 生产者消费者模型
- 生产者消费者模型的简单代码
- 结果演示
- 未完待续
九、多线程
7. 生产者消费者模型
生产者消费者模型的简单代码
Makefile:
cp:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f cp
Thread.hpp:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T&)>;
template<typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func, T& data, const std::string &name="none-name")
: _func(func)
, _data(data)
, _threadname(name)
, _stop(true)
{}
static void *threadroutine(void *args)
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T& _data;
func_t<T> _func;
bool _stop;
};
}
#endif
BlockQueue.hpp:
#ifndef __BLOCKQUEUE_HPP__
#define __BLOCKQUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
template<typename T>
class BlockQueue
{
private:
bool IsFull() const
{
return _block_queue.size() == _cap;
}
bool IsEmpty() const
{
return _block_queue.empty();
}
public:
BlockQueue(int cap)
:_cap(cap)
{
_productor_wait_num = 0;
_consumer_wait_num = 0;
// 初始化互斥锁
pthread_mutex_init(&_mutex, nullptr);
// 初始化条件变量
pthread_cond_init(&_productor_cond, nullptr);
// 初始化条件变量
pthread_cond_init(&_consumer_cond, nullptr);
}
// 生产者使用的入队列接口
void Enqueue(const T& in)
{
// 加锁
pthread_mutex_lock(&_mutex);
while (IsFull())
{
// 生产者等待数量加1
_productor_wait_num++;
// 等待条件变量通知唤醒并竞争到互斥锁
pthread_cond_wait(&_productor_cond, &_mutex);
// 生产者等待数量减1
_productor_wait_num--;
}
// 生产的数据入资源队列
_block_queue.push(in);
// 解锁
pthread_mutex_unlock(&_mutex);
// 通知消费者可以从等待队列中出队列
if (_consumer_wait_num > 0) pthread_cond_signal(&_consumer_cond);
}
// 消费者使用的出队列接口
void Pop(T* out)
{
// 加锁
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
// 消费者等待数量加1
_consumer_wait_num++;
// 等待条件变量通知唤醒并竞争到互斥锁
pthread_cond_wait(&_consumer_cond, &_mutex);
// 消费者等待数量减1
_consumer_wait_num--;
}
// 获取数据
*out = _block_queue.front();
// 数据出队列
_block_queue.pop();
// 解锁
pthread_mutex_unlock(&_mutex);
// 通知生产者可以从等待队列中出队列
if (_productor_wait_num > 0) pthread_cond_signal(&_productor_cond);
}
~BlockQueue()
{
// 销毁互斥锁
pthread_mutex_destroy(&_mutex);
// 销毁条件变量
pthread_cond_destroy(&_productor_cond);
// 销毁条件变量
pthread_cond_destroy(&_consumer_cond);
}
private:
std::queue<T> _block_queue;
// 容量上限
int _cap;
// 互斥锁
pthread_mutex_t _mutex;
// 条件变量,用于通知生产者可以入队列
pthread_cond_t _productor_cond;
// 条件变量,用于通知消费者可以出队列
pthread_cond_t _consumer_cond;
// 生产者等待数量
int _productor_wait_num;
// 消费者等待数量
int _consumer_wait_num;
};
#endif
Task.hpp:
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task()
{}
Task(int a, int b)
:_a(a)
,_b(b)
,_result(0)
{}
// 执行任务
void Execute()
{
_result = _a + _b;
}
std::string ResultToString()
{
return std::to_string(_a) + " + " + std::to_string(_b) + " = " + std::to_string(_result);
}
std::string DebugToString()
{
return std::to_string(_a) + " + " + std::to_string(_b) + " = ?";
}
private:
int _a;
int _b;
int _result;
};
Main.cc:
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
using namespace ThreadModule;
// 创建类型别名
using blockqueue_t = BlockQueue<Task>;
// 消费者线程
void Consumer(blockqueue_t& bq)
{
while (true)
{
Task t;
// 从阻塞队列中获取任务资源
bq.Pop(&t);
// 执行任务
t.Execute();
// 输出结果
std::cout << "Consumer: " << t.ResultToString() << std::endl;
}
}
// 生产者线程
void Productor(blockqueue_t& bq)
{
srand(time(nullptr)^pthread_self());
while (true)
{
// 分配任务
int a = rand() % 10 + 1;
usleep(1234);
int b = rand() % 20 + 1;
Task t(a, b);
// 任务放入阻塞队列
bq.Enqueue(t);
// 输出任务信息
std::cout << "Productor: " << t.DebugToString() << std::endl;
sleep(1);
}
}
// 启动线程
void StartComm(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq, func_t<blockqueue_t> func)
{
for (int i = 0; i < num; i++)
{
// 创建一批线程
std::string name = "thread-" + std::to_string(i + 1);
threads->emplace_back(func, bq, name);
threads->back().Start();
}
}
// 创建消费者线程
void StartConsumer(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{
StartComm(threads, num, bq, Consumer);
}
// 创建生产者线程
void StartProductor(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{
StartComm(threads, num, bq, Productor);
}
// 等待所有线程结束
void WaitAllThread(std::vector<Thread<blockqueue_t>>& threads)
{
for (auto& thread : threads)
{
thread.Join();
}
}
int main()
{
// 创建阻塞队列,容量为5
blockqueue_t* bq = new blockqueue_t(5);
// 创建线程
std::vector<Thread<blockqueue_t>> threads;
// 创建 1个消费者线程
StartConsumer(&threads, 1, *bq);
// 创建 1个生产者线程
StartProductor(&threads, 1, *bq);
// 等待所有线程结束
WaitAllThread(threads);
return 0;
}
结果演示
这里使用的是单生产者和单消费者,当然也可以在主函数处创建多生产者和多消费者的模型。