除了线程互斥,我们还有线程同步,来康康吧
文章目录
- 1.为什么需要同步
- 2.生产消费模型
- 2.1 生产者和消费者的关系
- 2.2 以简单代码为例
- 2.3 并发
- 3.条件变量接口
- 3.1 init/destroy
- 3.2 pthread_cond_wait
- 3.3 pthrea_cond_signal/broadcast
- 3.4 代码示例
- 3.4.1 小bug
- 3.4.2 修正
- 3.4.3 典型错误
- 4.阻塞队列-生产消费模型实例
- 4.1 成员变量
- 4.2 push和pop
- 4.2.1 运行测试
- 4.2.2 进一步封装
- 4.2.3 使用task分配运算任务
- 5.阻塞队列-循环队列
- 5.0 为啥用循环队列
- 5.1 POSIX信号量
- 5.1.1 sem_init/destroy
- 5.1.2 sem_wait
- 5.1.3 sem_post
- 5.1.4 在循环队列里面使用
- 5.2 成员变量
- 5.3 构造/析构
- 5.4 生产消费
- 5.5 测试
- 结语
1.为什么需要同步
在部分条件下,互斥是正确的,但是不合理。比如食堂打饭的时候,食堂阿姨一次只能给一个人打饭,于是就选择通过竞争来获得打饭的权利。此时就会出现有些线程因为优先级过低或者CPU调度问题,一直打不到饭,于是就出现了饥饿问题。
这是因为我们对多线程访问同一个资源没有进行限制,全靠CPU调度来决定运行顺序;所以我们需要对线程进行一定的控制,这就是线程同步的概念
- 饥饿问题:某一个线程一直无法申请到某种资源(比如因为优先级过低)
- 同步:保证数据安全(临界资源访问)的前提下,让线程根据一定条件和顺序访问临界资源,从而避免饥饿问题
- 竞态条件:因为时序问题(CPU调度)而导致程序异常;
2.生产消费模型
这个模型其实很简单,消费者去超市购买东西,生产者把商品投放到超市中。
- 这时候就不需要消费者直接去找工厂问他xx东西又没有生产,他需要购买;而是转去超市里面购买xx东西;
- 如果xx东西没有货了,超市就会通知生产者进行补货。如果超市里面的货架已经满了,就通知生产者不需要继续生产了;
- 当商品没货了,超市会告知消费者这个东西没货,消费者会停止消费行为;而生产者补货了之后,超市就会通知消费者让他来购买
在基础模式中,消费者要想知道一个东西有没有货,需要去超市里面看(相当于轮循检测)
我们可以引入一个通知方式,比如超市开放一个微信公众号,告知消费者xx物品是否有货,以及告知消费者什么时候需要补货,此时就不需要消费者和生产者不断询问超市关于一个商品的情况了!这就相当于线程同步
!
2.1 生产者和消费者的关系
下面提到的是普适情况
- 消费者有多个,他们之间是竞争关系(互斥)竞争商品的购买
- 生产者有多个,他们之间是竞争关系(互斥)竞争超市的货架
- 消费者和生产者之间,既有互斥关系,也是同步关系(需要生产者供货了之后,消费者才能消费)这两个关系并不冲突!
除了上面提到的3种生产关系,还有下面俩点
在实际程序中,消费者和生产者都是由线程承担的(2种角色)
超市是内存中特定的一个数据结构,也是临界资源(1个交易场所)
我们可以用321原则来快速记住这几条,这样就记住了生产消费模型!👍
2.2 以简单代码为例
在旧模式中,main函数调用另外一个函数,想获得返回值,需要等这个函数运行完毕;好比消费者购买东西,需要去找厂家并等待厂家生产……
而在生产消费者模型中,main作为主线程,只需要把待处理的数据丢进缓冲区;而线程B从缓冲区中取出数据,处理完毕后放回缓冲区。main可以先执行其他代码,过一会再过来拿线程B处理好的结果。
这就实现了生产和消费的解耦!
2.3 并发
生产消费模型的并发,更多的体现在消费者在处理任务的同时,生产者可以生产任务;
线程切换的成本低于进程,由此便提高了数据处理的效率
接下来我们就要解决下面这些问题😁
1.如何让多个消费者线程等待呢?又如何让消费者线程被唤醒呢?
2.如何让多个生产者线程等待呢?又如何让生产者线程被唤醒呢?
3.如何衡量消费者和生产者所关心的条件是否就绪呢?
而前面提到的通知方式,在linux系统中,就是条件变量
了!
3.条件变量接口
3.1 init/destroy
基本的接口和pthread库的其他接口很相似,都是一样的用法;其中attr
也是设置条件变量的属性,这里置为nullptr即可
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
3.2 pthread_cond_wait
这两个接口都是让线程在一个条件变量下进行等待,其中timewait
接口可以设置等待的时间(超时了就不等了)
条件变量也是临界资源,所以这里需要一个mutex锁来保证条件变量读写的原子性
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
3.3 pthrea_cond_signal/broadcast
这个接口的作用是给在条件变量下等待的线程发信号
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
其中broadcast
是给在当前条件变量等待的所有线程发信号,而signal
是发送信号,只唤醒一个线程;
如果调用成功,这两个函数都会返回0;否则返回错误码
RETURN VALUE
If successful, the pthread_cond_broadcast() and pthread_cond_signal() functions shall return zero; otherwise, an error number shall
be returned to indicate the error.
3.4 代码示例
下面这个代码可以很好的演示上面提到的多个接口
#include<iostream>
#include<string.h>
#include<signal.h>
#include<pthread.h>
#include<thread>
#include<unistd.h>
#include<sys/types.h>
#include<sys/syscall.h>
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//锁
pthread_cond_t cond;//条件变量
volatile bool quit = false;
void*func(void* arg)
{
while(!quit)//这里有bug,后续会提到
{
pthread_cond_wait(&cond,&mutex);
cout << "thread is running... " << (char*)arg << endl;
}
cout << "thread quit... " << (char*)arg << endl;
}
int main()
{
pthread_cond_init(&cond,nullptr);
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,func,(void*)"t1");
pthread_create(&t2,nullptr,func,(void*)"t2");
pthread_create(&t3,nullptr,func,(void*)"t3");
char c;
while(1)
{
cout << "[a/b]$ ";
cin >> c;
if(c=='a')
{
pthread_cond_signal(&cond);
}
else if(c=='b')
{
pthread_cond_broadcast(&cond);
}
else
{
quit = true;
break;
}
usleep(500);
}
cout << "main break: " << quit << endl;
sleep(1);
pthread_cond_broadcast(&cond);
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
return 0;
}
每次输入a,就signal
会唤醒一个线程;每次输入b,会调用broadcast
,唤醒当前所有线程
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ a
thread is running... t1
[a/b]$ a
thread is running... t2
[a/b]$ a
thread is running... t3
[a/b]$ b
thread is running... t1
thread is running... t2
thread is running... t3
3.4.1 小bug
上面的代码示例,会出现下面的问题,即我们输入除了a和b以外的所有字符,都应该会把全局变量quit
改成true,让三个线程都退出
但观察到的现象却是只有一个线程退出了,其他线程阻塞等待了
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ q
main break: 1
thread is running... t3
thread quit... t3
这是因为pthread_cond_wait
里面进行了独特的操作,即等待之前,它会释放锁,等待之后,他会重新申请锁
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex)
{
pthread_mutex_unlock(mutex);//先解锁
//避免因为该线程拿着锁去休眠了,导致其他线程无法申请该锁
//条件变量相关代码
pthread_mutex_lock(mutex);//条件满足后,再加锁
}
第一个退出的线程,退出之前申请了锁却没有释放,于是就导致其他线程在条件满足后,没有办法申请锁,只能阻塞等待!
3.4.2 修正
修正的方法很简单,我们只需要在while(!quit)
循环的退出条件满足之后,释放一下锁,就OK了!
void*func(void* arg)
{
while(!quit)
{
pthread_cond_wait(&cond,&mutex);
cout << "thread is running... " << (char*)arg << endl;
}
pthread_mutex_unlock(&mutex);//正确操作:需要在条件满足后,解锁
cout << "thread quit... " << (char*)arg << endl;
}
此时就能看到,所有线程都正常退出了!
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ q
main break: 1
thread is running... t1
thread quit... t1
thread is running... t2
thread quit... t2
thread is running... t3
thread quit... t3
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$
3.4.3 典型错误
根据这点,就能引出一个比较典型的错误
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
pthread_cond_wait(&cond);//解锁和加锁的操作,该函数会帮我们完成
pthread_mutex_lock(&mutex);//二次申请同一把锁,出现死锁!
}
pthread_mutex_unlock(&mutex);
两次申请同一把锁,就好比自己把自己绊倒了😂我们要避免写出这样的错误代码!
4.阻塞队列-生产消费模型实例
这个队列的作用,就是提供一个超市,供生产者和消费者进行数据的交换
- 生产者,往队列里面push数据
- 消费者,从队列里面pop数据
看起来有些类似于管道,同样的,生产者和消费者在读取阻塞队列的时候,不仅需要保证自己的操作是原子操作,还需要做到一定的访问控制
;即消费者在队列空的时候不能继续pop,生产者在队列满的时候不能继续push
此时,我们还可以引入一个微信公众号
,也就是一定的通知方式:不要让生产者、消费者疯狂检测阻塞队列,而是引入条件变量,在队列不为空的时候,通知消费者;在队列不为满的时候,通知生产者;这样就达到了线程之间的同步。
4.1 成员变量
要实现阻塞队列,我们首先需要理清楚需要什么成员变量,来保护该队列
- 用于访问控制的锁,同一时刻只能有一个线程访问队列
- 用户线程同步的条件变量,因为我们需要在不同的条件下通知不同的人,所以需要2个条件变量
- 一个队列,为了方便,采用
std::queue
,这样就不用自己造轮子了
理清楚了之后,就可以来写成员变量啦;我采用了模板类型,这样阻塞队列就可以用来存放任何我们想要的类型了
template<class T>
class BlockQueue
{
private:
queue<T> _bq;//队列
size_t _size;//大小
pthread_mutex_t _mutex;//锁
pthread_cond_t _proInf;//通知生产者
pthread_cond_t _conInf;//通知消费者
public:
BlockQueue(int sz=5)
:_size(sz)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_proInf,nullptr);
pthread_cond_init(&_conInf,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_proInf);
pthread_cond_destroy(&_conInf);
}
};
你可能想问,queue不是有封装size吗?哪这里我们还定义一个大小变量,会不会有些多余?
nope!实际上,这里的这个_size
就好比我们在C语言写顺序表的时候,成员capacity
;其作用是来判断我们的队列有没有满的。
4.2 push和pop
对于一个队列,最重要的操作就是在队头出数据,队尾入数据
简单说来,就是需要在处理队列数据的时候进行加锁,保证原子性;
除此以外,生产者和消费者有不同的操作逻辑:
- 生产
- 判断是否符合生产条件(队列没有满)
- 满,不生产;不满,生产;
- 满了的时候,生产者应在条件变量中等待(等待消费者消费)
- 不满的时候,生产者生产,并通知消费者来消费
- 消费
- 判断是否满足消费条件(队列不为空)
- 空,不消费;不空,消费;
- 空了的时候,消费者应该在条件变量中等待(等待生产者生产)
- 不空的时候,消费者消费,并通知生产者继续生产
这样就实现了阻塞队列push和pop的基本逻辑;由此可以写出下面的代码
//消费者消费
T pop()
{
//加锁
pthread_mutex_lock(&_mutex);
//判断条件
if(_size)//空,不消费
{
pthread_cond_wait(&_conInf,&_mutex);
}
//消费并通知生产者
T tmp = _bq.front();
_bq.pop();
pthread_cond_signal(&_proInf);
//解锁
pthread_mutex_unlock(&_mutex);
return tmp;
}
//生产者生产
void Push(const T& in)
{
//加锁
pthread_mutex_lock(&_mutex);
//判断条件
if(bq.size()>=_size)//满,不生产
{
pthread_cond_wait(&_proInf,&_mutex);
}
//生产并通知消费者
_bq.push(in);
pthread_cond_signal(&_conInf);
//解锁
pthread_mutex_unlock(&_mutex);
}
4.2.1 运行测试
有了这个基本框架,我们就可以来测试一下代码啦!
先来一个生产者和消费者康康吧
#include "blockqueue.hpp"
#include <time.h>
#include <stdlib.h>
void *consume(void *args)
{
BlockQueue<int> *bqp = (BlockQueue<int> *)args;
while(1)
{
// 消费
int ret = bqp->pop();
cout << "consume " << pthread_self() << " 消费:" << ret << endl;
sleep(1);
}
}
void *produce(void *args)
{
BlockQueue<int> *bqp = (BlockQueue<int> *)args;
while (1)
{
// 制作
cout << "########↓" << endl;
int a = rand()%100;
// 投放到超市
bqp->push(a);
cout << "produce " << pthread_self() << " 生产:" << a << endl;
sleep(2);
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t t1,t2;
BlockQueue<int> bq(5);
pthread_create(&t1,nullptr,produce,(void*)&bq);
pthread_create(&t2,nullptr,consume,(void*)&bq);
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
return 0;
}
可以看到,刚开始消费者并没有运行,而是等待生产者生产出数据了之后,再开始消费!我们的目的成功达成!
[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
########↓
produce 139716365768448 生产:41
########↓
produce 139716365768448 生产:11
consume 139716357375744 消费:41
########↓
produce 139716365768448 生产:19
consume 139716357375744 消费:11
########↓
produce 139716365768448 生产:19
consume 139716357375744 消费:19
########↓
produce 139716365768448 生产:25
consume 139716357375744 消费:19
########↓
produce 139716365768448 生产:63
consume 139716357375744 消费:25
如果增加线程到2生产2消费,则会看到下面的情况
[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
########↓
produce 140483816179456 生产:51
consume 140483807786752 消费:51
########↓
produce 140483799394048 生产:38
consume 140483791001344 消费:38
########↓
produce 140483816179456 生产:37
consume 140483807786752 消费:37
每次被唤醒的生产者和消费者都是不一样的,交替唤醒
4.2.2 进一步封装
为了代码的可读性,我们可以对阻塞队列进一步封装
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;
template<class T>
class BlockQueue
{
private:
queue<T> _bq;//队列
size_t _size;//大小
pthread_mutex_t _mutex;//锁
pthread_cond_t _proInf;//通知生产者
pthread_cond_t _conInf;//通知消费者
public:
BlockQueue(int sz=5)
:_size(sz)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_proInf,nullptr);
pthread_cond_init(&_conInf,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_proInf);
pthread_cond_destroy(&_conInf);
}
//消费者消费
T pop()
{
//加锁
lock();
//判断条件
if(isEmpty())//空,不消费
{
ConWait();
}
//消费并通知生产者
T tmp = _bq.front();
_bq.pop();
WakeUpPro();
//解锁
unlock();
return tmp;
}
//生产者生产
void push(const T& in)
{
//加锁
lock();
//判断条件
if(isFull())//满,不生产
{
ProWait();
}
//生产并通知消费者
_bq.push(in);
WakeUpCon();
//解锁
unlock();
}
private:
void lock()
{
pthread_mutex_lock(&_mutex);
}
void unlock()
{
pthread_mutex_unlock(&_mutex);
}
//唤醒消费者
void WakeUpCon()
{
pthread_cond_signal(&_conInf);
}
//唤醒生产者
void WakeUpPro()
{
pthread_cond_signal(&_proInf);
}
//消费者等待
void ConWait()
{
pthread_cond_wait(&_conInf,&_mutex);
}
//生产者等待
void ProWait()
{
pthread_cond_wait(&_proInf,&_mutex);
}
//判断条件
bool isFull()
{
return _size == _bq.size();
}
bool isEmpty()
{
return _bq.empty();
}
};
4.2.3 使用task分配运算任务
因为阻塞队列是用模板类型的,我们可以自己实现一个仿函数
,来给生产者消费者分配任务
#pragma once
#include <iostream>
using namespace std;
class Task
{
public:
Task(int one=0, int two=0, char op="+")
: _elem1(one), _elem2(two), _operator(op)
{}
// 仿函数
int operator() ()
{
return run();
}
// 运行仿函数
int run()
{
int result = 0;
switch (_operator)
{
case '+':
result = _elem1 + _elem2;
break;
case '-':
result = _elem1 - _elem2;
break;
case '*':
result = _elem1 * _elem2;
break;
case '/':
{
if (_elem2 == 0)
{
cout << "div zero, abort" << endl;
result = -1;
}
else
{
result = _elem1 / _elem2;
}
break;
}
case '%':
{
if (_elem2 == 0)
{
cout << "mod zero, abort" << endl;
result = -1;
}
else
{
result = _elem1 % _elem2;
}
break;
}
default:
cout << "unknown: " << _operator << endl;
break;
}
return result;
}
// 获取元素,方便打印
void get(int *e1, int *e2, char *op)
{
*e1 = _elem1;
*e2 = _elem2;
*op = _operator;
}
private:
int _elem1;
int _elem2;
char _operator;
};
测试一下
#include "blockqueue.hpp"
#include "task.hpp"
#include <time.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
string ops ="+-*/%";
void *consumer(void *args)
{
BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
while (1)
{
Task t = bqp->pop();
int result = t(); //处理任务
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer [" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << " = " << result << endl;
}
}
void *producer(void *args)
{
BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
while (1)
{
// 制作任务
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 投放给消费者生产
bqp->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << " = ?" << endl;
sleep(1);
}
}
void test2()
{
pthread_t t1,t2;
BlockQueue<Task> bq(5);
pthread_create(&t1,nullptr,producer,(void*)&bq);
sleep(1);
pthread_create(&t2,nullptr,consumer,(void*)&bq);
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
}
int main()
{
srand((unsigned long)time(nullptr));//乘一个数字添加随机性
test2();
return 0;
}
运行,可以看到生产者生产了问题之后,消费者会去解答。此时我们只需要在线程中取回运算好的结果,就OK了!
[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
producter[140703422265088] 1673319321 生产了一个任务: 43-16 = ?
consumer [140703413872384] 1673319322 消费了一个任务: 43-16 = 27
producter[140703422265088] 1673319322 生产了一个任务: 45/12 = ?
consumer [140703413872384] 1673319322 消费了一个任务: 45/12 = 3
producter[140703422265088] 1673319323 生产了一个任务: 17/7 = ?
consumer [140703413872384] 1673319323 消费了一个任务: 17/7 = 2
producter[140703422265088] 1673319324 生产了一个任务: 49-14 = ?
consumer [140703413872384] 1673319324 消费了一个任务: 49-14 = 35
producter[140703422265088] 1673319325 生产了一个任务: 4%4 = ?
consumer [140703413872384] 1673319325 消费了一个任务: 4%4 = 0
5.阻塞队列-循环队列
5.0 为啥用循环队列
上面的队列是封装了queue,下面我们要利用环形队列的方式来实现一个功能相同的环形队列。
所谓循环队列,就是在一个一维数组中,通过头尾两个指针,来标识队列的头尾。如果数据数量超出空间末尾,则在空间的开头放入,并移动尾指针
这部分可以看我的博客 循环队列
用上循环队列,就有一个显著的优势:因为我们访问的(假设是数组)是不同下标位置,其并非同一块内存空间,所以是可以同时访问的!这样就进一步显现了生产消费的并发属性
这就相当于把循环队列这个临界资源分成了一小块一小块;只有满/空的时候,头尾指针会指向同一块空间,其余时间都是不冲突的!
注意:这需要程序猿来保证,获取了信号量之后,访问的肯定是临界资源中的不同区域,否则是会出问题的!
5.1 POSIX信号量
这里又要重新认识一下信号量了,在先前的博客中,简单提到了信号量的概念,其本质上是一个计数器。
- p操作:申请资源
- v操作:归还资源
对信号量的操作是原子的,不会因为线程切换而发生错误和冲突。
循环队列需要有两个标识符,来标识当前数据的头尾。注意,信号量可不能做下标使用,这里我们可以用int
类型,加锁来解决原子性问题;
信号量在环形队列中的作用,还是用于标识 空间剩余/数据数量
因为这是一个阻塞队列:
- 生产者放入数据,对应的是空间信号量,只有有空间的时候,才能往环形队列里面放入
- 消费者取出数据,对应的是数据信号量,没有数据也就不能取了
此时,先前介绍的semop
函数在此环节不太适合,在此介绍两个来自pthread
库的新接口;这些接口在编译的时候都需要带上-lpthread
5.1.1 sem_init/destroy
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
因为都是pthread库的,其使用方法还是很相似的,我们需要对信号量进行初始化,并给定一个value作为信号量的初始值;
对init
函数的pshared
参数做一定讲解,在man手册中的介绍是这样的
如果我们的信号量需要在线程中共享,那就将该参数设置为0;
如果信号量需要在进程中共享,其就应该处在共享内存区域,可以对添加了共享内存的进程之间共享;
阻塞队列是给线程使用的,所以我们设置成0就可以了。
5.1.2 sem_wait
这个接口和锁/条件变量的wait是一样的,其作用是申请一个信号量,如果信号量为0,则在此处等待,直到信号量非0
NAME
sem_wait, sem_timedwait, sem_trywait - lock a semaphore
SYNOPSIS
#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
Link with -pthread.
可以简单理解为,这个接口的作用就是让信号量-1
trywait
则是非阻塞检测;timewait
是设置等待的时间,超时就不等了。这点和条件变量的接口是一样的
5.1.3 sem_post
这个接口的作用就是释放一个信号量,相当于给这个信号量+1
NAME
sem_post - unlock a semaphore
SYNOPSIS
#include <semaphore.h>
int sem_post(sem_t *sem);
5.1.4 在循环队列里面使用
知道了这几个接口,对于我们环形队列的使用就比较明了了
- 当生产者开始生产的时候,申请一个空间信号量(剩余空间-1)生产完毕后,释放一个数量信号量(剩余数量+1)
- 当消费者开始消费的时候,申请一个数量信号量(剩余数量-1)生产完毕后,释放一个空间信号量(剩余空间+1)
除了这两个信号量,生产消费的时候,还需要操作头尾指针,指向队列正确的位置
5.2 成员变量
前面铺垫了那么多,现在就可以来试试水了!
vector<T> _rq;//队列
sem_t _spaceSem;//空间信号量
sem_t _valueSem;//数据信号量
pthread_mutex_t _proMutex;//生产者的锁
pthread_mutex_t _conMutex;//消费者的锁
size_t _rear;//尾指针
size_t _front;//头指针
因为信号量就可以充当条件变量的角色,所以这里就不需要条件变量来通知生产者/消费者了
5.3 构造/析构
接下来要做的,就是写构造/析构了
RingQueue(int capa = 5)
:_rq(capa),
_rear(0),
_front(0)
{
sem_init(&_spaceSem,0,capa);//空间
sem_init(&_valueSem,0,0);//数量为0
pthread_mutex_init(&_proMutex,nullptr);
pthread_mutex_init(&_conMutex,nullptr);
}
~RingQueue()
{
sem_destroy(&_spaceSem);
sem_destroy(&_valueSem);
pthread_mutex_destroy(&_proMutex);
pthread_mutex_destroy(&_conMutex);
}
5.4 生产消费
首先我们需要知道,当队列满/空的时候,分别对应啥情况
- 满:生产太多了,生产者得休息
- 空:消费太多了,消费者得缓缓
因为有信号量帮我们做了访问控制,所以我们不需要判断循环队列什么时候为满,什么时候为空:
- 满的时候:数量=队列容量,空间信号量=0,无法申请空间,无法生产
- 此时生产者会在空间信号量里面等待,不会继续生产;消费者继续消费
- 空的时候:空间=队列容量,数量信号量=0,没有可以消费的
- 此时消费者会在数量信号量里面等待,不会继续消费;生产者继续生产
这也是信号量作为访问控制的一大特征,当你申请成功了,就代表你肯定有临界资源的访问权限了;再加上我们给访问临界区加了锁,自然也不会出现被其他线程抢了的情况🎉
//生产者
void Push(T& in)
{
sem_wait(&_spaceSem);//获取空间
pthread_mutex_lock(&_proMutex);
_rq[_rear] = in; //生产
_rear++; // 写入位置后移
_rear %= _rq.size(); // 更新下标,避免越界
pthread_mutex_unlock(&_proMutex);
sem_post(&_valueSem);//释放数量
}
//消费
T pop()
{
sem_wait(&_valueSem);
pthread_mutex_lock(&_conMutex);
T tmp = _rq[_front];
_front++;
_front %= _rq.size();// 更新下标,保证环形特征
pthread_mutex_unlock(&_conMutex);
sem_post(&_spaceSem);
return tmp;
}
5.5 测试
string ops ="+-*/%";
void *consumer(void *args)
{
RingQueue<Task> *bqp = (RingQueue<Task> *)args;
while (1)
{
Task t = bqp->pop();
int result = t(); //处理任务
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer [" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << " = " << result << endl;
}
}
void *producer(void *args)
{
RingQueue<Task> *bqp = (RingQueue<Task> *)args;
while (1)
{
// 制作任务
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 投放给消费者生产
bqp->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << " = ?" << endl;
sleep(1);
}
}
void test2()
{
pthread_t t1,t2;
RingQueue<Task> bq(5);
pthread_create(&t1,nullptr,producer,(void*)&bq);
sleep(1);
pthread_create(&t2,nullptr,consumer,(void*)&bq);
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
}
int main()
{
srand((unsigned long)time(nullptr));//乘一个数字添加随机性
test2();
return 0;
}
刚开始运行的时候,发现了这个错误
这是因为在task中,我们重写了构造函数,编译器就不会生成默认构造了
Task(int one, int two, char op)
: _elem1(one), _elem2(two), _operator(op)
{}
而创建环形队列的时候,会自动调用构造函数。此时发现没有匹配的构造函数(无参),就会报错!
解决办法是,添加上一个无参构造,直接使用default
关键字即可
Task() = default;//使用系统默认生成的无参构造
运行,可以看到生产消费稳定跑起来了!
[muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test
producter[139758380586752] 1673936332 生产了一个任务: 24-8 = ?
producter[139758380586752] 1673936333 生产了一个任务: 34/19 = ?
consumer [139758372194048] 1673936333 消费了一个任务: 24-8 = 16
consumer [139758372194048] 1673936333 消费了一个任务: 34/19 = 1
producter[139758380586752] 1673936334 生产了一个任务: 18*16 = ?
consumer [139758372194048] 1673936334 消费了一个任务: 18*16 = 288
producter[139758380586752] 1673936335 生产了一个任务: 48+15 = ?
consumer [139758372194048] 1673936335 消费了一个任务: 48+15 = 63
增多线程,也能正常运行!
[muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test
producter[140344921630464] 1673936659 生产了一个任务: 44-15 = ?
consumer [140344913237760] 1673936660 消费了一个任务: 44-15 = 29
producter[140344921630464] 1673936660 生产了一个任务: 33+14 = ?
consumer [140344913237760] 1673936660 消费了一个任务: 33+14 = 47
producter[140344904845056] 1673936661 生产了一个任务: 14/12 = ?
consumer [140344896452352] 1673936661 消费了一个任务: 14/12 = 1
producter[140344921630464] 1673936661 生产了一个任务: 24-16 = ?
consumer [140344913237760] 1673936661 消费了一个任务: 24-16 = 8
producter[140344904845056] 1673936662 生产了一个任务: 3-3 = ?
consumer [140344896452352] 1673936662 消费了一个任务: 3-3 = 0
producter[140344921630464] 1673936662 生产了一个任务: 36*7 = ?
consumer [140344913237760] 1673936662 消费了一个任务: 36*7 = 252
producter[140344904845056] 1673936663 生产了一个任务: 28+8 = ?
consumer [140344896452352] 1673936663 消费了一个任务: 28+8 = 36
producter[140344921630464] 1673936663 生产了一个任务: 26-5 = ?
consumer [140344913237760] 1673936663 消费了一个任务: 26-5 = 21
结语
关于线程同步的知识点,大概就是这些了。博客写的满满当当,在理解了接口的基本命名和使用逻辑后,感觉就没有那么难了
加油哦!