目录
【1】生产消费模型
【1.1】为何要使用生产者消费者模型
【1.2】生产者消费者模型优点
【2】基于阻塞队列的生产消费者模型
【2.1】生产消费模型打印模型
【2.2】生产消费模型计算公式模型
【2.3】生产消费模型计算公式加保存任务模型
【2.3】生产消费模型多生产多消费
【1】生产消费模型
生产消费模型的321原则(便于记忆)。
【解释】
-
3种关系:生产者和生产者(互斥)、消费者和消费者(互斥)、生产者和消费者(互斥|[保证共享资源的安全性]|同步)。
-
2种角色:生产者线程、消费者线程。
-
1种交易场所:一段特定结构的缓冲区。
【1.1】为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
【1.2】生产者消费者模型优点
-
生产线程和消费线程进行解耦。
-
支持并发。
-
提高效率。
-
支持生产和消费的一段时间的忙闲不均的问题。
【2】基于阻塞队列的生产消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
【2.1】生产消费模型打印模型
【makefile文件】
# 创建关联关系
cc=g++
standard=-std=c++11
# 创建依赖关系
myBlockQueue:BlockQueue.cc
$(cc) -o $@ $^ $(standard) -l pthread
# 创建删除关系
.PHONY:clean
clean:
rm -rf myBlockQueue
【BlockQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:
/* 构造函数 */
BlockQueue(const int maxCapacity = g_maxCapacity)
: _maxCapacity(maxCapacity)
{
// 初始化锁
pthread_mutex_init(&_mutex, nullptr);
// 初始化生产者信号量
pthread_cond_init(&_pCond, nullptr);
// 初始化消费者信号量
pthread_cond_init(&_cCond, nullptr);
}
/* 析构函数 */
~BlockQueue()
{
// 销毁锁
pthread_mutex_destroy(&_mutex);
// 销毁生产者信号量
pthread_cond_destroy(&_pCond);
// 销毁消费者信号量
pthread_cond_destroy(&_cCond);
}
public:
/* 新增任务 */
void PushTask(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否满
// 细节二:充当条件判断的语法必须是while,不能是if
while(IsFull())
{
// 如果容器满了,生产者就不能继续生产了!
// 细节一:
// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!
// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!
// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!
pthread_cond_wait(&_pCond, &_mutex);
}
// 程序走到这里,容器一定是没有满的!
_q.push(in);
// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
pthread_cond_signal(&_cCond); // 通知消费者已经生产了!
// 解锁
pthread_mutex_unlock(&_mutex);
}
/* 执行任务 */
void PopTask(T* out)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否空
// 细节二:与PushTask一致
while(IsEmpty())
{
// 如果容器空了,消费者就不能继续消费了!
// 细节一:与PushTask一致
pthread_cond_wait(&_cCond, &_mutex);
}
// 程序走到这里,容器一定是没有空的!
*out = _q.front();
_q.pop();
// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
pthread_cond_signal(&_pCond); // 通知生产者已经消费了!
// 解锁
pthread_mutex_unlock(&_mutex);
}
private:
/* 判断容器满 */
bool IsFull()
{
return _q.size() == _maxCapacity;
}
/* 判断容器空 */
bool IsEmpty()
{
return _q.empty();
}
private:
std::queue<T> _q; // 存储容器
int _maxCapacity; // 标识存储重启最大容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _pCond; // 生产者条件变量
pthread_cond_t _cCond; // 消费者条件变量
};
【BlockQueue.cc文件】
#include "BlockQueue.hpp"
/* 定义生产者线程 */
void *Producer(void *args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
// 生产者进行生产
while(true)
{
int num = rand() % 10 + 1;
bq->PushTask(num);
std:: cout << "生产者在生产:" << num << std::endl;
sleep(1);
}
}
/* 定义消费者线程 */
void *Consumer(void *args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
// 消费者进行消费
while(true)
{
int num = 0;
bq->PopTask(&num);
std:: cout << "消费者在消费:" << num << std::endl;
sleep(2);
}
}
/* 入口函数 */
int main()
{
// 随机数种子
srand((unsigned int)time(nullptr) ^ getpid());
// 共享资源
BlockQueue<int> *bqs = new BlockQueue<int>();
pthread_t tP;
pthread_t tC;
pthread_create(&tP, nullptr, Producer, (void *)bqs);
pthread_create(&tC, nullptr, Consumer, (void *)bqs);
pthread_join(tP, nullptr);
pthread_join(tC, nullptr);
return 0;
}
【2.2】生产消费模型计算公式模型
【makefile文件】
# 创建关联关系
cc=g++
standard=-std=c++11
# 创建依赖关系
myBlockQueue:BlockQueue.cc
$(cc) -o $@ $^ $(standard) -l pthread
# 创建删除关系
.PHONY:clean
clean:
rm -rf myBlockQueue
【BlockQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:
/* 构造函数 */
BlockQueue(const int maxCapacity = g_maxCapacity)
: _maxCapacity(maxCapacity)
{
// 初始化锁
pthread_mutex_init(&_mutex, nullptr);
// 初始化生产者信号量
pthread_cond_init(&_pCond, nullptr);
// 初始化消费者信号量
pthread_cond_init(&_cCond, nullptr);
}
/* 析构函数 */
~BlockQueue()
{
// 销毁锁
pthread_mutex_destroy(&_mutex);
// 销毁生产者信号量
pthread_cond_destroy(&_pCond);
// 销毁消费者信号量
pthread_cond_destroy(&_cCond);
}
public:
/* 新增任务 */
void PushTask(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否满
// 细节二:充当条件判断的语法必须是while,不能是if
while(IsFull())
{
// 如果容器满了,生产者就不能继续生产了!
// 细节一:
// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!
// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!
// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!
pthread_cond_wait(&_pCond, &_mutex);
}
// 程序走到这里,容器一定是没有满的!
_q.push(in);
// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
pthread_cond_signal(&_cCond); // 通知消费者已经生产了!
// 解锁
pthread_mutex_unlock(&_mutex);
}
/* 执行任务 */
void PopTask(T* out)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否空
// 细节二:与PushTask一致
while(IsEmpty())
{
// 如果容器空了,消费者就不能继续消费了!
// 细节一:与PushTask一致
pthread_cond_wait(&_cCond, &_mutex);
}
// 程序走到这里,容器一定是没有空的!
*out = _q.front();
_q.pop();
// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
pthread_cond_signal(&_pCond); // 通知生产者已经消费了!
// 解锁
pthread_mutex_unlock(&_mutex);
}
private:
/* 判断容器满 */
bool IsFull()
{
return _q.size() == _maxCapacity;
}
/* 判断容器空 */
bool IsEmpty()
{
return _q.empty();
}
private:
std::queue<T> _q; // 存储容器
int _maxCapacity; // 标识存储重启最大容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _pCond; // 生产者条件变量
pthread_cond_t _cCond; // 消费者条件变量
};
【Task.hpp文件】
#pragma once
#include <iostream>
#include <string>
#include <functional>
/* 仿函数类 */
class Task
{
public:
using func_t = std::function<int(int, int, char)>;
public:
/* 构造函数 */
Task() {}
/* 构造函数 */
Task(int x, int y, char op, func_t func)
: _x(x), _y(y), _op(op), _callBalk(func)
{
}
public:
/* 仿函数 */
std::string operator()()
{
int result = _callBalk(_x, _y, _op);
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);
return buffer;
}
public:
/* 返回打印公式 */
std::string ToTaskString()
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callBalk;
};
/* 任务执行的种类 */
int MyCalculate(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
{
result = x / y;
}
break;
}
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
{
result = x % y;
}
break;
}
default:
break;
}
return result;
}
【BlockQueue.cc文件】
#include "BlockQueue.hpp"
#include "Task.hpp"
class Task;
int MyCalculate(int x, int y, char op);
const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{
BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);
// 生产者进行生产
while(true)
{
int x = rand() % 10 + 1;
int y = rand() % 10 + 1;
int op = rand() % oper.size();
Task t(x, y, oper[op], MyCalculate);
calBq->PushTask(t);
std:: cout << "生产任务:" << t.ToTaskString() << std::endl;
sleep(2);
}
}
/* 定义消费者线程 */
void *Consumer(void *args)
{
BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);
// 消费者进行消费
while(true)
{
Task t;
calBq->PopTask(&t);
std:: cout << "消费任务:" << t() << std::endl;
sleep(1);
}
}
/* 入口函数 */
int main()
{
// 随机数种子
srand((unsigned int)time(nullptr) ^ getpid());
// 共享资源
BlockQueue<Task> *bqs = new BlockQueue<Task>();
pthread_t tP;
pthread_t tC;
pthread_create(&tP, nullptr, Producer, (void *)bqs);
pthread_create(&tC, nullptr, Consumer, (void *)bqs);
pthread_join(tP, nullptr);
pthread_join(tC, nullptr);
return 0;
}
【2.3】生产消费模型计算公式加保存任务模型
【makefile文件】
# 创建关联关系
cc=g++
standard=-std=c++11
# 创建依赖关系
myBlockQueue:BlockQueue.cc
$(cc) -o $@ $^ $(standard) -l pthread
# 创建删除关系
.PHONY:clean
clean:
rm -rf myBlockQueue
【BlockQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 500;
template <class T>
class BlockQueue
{
public:
/* 构造函数 */
BlockQueue(const int maxCapacity = g_maxCapacity)
: _maxCapacity(maxCapacity)
{
// 初始化锁
pthread_mutex_init(&_mutex, nullptr);
// 初始化生产者信号量
pthread_cond_init(&_pCond, nullptr);
// 初始化消费者信号量
pthread_cond_init(&_cCond, nullptr);
}
/* 析构函数 */
~BlockQueue()
{
// 销毁锁
pthread_mutex_destroy(&_mutex);
// 销毁生产者信号量
pthread_cond_destroy(&_pCond);
// 销毁消费者信号量
pthread_cond_destroy(&_cCond);
}
public:
/* 新增任务 */
void PushTask(const T &in)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否满
// 细节二:充当条件判断的语法必须是while,不能是if
while(IsFull())
{
// 如果容器满了,生产者就不能继续生产了!
// 细节一:
// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!
// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!
// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!
pthread_cond_wait(&_pCond, &_mutex);
}
// 程序走到这里,容器一定是没有满的!
_q.push(in);
// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
pthread_cond_signal(&_cCond); // 通知消费者已经生产了!
// 解锁
pthread_mutex_unlock(&_mutex);
}
/* 执行任务 */
void PopTask(T* out)
{
// 加锁
pthread_mutex_lock(&_mutex);
// 判断是否空
// 细节二:与PushTask一致
while(IsEmpty())
{
// 如果容器空了,消费者就不能继续消费了!
// 细节一:与PushTask一致
pthread_cond_wait(&_cCond, &_mutex);
}
// 程序走到这里,容器一定是没有空的!
*out = _q.front();
_q.pop();
// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
pthread_cond_signal(&_pCond); // 通知生产者已经消费了!
// 解锁
pthread_mutex_unlock(&_mutex);
}
private:
/* 判断容器满 */
bool IsFull()
{
return _q.size() == _maxCapacity;
}
/* 判断容器空 */
bool IsEmpty()
{
return _q.empty();
}
private:
std::queue<T> _q; // 存储容器
int _maxCapacity; // 标识存储重启最大容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _pCond; // 生产者条件变量
pthread_cond_t _cCond; // 消费者条件变量
};
【Task.hpp文件】
#pragma once
#include <iostream>
#include <string>
#include <functional>
/* 计算任务 */
class CalTask
{
public:
using func_t = std::function<int(int, int, char)>;
public:
/* 构造函数 */
CalTask() {}
/* 构造函数 */
CalTask(int x, int y, char op, func_t func)
: _x(x), _y(y), _op(op), _callBalk(func)
{
}
public:
/* 仿函数 */
std::string operator()()
{
int result = _callBalk(_x, _y, _op);
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);
return buffer;
}
public:
/* 返回打印公式 */
std::string ToTaskString()
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callBalk;
};
/* 执行计算的方法 */
int MyCalculate(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
{
result = x / y;
}
break;
}
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
{
result = x % y;
}
break;
}
default:
break;
}
return result;
}
/* 保存任务 */
class SaveTask
{
public:
using func_t = std::function<void(const std::string &)>;
public:
/* 构造函数 */
SaveTask() {}
/* 构造函数 */
SaveTask(const std::string &message, func_t func)
: _message(message), _callBalk(func)
{
}
public:
/* 仿函数 */
void operator()()
{
_callBalk(_message);
}
private:
std::string _message;
func_t _callBalk;
};
/* 保存方法 */
void Save(const std::string& massage){
std::string target = "./log.txt";
FILE *fp = fopen(target.c_str(), "a+");
if(fp == NULL){
std::cerr << "fopen fail!" << std::endl;
return;
}
fputs(massage.c_str(), fp);
fputs("\n", fp);
fclose(fp);
}
【BlockQueue.cc文件】
#include "BlockQueue.hpp"
#include "Task.hpp"
/* 共享资源Queue封装 */
template <class C, class S>
class BlockQueues
{
public:
BlockQueue<C> *c_bq;
BlockQueue<S> *s_bq;
};
const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{
BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;
// 生产者进行生产
while (true)
{
int x = rand() % 10 + 1;
int y = rand() % 10 + 1;
int op = rand() % oper.size();
CalTask t(x, y, oper[op], MyCalculate);
calBq->PushTask(t);
std::cout << "Producer-生产任务:" << t.ToTaskString() << std::endl;
sleep(2);
}
return nullptr;
}
/* 定义消费者线程 */
void *Consumer(void *args)
{
BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;
BlockQueue<SaveTask> *serverBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;
// 消费者进行消费
while (true)
{
CalTask t;
calBq->PopTask(&t);
std::string messgae = t();
std::cout << "Consumer-消费任务:" << messgae << std::endl;
SaveTask server(messgae, Save);
serverBq->PushTask(server);
std::cout << "Consumer-推送保存完成..." << std::endl;
}
return nullptr;
}
/* 定义保存线程 */
void *Saver(void *args)
{
BlockQueue<SaveTask> *saveQ = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;
while(true){
SaveTask t;
saveQ->PopTask(&t);
t();
std::cout << "SAVER-保存任务完成..." << std::endl;
}
return nullptr;
}
#define T_PRODUCER 5
#define T_CONSUMER 10
/* 入口函数 */
int main()
{
// 随机数种子
srand((unsigned int)time(nullptr) ^ getpid());
// 共享资源
BlockQueues<CalTask, SaveTask> bqs;
bqs.c_bq = new BlockQueue<CalTask>();
bqs.s_bq = new BlockQueue<SaveTask>();
// pthread_t tP;
// pthread_t tC;
// pthread_t tS;
// pthread_create(&tP, nullptr, Producer, (void *)&bqs);
// pthread_create(&tC, nullptr, Consumer, (void *)&bqs);
// pthread_create(&tS, nullptr, Saver, (void *)&bqs);
// pthread_join(tP, nullptr);
// pthread_join(tC, nullptr);
// pthread_join(tS, nullptr);
// 创建生产者线程
pthread_t tP[T_PRODUCER];
for (int i = 0; i < T_PRODUCER; i++)
{
pthread_create(&tP[i], nullptr, Producer, (void *)&bqs);
}
// 创建消费者线程
pthread_t tC[T_CONSUMER];
for (int i = 0; i < T_CONSUMER; i++)
{
pthread_create(&tC[i], nullptr, Consumer, (void *)&bqs);
}
// 创建保存线程
pthread_t tS;
pthread_create(&tS, nullptr, Saver, (void *)&bqs);
// 等待生产者线程回收
for(int i = 0; i < T_PRODUCER; i++)
{
pthread_join(*(tP + 1), nullptr);
}
// 等待消费者线程回收
for(int i = 0; i < T_CONSUMER; i++)
{
pthread_join(*(tC + 1), nullptr);
}
// 等待保存线程回收
pthread_join(tS, nullptr);
delete bqs.c_bq;
delete bqs.s_bq;
return 0;
}
【2.3】生产消费模型多生产多消费
【Makefile文件】
# 定义变量与参数字符串进行关联
cc=g++
standard=-std=c++11
# 定义编译关系
myBackQueue: ThreadBackQueue.cc
$(cc) -o $@ $^ $(standard) -lpthread
# 定义命令
clean:
rm -rf myBackQueue
# 配置指令与系统指令分离
.PHONY: clean
【ThreadBackQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "ThreadMutex.hpp"
const int g_capacityMax = 100;
/* 生产消费者模型封装类 */
template<class T>
class ThreadBackQueue
{
public:
/* - 构造函数
*/
ThreadBackQueue(const int& capacity = g_capacityMax)
: _qCapacity(capacity)
{
// 初始化互斥锁
pthread_mutex_init(&_mutex, nullptr);
// 初始化生产者条件变量
pthread_cond_init(&_pCond, nullptr);
// 初始化消费者条件变量
pthread_cond_init(&_cCond, nullptr);
}
/* - 析构函数
*/
~ThreadBackQueue()
{
// 释放互斥锁
pthread_mutex_destroy(&_mutex);
// 释放生产者条件变量
pthread_cond_destroy(&_pCond);
// 释放消费者条件变量
pthread_cond_destroy(&_cCond);
}
public:
/* - 增加任务
*/
void Push(const T& in)
{
// 加锁
pthread_mutex_lock(&_mutex);
// LockGuardMutex(&_mutex);
// 判断是否满
while(IsFull())
pthread_cond_wait(&_pCond, &_mutex); // 去等待
// 一定有空位置
_q.push(in);
// 一定有任务
pthread_cond_signal(&_cCond);
pthread_mutex_unlock(&_mutex);
}
/* - 处理任务
*/
void Pop(T* out)
{
// 加锁
pthread_mutex_lock(&_mutex);
// LockGuardMutex(&_mutex);
// 判断是否空
while(IsEmpty())
pthread_cond_wait(&_cCond, &_mutex); // 去等待
// 一定有任务
*out = _q.front();
_q.pop();
// 一定有空位置
if(GetTaskSize() == 1)
pthread_cond_signal(&_pCond);
pthread_mutex_unlock(&_mutex);
}
public:
/* - 判断队列满
*/
bool IsFull()
{
return _q.size() == _qCapacity;
}
/* - 判断队列空
*/
bool IsEmpty()
{
return _q.empty();
}
/* - 获取队列中任务个数
*/
size_t GetTaskSize()
{
return _q.size();
}
private:
std::queue<T> _q; // 消息队列缓冲区
size_t _qCapacity; // 消息队列容量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _pCond; // 生产者条件变量
pthread_cond_t _cCond; // 消费者条件变量
};
【ThreadTask.hpp文件】
#pragma once
#include <cstdio>
#include <iostream>
#include <string>
#include <functional>
#include "ThreadBackQueue.hpp"
class TaskCalculate;
class TaskSave;
template<class C, class S>
class ThreadBackQueues
{
public:
ThreadBackQueue<C>* _cTask;
ThreadBackQueue<S>* _sTask;
};
class TaskCalculate
{
private:
// 定义仿函数
using func_t = std::function<int(const int, const int, const char)>;
public:
/* - 无参构造函数
*/
TaskCalculate()
{}
/* - 带参数的构造函数
*/
TaskCalculate(func_t func, const int x, const int y, const char op)
: _func(func)
, _x(x)
, _y(y)
, _op(op)
{}
public:
/* - ()运算符重载
*/
std::string operator()()
{
int result = _func(_x, _y, _op);
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);
return buffer;
}
public:
std::string TaskString()
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);
return buffer;
}
private:
int _x; // 第一个计算值
int _y; // 第二个计算值
char _op; // 第三个计算值
func_t _func; // 仿函数类型
};
int Calculate(const int x, const int y, const char op)
{
int calRet = 0;
switch(op)
{
case '+':
{
calRet = x + y;
break;
}
case '-':
{
calRet = x - y;
break;
}
case '*':
{
calRet = x * y;
break;
}
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
calRet = -1;
}
else
{
calRet = x / y;
}
break;
}
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
calRet = -1;
}
else
{
calRet = x % y;
}
break;
}
default:
{
break;
}
}
return calRet;
};
class TaskSave
{
private:
using func_t = std::function<void(const std::string&)>;
public:
/* - 无参构造函数
*/
TaskSave()
{}
/* - 带参构造函数
*/
TaskSave(func_t func, const std::string& msg)
: _func(func)
, _msg(msg)
{}
public:
/* - ()运算符重载
*/
void operator()()
{
_func(_msg);
}
private:
std::string _msg;
func_t _func;
};
void FileSave(const std::string& msg)
{
// 创建打开文件目录
std::string target = "./Log.txt";
// 打开文件
FILE* fpath = fopen(target.c_str(), "a+");
if(fpath == nullptr)
{
std::cerr << "fopen fail!" << std::endl;
return;
}
// 写入文件
fputs(msg.c_str(), fpath);
fputs("\n", fpath);
// 关闭文件
fclose(fpath);
}
【ThreadBase.hpp文件】
#pragma once
#include <cstdio>
#include <cassert>
#include <iostream>
#include <functional>
#include <string>
#include <pthread.h>
class ThreadBase;
/* 线程上下文数据封装类 */
class ThreadBaseConnectText
{
public:
ThreadBaseConnectText()
: _textThis(nullptr)
, _textArgs(nullptr)
{}
public:
ThreadBase* _textThis;
void* _textArgs;
};
/* 基于原生线程库的线程封装类 */
class ThreadBase
{
private:
const int ctNum = 64;
public:
// 定义仿函数
using func_t = std::function<void*(void*)>;
public:
public:
/* - 构造函数
* - func: 线程回调函数
* - args:线程回调函数参数
* - num : 编写线程名称设定的编号
*/
ThreadBase(func_t func, void* args = nullptr, const int& num = 1)
: _threadCallBack(func)
, _threadArgs(args)
{
// 自定义线程名称
char nameBuffer[ctNum];
snprintf(nameBuffer, sizeof(nameBuffer), "thread-%d", num);
_threadName = nameBuffer;
// 创建线程连接上下文 - 手动释放内存 - 【01】
ThreadBaseConnectText* connectText = new ThreadBaseConnectText();
connectText->_textThis = this;
connectText->_textArgs = _threadArgs;
int state = pthread_create(&_threadId, nullptr, StartRoutine, (void*)connectText);
assert(state == 0); (void)state;
}
/* - 析构函数
*/
~ThreadBase()
{}
public:
/* - 线程等待
*/
void Join()
{
int state = pthread_join(_threadId, nullptr);
assert(state == 0); (void)state;
}
public:
/* - 获取线程名称
*/
std::string GetThreadName()
{
return _threadName;
}
/* - 获取线程Id
*/
std::string GetThreadId()
{
char buffer[ctNum];
snprintf(buffer, sizeof(buffer), "0x%x", _threadId);
return buffer;
}
public:
/* - 线程函数
*/
static void* StartRoutine(void* args)
{
ThreadBaseConnectText* connectText = static_cast<ThreadBaseConnectText*>(args);
void* retVal = connectText->_textThis->Run(connectText->_textArgs);
// 释放内存 - 【01】
delete connectText;
// 返回
return retVal;
}
private:
/* - StartRoutine专用函数(因为C/C++混编的原因)
*/
void* Run(void* args)
{
// 调用回调线程
return _threadCallBack(args);
}
private:
std::string _threadName; // 线程名称
pthread_t _threadId; // 线程Id
func_t _threadCallBack; // 线程回调函数
void* _threadArgs; // 线程回调函数参数
};
【ThreadMutex.hpp文件】
#pragma once
#include <pthread.h>
/* 原生线程锁类封装 */
class Mutex
{
public:
/* - 构造函数
*/
Mutex(pthread_mutex_t* mutex)
: _pMutex(mutex)
{}
/* - 析构函数
*/
~Mutex()
{}
public:
/* - 加锁函数
*/
void Lock() { pthread_mutex_lock(_pMutex); }
/* - 解锁函数
*/
void UnLock() { pthread_mutex_unlock(_pMutex); }
private:
pthread_mutex_t* _pMutex; // 内部的线程锁
};
class LockGuardMutex
{
public:
/* - 构造函数
*/
LockGuardMutex(pthread_mutex_t* mutex)
: _mutex(mutex)
{
_mutex.Lock();
}
/* - 析构函数
*/
~LockGuardMutex()
{
_mutex.UnLock();
}
private:
Mutex _mutex;
};
【ThreadBackQueue.cc文件】
#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <unistd.h>
#include "ThreadBase.hpp"
#include "ThreadBackQueue.hpp"
#include "ThreadTask.hpp"
std::string g_ops = "+-*/%";
/* - 生产者线程函数
*/
void* ProducerThread(void* args)
{
ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;
// 生产
while(true)
{
int x = rand() % 100;
int y = rand() % 100;
int o = rand() % g_ops.size();
TaskCalculate task(Calculate, x, y, g_ops[o]);
tBQ->Push(task);
std::cout << "生产者在生产任务-> " << "[" << task.TaskString() << "] - - 队列任务个数为:" << tBQ->GetTaskSize() << std::endl;
sleep(1);
}
return nullptr;
}
/* - 消费者线程函数
*/
void* ConsumeThread(void* args)
{
ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;
ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;
// 消费
while(true)
{
TaskCalculate calculateTask;
tBQ->Pop(&calculateTask);
std::string strRet = calculateTask();
std::cout << "消费者在消费任务-> " << "[" << strRet << "] - - 队列任务个数为:" << tBQ->GetTaskSize() << std::endl;
TaskSave saveTask(FileSave, strRet);
sBQ->Push(saveTask);
std::cout << "消费者在保存任务-> " << "[" << strRet << "] - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;
sleep(3);
}
return nullptr;
}
/* - 保存者线程函数
*/
void* SaveThread(void* args)
{
ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;
while(true)
{
// 消费保存任务
TaskSave saveTask;
sBQ->Pop(&saveTask);
// 执行保存任务
saveTask();
// 保存完成任务
std::cout << "保存任务完成"<< " - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;
}
return nullptr;
}
/* - 程序入口函数
*/
int main()
{
// 创建随机数种子
srand((unsigned int)time(nullptr));
// 创建共享资源
ThreadBackQueues<TaskCalculate, TaskSave>* pBQ = new ThreadBackQueues<TaskCalculate, TaskSave>();
pBQ->_cTask = new ThreadBackQueue<TaskCalculate>;
pBQ->_sTask = new ThreadBackQueue<TaskSave>;
// 创建生产者线程
std::unique_ptr<ThreadBase> ptr_pt1(new ThreadBase(ProducerThread, (void*)pBQ, 1));
std::unique_ptr<ThreadBase> ptr_pt2(new ThreadBase(ProducerThread, (void*)pBQ, 2));
std::unique_ptr<ThreadBase> ptr_pt3(new ThreadBase(ProducerThread, (void*)pBQ, 3));
std::unique_ptr<ThreadBase> ptr_pt4(new ThreadBase(ProducerThread, (void*)pBQ, 4));
std::unique_ptr<ThreadBase> ptr_pt5(new ThreadBase(ProducerThread, (void*)pBQ, 5));
std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt1->GetThreadName() << " 线程Id:" << ptr_pt1->GetThreadId() << std::endl;
std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt2->GetThreadName() << " 线程Id:" << ptr_pt2->GetThreadId() << std::endl;
std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt3->GetThreadName() << " 线程Id:" << ptr_pt3->GetThreadId() << std::endl;
std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt4->GetThreadName() << " 线程Id:" << ptr_pt4->GetThreadId() << std::endl;
std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt5->GetThreadName() << " 线程Id:" << ptr_pt5->GetThreadId() << std::endl;
sleep(10);
// 创建消费者线程
std::unique_ptr<ThreadBase> ptr_ct1(new ThreadBase(ConsumeThread, (void*)pBQ, 11));
std::unique_ptr<ThreadBase> ptr_ct2(new ThreadBase(ConsumeThread, (void*)pBQ, 12));
std::unique_ptr<ThreadBase> ptr_ct3(new ThreadBase(ConsumeThread, (void*)pBQ, 13));
std::unique_ptr<ThreadBase> ptr_ct4(new ThreadBase(ConsumeThread, (void*)pBQ, 14));
std::unique_ptr<ThreadBase> ptr_ct5(new ThreadBase(ConsumeThread, (void*)pBQ, 15));
std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct1->GetThreadName() << " 线程Id:" << ptr_ct1->GetThreadId() << std::endl;
std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct2->GetThreadName() << " 线程Id:" << ptr_ct2->GetThreadId() << std::endl;
std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct3->GetThreadName() << " 线程Id:" << ptr_ct3->GetThreadId() << std::endl;
std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct4->GetThreadName() << " 线程Id:" << ptr_ct4->GetThreadId() << std::endl;
std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct5->GetThreadName() << " 线程Id:" << ptr_ct5->GetThreadId() << std::endl;
sleep(10);
// 创建保存者线程
std::unique_ptr<ThreadBase> ptr_st1(new ThreadBase(SaveThread, (void*)pBQ, 21));
std::unique_ptr<ThreadBase> ptr_st2(new ThreadBase(SaveThread, (void*)pBQ, 22));
std::unique_ptr<ThreadBase> ptr_st3(new ThreadBase(SaveThread, (void*)pBQ, 23));
std::unique_ptr<ThreadBase> ptr_st4(new ThreadBase(SaveThread, (void*)pBQ, 24));
std::unique_ptr<ThreadBase> ptr_st5(new ThreadBase(SaveThread, (void*)pBQ, 25));
std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st1->GetThreadName() << " 线程Id:" << ptr_st1->GetThreadId() << std::endl;
std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st2->GetThreadName() << " 线程Id:" << ptr_st2->GetThreadId() << std::endl;
std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st3->GetThreadName() << " 线程Id:" << ptr_st3->GetThreadId() << std::endl;
std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st4->GetThreadName() << " 线程Id:" << ptr_st4->GetThreadId() << std::endl;
std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st5->GetThreadName() << " 线程Id:" << ptr_st5->GetThreadId() << std::endl;
// 等待线程结束
ptr_pt1->Join();
ptr_pt2->Join();
ptr_pt3->Join();
ptr_pt4->Join();
ptr_pt5->Join();
ptr_ct1->Join();
ptr_ct2->Join();
ptr_ct3->Join();
ptr_ct4->Join();
ptr_ct5->Join();
ptr_st1->Join();
ptr_st2->Join();
ptr_st3->Join();
ptr_st4->Join();
ptr_st5->Join();
delete pBQ->_cTask;
delete pBQ->_sTask;
delete pBQ;
return 0;
}