文章目录
- ☀️一、线程同步
- 🌻1.条件变量
- 🌻2.同步概念与竞态条件
- 🌻3.条件变量函数
- 🌻4.条件变量使用规范
- 🌻5.代码案例
- ☀️二、生产者消费者模型
- 🌻1.为何要使用生产者消费者模型
- 🌻2.生产者消费者模型优点
- 🌻3.生产消费的关系
- ☀️三、基于BlockingQueue的生产者消费者模型
- 🌻1.概念
- 🌻2.代码示例
- ⚡(1)生产者给消费者派发整形数据(简单版)
- ⚡(2)生产者给消费者派发任务(复杂版)
- 🌻3.探究:生产消费模型高校在哪里
☀️一、线程同步
🌻1.条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
- 通过条件变量,我们可以实现线程同步,即可让线程顺序进行。
- 我们访问临界资源的模式一般是这样的:对临界资源加锁,判断(是否满足条件变量/生产消费条件),解锁。
🌻2.同步概念与竞态条件
同步
:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。竞态条件
:因为线程运行的时序问题,而导致程序异常,我们称之为竞态条件。
🌻3.条件变量函数
- (1)初始化
动态设置
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:
cond:要初始化的条件变量
attr:NULL
静态设置
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //静态设置条件变量(不用初始化、销毁)
- (2)销毁
int pthread_cond_destroy(pthread_cond_t *cond)
- (3)等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
-
在新线程内部调用
pthread_cond_wait
,可让线程加入等待队列中。 -
(4)唤醒等待
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
- 在新线程内部调用
pthread_cond_signal
,就是把线程从等待队列中拿出来,放到CPU中运行。
🌻4.条件变量使用规范
- 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
- 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
🌻5.代码案例
- makefile
testCond:testCond.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f testCond
- testCond.cc
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //静态设置条件变量(不用初始化、销毁)
void* start_routine(void* args)
{
std::string name = static_cast<const char*>(args);
while (true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex); //将新线程放入等待队列
// wait函数参数为什么要有mutex?为了后续释放和再次获取mutex(锁)
//线程进入阻塞队列时要释放锁,为了能让别的线程能访问该临界资源
//线程被唤醒之后需要重新把锁申请回来
//判断暂时省略
std::cout << name << " -> " << tickets << std::endl;
tickets--;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
// 通过条件变量控制线程的执行
pthread_t t[5];
for (int i = 0; i < 5; i++)
{
char* name = new char[64];
snprintf(name, 64, "thread %d", i + 1);
pthread_create(t + i, nullptr, start_routine, name);
}
while (true)
{
sleep(1); //#include <unistd.h>
// pthread_cond_signal(&cond); //每次循环唤醒一个新线程
pthread_cond_broadcast(&cond); //唤醒一批线程(所有线程都会被唤醒)
std::cout << "main thread wakeup one thread..." << std::endl;
}
for (int i = 0; i < 5; i++)
{
pthread_join(t[i], nullptr);
}
return 0;
}
- 运行结果
☀️二、生产者消费者模型
🌻1.为何要使用生产者消费者模型
- 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
- 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
- 这个阻塞队列就是用来给生产者和消费者解耦的。
- 阻塞队列(临时保存的数据场所)在缓冲区中。
🌻2.生产者消费者模型优点
- 解耦(将生产和消费过程进行分离)
- 支持并发
- 支持忙闲不均
🌻3.生产消费的关系
- 生产者和生产者之间 - 互斥关系(竞争关系)。
- 消费者和消费者之间 - 互斥关系。
- 生产者和消费者之间 - 互斥 && 同步(生产消费需要访问同一份资源时为互斥,生产消费协同进行为同步)。
生产消费模型巧记 - 321原则:
- 3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥(保证共享资源安全性) && 同步) 。
- 2种角色:生产者线程,消费者线程。
- 1个交易场所:一段特定结构的缓冲区。
- 生产消费的产品就是数据。
☀️三、基于BlockingQueue的生产者消费者模型
🌻1.概念
- 在多线程编程中
阻塞队列(Blocking Queue)
是一种常用于实现生产者和消费者模型的数据结构。 - 其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
🌻2.代码示例
⚡(1)生产者给消费者派发整形数据(简单版)
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>
#define NUM 8
class BlockQueue {
private:
std::queue<int> q;
int cap;
pthread_mutex_t lock;
pthread_cond_t full;
pthread_cond_t empty;
private:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&lock);
}
void ProductWait()
{
pthread_cond_wait(&full, &lock);
}
void ConsumeWait()
{
pthread_cond_wait(&empty, &lock);
}
void NotifyProduct()
{
pthread_cond_signal(&full);
}
void NotifyConsume()
{
pthread_cond_signal(&empty);
}
bool IsEmpty()
{
return (q.size() == 0 ? true : false);
}
bool IsFull()
{
return (q.size() == cap ? true : false);
}
public:
BlockQueue(int _cap = NUM) :cap(_cap)
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
}
void PushData(const int& data)
{
LockQueue();
while (IsFull()) {
NotifyConsume();
std::cout << "queue full, notify consume data, product stop." << std::endl;
ProductWait();
}
q.push(data);
// NotifyConsume();
UnLockQueue();
}
void PopData(int& data)
{
LockQueue();
while (IsEmpty()) {
NotifyProduct();
std::cout << "queue empty, notify product data, consume stop." << std::endl;
ConsumeWait();
}
data = q.front();
q.pop();
// NotifyProduct();
UnLockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
};
void* consumer(void* arg)
{
BlockQueue* bqp = (BlockQueue*)arg;
int data;
for (; ; ) {
bqp->PopData(data);
std::cout << "Consume data done : " << data << std::endl;
}
}
//more faster
void* producter(void* arg)
{
BlockQueue* bqp = (BlockQueue*)arg;
srand((unsigned long)time(NULL));
for (; ; ) {
int data = rand() % 1024;
bqp->PushData(data);
std::cout << "Prodoct data done: " << data << std::endl;
// sleep(1);
}
}
int main()
{
BlockQueue bq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void*)&bq);
pthread_create(&p, NULL, producter, (void*)&bq);
pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}
⚡(2)生产者给消费者派发任务(复杂版)
生产者派发任务(计算任务) -> 放入阻塞队列1 -> 消费处理任务 -> 放入阻塞队列2(将结果储存) -> 记录任务结果(保存在文件中)
- BlockQueue.hpp
#include <iostream>
#include <queue>
#include <pthread.h>
const int gmaxcap = 500;
template <class T>
class BlockQueue
{
public:
BlockQueue(const int& maxcap = gmaxcap) :_maxcap(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
void push(const T& in) // 输入型参数,const &
{
pthread_mutex_lock(&_mutex);
// 1. 判断
// 细节2: 充当条件判断的语法必须是while,不能用if
while (is_full()) //bug?
{
// 细节1:pthread_cond_wait这个函数的第二个参数,必须是我们正在使用的互斥锁!
// a. pthread_cond_wait: 该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起
// b. pthread_cond_wait: 该函数在被唤醒返回的时候,会自动的重新获取你传入的锁
pthread_cond_wait(&_pcond, &_mutex); //因为生产条件不满足,无法生产,此时我们的生产者进行等待
}
// 2. 走到这里一定是没有满
_q.push(in);
// 3. 绝对能保证,阻塞队列里面一定有数据
// 细节3:pthread_cond_signal:这个函数,可以放在临界区内部,也可以放在外部
pthread_cond_signal(&_ccond); // 这里可以有一定的策略
pthread_mutex_unlock(&_mutex);
//pthread_cond_signal(&_ccond); // 这里可以有一定的策略
}
void pop(T* out) // 输出型参数:*, // 输入输出型:&
{
pthread_mutex_lock(&_mutex);
//1. 判断
while (is_empty()) //bug?
{
pthread_cond_wait(&_ccond, &_mutex);
}
// 2. 走到这里我们能保证,一定不为空
*out = _q.front();
_q.pop();
// 3. 绝对能保证,阻塞队列里面,至少有一个空的位置!
pthread_cond_signal(&_pcond); // 这里可以有一定的策略
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
bool is_empty()
{
return _q.empty();
}
bool is_full()
{
return _q.size() == _maxcap;
}
private:
std::queue<T> _q;
int _maxcap; // 队列中元素的上限
pthread_mutex_t _mutex;
pthread_cond_t _pcond; // 生产者对应的条件变量
pthread_cond_t _ccond; // 消费者对应的条件变量
};
- Task.hpp
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
class CalTask
{
using func_t = std::function<int(int, int, char)>;
// typedef std::function<int(int,int)> func_t;
public:
CalTask()
{}
CalTask(int x, int y, char op, func_t func)
:_x(x), _y(y), _op(op), _callback(func)
{}
std::string operator()()
{
int result = _callback(_x, _y, _op);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback;
};
const std::string oper = "+-*/%";
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
// do nothing
break;
}
return result;
}
class SaveTask
{
typedef std::function<void(const std::string&)> func_t;
public:
SaveTask()
{}
SaveTask(const std::string& message, func_t func)
: _message(message), _func(func)
{}
void operator()()
{
_func(_message);
}
private:
std::string _message;
func_t _func;
};
void Save(const std::string& message)
{
const std::string target = "./log.txt";
FILE* fp = fopen(target.c_str(), "a+");
if (!fp)
{
std::cerr << "fopen error" << std::endl;
return;
}
fputs(message.c_str(), fp);
fputs("\n", fp);
fclose(fp);
}
- MainCp.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
//C:计算
//S: 存储
template<class C, class S>
class BlockQueues
{
public:
BlockQueue<C> *c_bq;
BlockQueue<S> *s_bq;
};
void *productor(void *bqs_)
{
BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->c_bq;
// BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(bq_);
while (true)
{
// sleep(3);
// 生产活动,从数据库?从网络,从外设??拿来的用户数据!!
int x = rand() % 100 + 1; // 在这里我们先用随机数,构建一个数据
int y = rand() % 10;
int operCode = rand() % oper.size();
CalTask t(x, y, oper[operCode], mymath);
bq->push(t);
std::cout << "productor thread, 生产计算任务: " << t.toTaskString() << std::endl;
}
return nullptr;
}
void *consumer(void *bqs_)
{
BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->c_bq;
BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;
while (true)
{
// 消费活动
CalTask t;
bq->pop(&t);
std::string result = t(); // 任务非常耗时!!
std::cout << "cal thread,完成计算任务: " << result << " ... done"<< std::endl;
// SaveTask save(result, Save);
// save_bq->push(save);
// std::cout << "cal thread,推送存储任务完成..." << std::endl;
//sleep(1);
}
return nullptr;
}
void *saver(void *bqs_)
{
BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;
while(true)
{
SaveTask t;
save_bq->pop(&t);
t();
std::cout << "save thread,保存任务完成..." << std::endl;
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
BlockQueues<CalTask, SaveTask> bqs;
bqs.c_bq = new BlockQueue<CalTask>();
bqs.s_bq = new BlockQueue<SaveTask>();
pthread_t c[2], p[3], s;
pthread_create(p, nullptr, productor, &bqs);
pthread_create(p+1, nullptr, productor, &bqs);
pthread_create(p+2, nullptr, productor, &bqs);
pthread_create(c, nullptr, consumer, &bqs);
pthread_create(c+1, nullptr, consumer, &bqs);
pthread_create(&s, nullptr, saver, &bqs); //saver - 保存在文件
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(s, nullptr); //
delete bqs.c_bq;
delete bqs.s_bq;
return 0;
}
- makefile
MainCp:MainCp.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f MainCp
- 运行结果
🌻3.探究:生产消费模型高校在哪里
- 首先我们要清楚,生产放入队列、消费拿出队列的动作是原子的。
- 对于生产端,我们构建一个任务可能十分耗时间,构建完成之后,可以竞争式的放进队列。简单来说,就是每个线程之间的任务构造相互独立,不需要一个一个任务串行构造,可以并发式构造,只有放进队列时要一个一个放进去,节省了构造任务的时间。
- 对于消费端,只有将任务取出队列时要一个一个取,而任务(算法等)可以并发式的实现,节省了任务实现的时间。
- 总结:可以在生产之前,和消费之后,让线程并行执行。
🌹🌹 多线3 的知识大概就讲到这里啦,博主后续会继续更新更多C++ 和 Linux的相关知识,干货满满,如果觉得博主写的还不错的话,希望各位小伙伴不要吝啬手中的三连哦!你们的支持是博主坚持创作的动力!💪💪