文章目录
- 线程同步
- 条件变量
- 生产者与消费者模型
- 信号量
- 环形队列应用生产者消费者模型
线程同步
现实生活中我们经常会遇到同一个资源多个人都想使用的问题,例如游乐园过山车排队,玩完的游客还想再玩,最好的办法就是玩完的游客想再玩就去重新排队
线程同步其实就是一种等待机制,多个想要同时访问同一个对象的线程形成一个类似等待队列,等待前面的线程使用完毕后,下一个线程再使用。
线程同步的概念:
在保证数据安全的前提下(加锁保护),让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
为什么要线程同步:
互斥是保证线程的安全。但当一个线程访问了临界资源后,释放了它的锁,同时立刻参与到了锁的竞争中,如果它又拿到了锁。那么其他线程就会由于长时间得不到锁访问不了临界资源而造成线程饥饿问题。
同步能让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,使多线程协同高效完成某些事情
同步就是在保证数据安全的前提下,让线程按照某种特定的顺序来访问临界资源。
同步与互斥的关系:互斥是保证数据的安全,同步在互斥的前提下,来提高线程之间的效率。
程序没有安全性的问题,就没有必要使用同步
条件变量
条件变量是类型为pthread_cond_t的变量,是利用线程间共享的全局变量进行同步的一种机制,主要有两个动作:
线程对某个临界资源进行条件判断,为真则执行代码,为假则挂起等待,节省CPU资源避免空等(也可以反着来,主要是挂起等待节省资源)
其他线程在执行某些动作后使条件成立,唤醒等待的线程
它可以用来保证:在某个线程没有满足某种条件完成之前,其他线程只能挂起等待。
条件变量一般用到4个接口:
int pthread_cond_init(pthread_cond_t *cv,const pthread_condattr_t *cattr);
功能:初始化条件变量
cv:要初始化的条件变量
cattr:设置条件变量属性,一般置NULL交给OS默认设置即可
返回值:成功返回0,失败返回错误码
int pthread_cond_destroy(pthread_cond_t *cond)
功能:释放申请的条件变量
返回值:成功返回0,失败返回错误码
注意:条件变量所占的空间没有被销毁(静态区)
int pthread_cond_wait(pthread_cond_t *cv,pthread_mutex_t *mutex);
功能:将调用此函数的线程在指定条件变量中挂起等待
cv:要在哪个条件变量下挂起等待
mutex:线程调用此函数时,会自动释放传入的锁
返回值:成功返回0,失败返回错误码
int pthread_cond_signal(pthread_cond_t *cv);
功能:唤醒等待中的线程
cv:唤醒在哪个条件变量下等待的线程
返回值:成功返回0,失败返回错误码
broadcast是一次唤醒指定条件变量下多个线程
下面用一段代码验证条件变量的用法
代码逻辑:一共申请6个线程,其中一个线程负责发布命令,另外5个线程负责工作
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mtx;
pthread_cond_t cond;
//发布命令线程
void* master(void* args)
{
std::string name = (char*)args;
while(true)
{
//唤醒在条件变量下等待的一个线程
std::cout << "begin run:" << std::endl;
pthread_cond_signal(&cond);
sleep(1);
}
}
//工作线程[5]
void* threadrun(void* args)
{
int num = *(int*)args;
delete (int*)args;
while(true)
{
pthread_cond_wait(&cond,&mtx);
std::cout << "thread[" << num << "]running. . . " << std::endl;
}
}
//每个线程再唤醒执行后经过while循环再次挂起等待
int main()
{
//初始化条件变量
pthread_mutex_init(&mtx,nullptr);
pthread_cond_init(&cond,nullptr);
pthread_t tid[5];
pthread_t boss;
pthread_create(&boss,nullptr,master,(void*)"boss");
for(size_t i = 0;i < 5;i++)
{
int* num = new int(i);//用堆区变量去传递线程号
pthread_create(tid+i,nullptr,threadrun,(void*)num);
}
//最后记得要等待线程以及释放锁和条件变量
for(size_t i = 0;i < 5;i++)
pthread_join(tid[i],nullptr);
pthread_join(boss,nullptr);
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
通过输出结果有如下分析:
-
条件变量内部一定有一个等待队列,哪个线程调用wait,哪个线程就挂起等待并进入等待队列,唤醒顺序就是等待的顺序
(线程执行的顺序是不一定的)
-
wait函数一定是释放锁的,否则线程调用该函数是抱着锁挂起等待的,其他线程就无法访问临界区了
-
signal函数唤醒的线程也一定会去争锁,争到才会继续执行;否则可能会出现一个带锁的线程访问临界区,和一个刚唤醒的线程继续访问临界区等方面的错误
-
在mutex已上锁的时候才能调用wait()
条件变量通常和互斥锁一起使用,互斥是保证线程的安全,条件变量防止互斥造成的饥饿问题
生产者与消费者模型
条件变量使用“通知—唤醒”模型,例如网购商家会发快递,我们只需要等待快递到了给我们发送提示短信;运用在多线程中最经典的就是生产者—消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理, 直接扔给阻塞队列, 消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出
上面的说法放到现实中解释就是,消费者—超市—供货商这样的模型,消费者购买商品不需要找供货商,只需要去超市即可,而供货商也不用管消费者,只需要给超市提供商品。
消费者购买商品的同时,供货商也在生产商品并向超市提供,若供货商出现问题,超市会有存品可以暂时共给消费者,并寻找新的供货商,此时超市就解决了消费者和供货商之间的耦合问题
下面用代码验证一下模型
代码逻辑:
- 设计一个类,对普通队列、条件变量、互斥量进行封装
- Push对应生产者,利用条件变量对其限制,队列为满就挂起等待,待消费者消费数据后唤醒
- Pop对应消费者,利用条件变量对其限制,队列为空就挂起等待,待生产者生产数据后唤醒
/************************BlockQueue.hpp************************/
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
namespace dd
{
template<class T>
class BlockQueue
{
public:
//初始化
BlockQueue()
:_capacity(10)
{
pthread_cond_init(&_empty,nullptr);
pthread_cond_init(&_full,nullptr);
pthread_mutex_init(&_mtx,nullptr);
}
//释放互斥量、条件变量
~BlockQueue()
{
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
pthread_mutex_destroy(&_mtx);
}
//生产逻辑
void Push(const T& key)
{
Lockqueue();
while(full())
ProducterWait();
_bq.push(key);
UnlockQueue();
WakeupConsumer();
}
//消费逻辑
void Pop(T* key)
{
Lockqueue();
while(empty())
ConsumerWit();
*key = _bq.front();
_bq.pop();
UnlockQueue();
WakeupProducter();
}
//队列为空则为真
bool empty()
{
return _bq.empty();
}
//队列为满则为真,这里最大容量设置的是10
bool full()
{
return _bq.size() == _capacity-1; //先判断 后push,所以要-1
}
//加锁
void Lockqueue()
{
pthread_mutex_lock(&_mtx);
}
//解锁
void UnlockQueue()
{
pthread_mutex_unlock(&_mtx);
}
//队列满时生产者挂起等待
void ProducterWait()
{
pthread_cond_wait(&_full,&_mtx);
}
//生产者唤醒等待
void WakeupProducter()
{
pthread_cond_signal(&_full);
}
//队列为空时消费者挂起等待
void ConsumerWit()
{
pthread_cond_wait(&_empty,&_mtx);
}
//消费者唤醒等待
void WakeupConsumer()
{
pthread_cond_signal(&_empty);
}
private:
std::queue<T> _bq;
int _capacity; //最大容量本示例设置的是10
pthread_mutex_t _mtx;
pthread_cond_t _empty;
pthread_cond_t _full;
};
}
/************************************************************************************************/
#include "BlockQueue.hpp"
using namespace dd;
void* consumer(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true)
{
sleep(1);
int data = 0;
bq->Pop(&data);
std::cout << "消费了一个数据:"<< data << std::endl;
}
}
void* producter(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true)
{
int data = rand()%20+1;
std::cout << "生产了一个数据:" << data << std::endl;
bq->Push(data);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<int>* bq = new BlockQueue<int>();
pthread_t c,p;
pthread_create(&c,nullptr,consumer,(void*)bq);
pthread_create(&p,nullptr,producter,(void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
上述示例需要注意:
-
虚假唤醒问题
在wait时,必须把它放到while里,而不是if里,为了防止虚假唤醒
例如上述示例中再增加一个线程,1号线程是生产者,2、3号线程是消费者,当2号线程获取走最后一个数据后,3号线程也想获取,但发现生产队列为NULL于是就挂起等待,随后1号线程生产了一个数据便唤醒3号,但是2号又先把数据获取走了,3号就属于虚假唤醒
3号线程唤醒后获取竞争锁,竞争到了以后继续执行Pop,但是队列已经为NULL了,所以利用while再次判断可预防这种现象,这种现象常见于多核CPU多线程中。
上述示例代码,也可以将发送数据改为发送任务:
- 把发送数据改为发送加减乘除的任务
- 再申请一个类,负责确定任务完成任务的逻辑功能部分,同时也是阻塞队列的数据类型
- 生产者只负责确定要算的数和算法
- 消费者负责完成生产者发布的任务
综上,变更的部分有:新增的类、最后cpp执行部分
/************************BlockQueue.hpp************************/
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <cstdlib>
namespace dd
{
template<class T>
class BlockQueue
{
public:
BlockQueue()
:_capacity(10)
{
pthread_cond_init(&_empty,nullptr);
pthread_cond_init(&_full,nullptr);
pthread_mutex_init(&_mtx,nullptr);
}
~BlockQueue()
{
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
pthread_mutex_destroy(&_mtx);
}
void Push(const T& key)
{
Lockqueue();
while(full())
ProducterWait();
_bq.push(key);
UnlockQueue();
WakeupConsumer();
}
void Pop(T* key)
{
Lockqueue();
while(empty())
ConsumerWit();
*key = _bq.front();
_bq.pop();
UnlockQueue();
WakeupProducter();
}
bool empty()
{
return _bq.empty();
}
bool full()
{
return _bq.size() == _capacity-1; //先判断 后push,所以要-1
}
void Lockqueue()
{
pthread_mutex_lock(&_mtx);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mtx);
}
void ProducterWait()
{
pthread_cond_wait(&_full,&_mtx);
}
void WakeupProducter()
{
pthread_cond_signal(&_full);
}
void ConsumerWit()
{
pthread_cond_wait(&_empty,&_mtx);
}
void WakeupConsumer()
{
pthread_cond_signal(&_empty);
}
private:
std::queue<T> _bq;
int _capacity;
pthread_mutex_t _mtx;
pthread_cond_t _empty;
pthread_cond_t _full;
};
}
/************************Task.hpp************************/
#pragma once
#include <iostream>
#include <pthread.h>
namespace dd
{
class Task
{
public:
Task()
{}
Task(int x,int y,char op)
:_x(x)
,_y(y)
,_op(op)
{}
int Run()
{
int ret = 0;
switch(_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
break;
}
//std::cout << _x << _op << _y << " = " << ret << std::endl;
std::cout << pthread_self() << ": " << _x << _op << _y << " = " << ret << std::endl;
}
private:
int _x;
int _y;
char _op;
};
}
/************************************************************************************************/
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace dd;
void* consumer(void* args)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
while(true)
{
Task t;
bq->Pop(&t);
t.Run();
}
}
void* producter(void* args)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
std::string ops = "+-*/%";
while(true)
{
int x = rand()%20+1;
int y = rand()%20+1;
char op = ops[rand()%5];
Task t(x,y,op);
bq->Push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task>* bq = new BlockQueue<Task>();
pthread_t c1,c2,c3,p;
pthread_create(&c1,nullptr,consumer,(void*)bq);
pthread_create(&p,nullptr,producter,(void*)bq);
pthread_create(&c2,nullptr,consumer,(void*)bq);
pthread_create(&c3,nullptr,consumer,(void*)bq);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(c3,nullptr);
pthread_join(p,nullptr);
return 0;
}
直接使用互斥量,除了生产者、消费者之间要竞争互斥量以外,消费者之间也需要竞争互斥量,但如果汇聚(链表)中没有数据,消费者之间竞争互斥锁是无意义的。有了条件变量机制以后,只有生产者完成生产,才会引起消费者之间的竞争,提高了程序效率。
信号量
信号量的本质是一把计数器,用来描述临界资源中资源数目的大小,达到无冲突的访问共享资源目的
(例如飞机售票,把票看成信号量,买票就是申请信号量,卖票就是释放信号量)
伪代码如下:
临界资源分成5个部分(count=5),count就被称作信号量
count–,一个执行流占有临界资源一部分的操作叫做P操作
count++,一个执行流结束使用临界资源的一部分叫做V操作
count == 0,表示没有资源可以分配,此时的线程或进程就会被挂起等待(内部数据结构会有类似等待队列的结构)
但信号量也属于临界资源,所以V、P操作都是原子性的
- 信号变量的函数接口
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
-
作用:初始化信号量
-
sem:要初始化的信号量,同互斥量、条件变量一样,要创建sem_t类型的 变量
pshared:0表示线程间共享,大于0表示进程间共享value:信号量初始值,信号量个数
-
返回值:成功返回0,失败返回-1,并设置errno来表示错误
#include <semaphore.h>
int sem_destroy(sem_t *sem);
- 作用:销毁定义的信号量
- sem:要销毁的信号量
- 返回值:成功返回0,失败返回-1,并设置errno来表示错误
#include <semaphore.h>
int sem_wait(sem_t *sem);
- 作用:等待信号量,将信号量的值减1,如果信号量为0,阻塞等待。V( )操作
- sem:要等待的信号量
- 返回值:成功返回0,失败返回-1,并设置errno来表示错误
#include <semaphore.h>
int sem_post(sem_t *sem);
- 作用:表示资源使用完毕,将信号量做加1操作。P( )操作
- sem:要发布的信号量
- 返回值:成功返回0,失败返回-1,并设置errno来表示错误
环形队列应用生产者消费者模型
基本原理:
- 生产者和消费者在一开始时是指向同一位置,代表队列为空,应该让消费者等待,生产者工作
- 生产者和消费者当之后所在同一位置,代表队列为满,应该让消费者工作,生产者等待
- 其余时候,生产者和消费者 一定不 指向同一位置
注意事项:
- 在不是同一位置时,生产者必须在消费者前面
- 在同一位置时,为空让生产者先走,为满让消费者先走,但是消费者不可以套圈
- 消费者最关心队列中的数据,因此可以定义一个信号量关心队列已有数据个数
- 生产最关心队列的空位置,因此可以定义一个信号量关心队列的空位置
- 不能让它们同时执行,但是可以并发执行
代码示例:
/**********************************ring_queue.hpp***************************************/
#pragma once
#include <iostream>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#include <vector>
namespace dd
{
template<class T>
class Ring_queue
{
public:
//初始化
Ring_queue()
:_cap(10)
,_c_step(0)
,_p_step(0)
{
sem_init(&_blank_sem,0,10); //位置信号量设置初始值为10
sem_init(&_data_sem,0,0); //数据信号量设置初始值为0
_rq.reserve(10);
}
//释放信号量
~Ring_queue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
}
//生产
void Push(const T& key)
{
//申请数据消费信号量,放入数据,释放位置信号量
sem_wait(&_blank_sem);
_rq[_p_step] = key;
sem_post(&_data_sem);
//更新位置
_p_step++;
_p_step %= _cap;
}
//消费
void Pop(T* key)
{
//申请位置信号量,取出数据,释放数据消费信号量
sem_wait(&_data_sem);
*key = _rq[_c_step];
sem_post(&_blank_sem);
//更新位置
_c_step++;
_c_step %= _cap;
}
private:
int _cap; //总容量
std::vector<T> _rq; //队列
sem_t _blank_sem; //位置信号量
sem_t _data_sem; //数据信号量
int _c_step; //消费者位置(下标)
int _p_step; //生产者位置(下标)
};
}
/*************************************************************************/
#include "ring_queue.hpp"
#include <time.h>
using namespace dd;
void* consumer(void* args)
{
Ring_queue<int>* rq = (Ring_queue<int>*)args;
while(true)
{
int data = rand()%20 + 1;
rq->Push(data);
std::cout << "生产数据:" << data << std::endl;
}
}
void* producter(void* args)
{
Ring_queue<int>* rq = (Ring_queue<int>*)args;
while(true)
{
sleep(1);
int data;
rq->Pop(&data);
std::cout << "消费数据:" << data << std::endl;
}
}
int main()
{
srand((long long)time(nullptr));
pthread_t c,p;
Ring_queue<int>* rq = new Ring_queue<int>();
pthread_create(&c,nullptr,consumer,(void*)rq);
pthread_create(&p,nullptr,producter,(void*)rq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
-
多生产多消费的区别
void Push(const T& key) { sem_wait(&_blank_sem); pthread_mutex_lock(&p_mtx_); _rq[_p_step] = key; _p_step++; _p_step %= _cap; pthread_mutex_unlock(&p_mtx_); sem_post(&_data_sem); } void Pop(T* key) { sem_wait(&_data_sem); pthread_mutex_lock(&c_mtx_); *key = _rq[_c_step]; _c_step++; _c_step %= _cap; pthread_mutex_unlock(&c_mtx_); sem_post(&_blank_sem); }
-
sem_wait是原子性的,但多生产多消费中的队列和下标是临界资源,它们不是原子的,所以需要加锁
-
另外先进行信号量申请相对效率高,因为无论是生产还是消费本质是它们的信号量不为0,而不是先拿到锁
最终,互斥锁加在信号量申请之后,避免争到锁但没有信号量的情况
-