前面我们已经提过线程互斥的相关概念,但是我们在前文的抢票逻辑中,我们其实很容易发现一个问题。那就是票可能被一直被一个人抢,这里我们就需要引入条件变量的概念。
目录
1、条件变量
<1>线程同步
<2>相关概念
<3>相关的接口
2、生产消费模型
<1>概念的引入
<2>基于条件变量的生产消费模型的示例代码
<3>多生产多消费模型
<4>小结
3、基于环形队列的生产消费模型
<1>概念引入
<2>具体介绍
<3>具体实现方法 - 信号量
<4>基于信号量的环形队列生产消费模型的代码示例
1、条件变量
<1>线程同步
我们先用一个例子引入条件变量。假设我们现在在学校里有一台冰柜,冰柜上有一把锁,每次冰柜只能被一个人使用。现在有一名同学放了一大杯饮料在冰柜中,其他同学看到冰柜被使用,就只能等第一名同学用完后,把锁交出来。但是这里就可能会出现这种问题,每当第一名用锁把冰柜打开后,把里面的冷饮喝了一口又放回去锁上了,这就导致其他同学压根拿不到锁,无法使用冰柜。学校的管理者,看到这种现象肯定是不允许的,所以就规定了每次使用完冰柜后,需要将钥匙交出,并且到队列尾部重新排队等待锁。这样就能使得每个人都有机会获得冰柜的使用权。
上述例子中的同学我们其实就可以看成是线程,而冰柜就是一个临界资源。上述例子中,修改后冰柜的使用方法,基本做到让不同线程访问同一个临界资源安全的情况下,让不同的线程访问临界资源具有顺序性,我们称这为线程同步。
<2>相关概念
如何实现线程的同步,这里我们就需要是用条件变量了。这里再用一个例子来理解一下条件变量是什么东西。
假设先在D向一个箱子内放置苹果,箱子有锁,A、B、C都会在箱子内取东西。现在箱子内没有苹果,需要D去放置苹果。但是D的抢锁能力很差,根本就拿不到锁,而A、B、C抢锁能力很强。这就会导致C一直抢锁,检查箱子内有没有苹果(箱子内没有苹果),而D无法持有锁去放苹果。为了避免这种情况,我们就可以让A、B、C先拿锁,如果打开箱子后没有苹果(有就直接拿),就到一旁的队列排队等待,D放完苹果后,D(用铃铛)通知了再去拿锁开箱,取出苹果。(如下图)
这里的"铃铛+队列"其实就是条件变量。
<3>相关的接口
这里三个接口的使用和互斥锁的使用一样,只不过需要定义的类型变成了pthread_cond_t而已。
pthread_cond_wait其实就和上图中队列对应,表示在该条件变量下等待。pthread_cond_timedwait和pthread_cond_wait的区别就是pthread_cond_timedwait在第三个参数处设置了过期时间,超过该时间后,线程会自动被唤醒,这个接口我们一般不使用。
这两个接口就类似于上面的铃铛,用于唤醒队列中的线程。pthread_cond_signal 表示唤醒一个线程,pthread_cond_broadcast是唤醒cond下等待的所有线程。
上述的所有接口成功都返回零,失败返回错误码
下面简单地示范一下如何使用条件变量
#include <iostream>
#include <vector>
#include <pthread.h>
#include <unistd.h>
#include <string>
#define Num 4
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
bool stop_threads = false;
void *Slaverwork(void *arg)
{
while (true)
{
pthread_mutex_lock(&g_mutex);
while (!stop_threads)
{
pthread_cond_wait(&g_cond, &g_mutex);
}
pthread_mutex_unlock(&g_mutex);
break; // Exit the thread
}
std::string threadname = static_cast<const char *>(arg);
std::cout << "线程被唤醒, 唤醒名称为: " << threadname << std::endl;
delete[] (char *)arg;
return nullptr;
}
void *Create_Slaver(void *arg)
{
std::vector<pthread_t> *tids = static_cast<std::vector<pthread_t> *>(arg);
for (int i = 0; i < Num; i++)
{
pthread_t tid;
char *ptr = new char[64];
snprintf(ptr, 64, "pthread - %d", i + 1);
pthread_create(&tid, nullptr, Slaverwork, ptr);
tids->push_back(tid);
}
return nullptr;
}
void MasterStart(std::vector<pthread_t> &tids)
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, Create_Slaver, &tids);
if (n == 0)
{
std::cout << "Master create success" << std::endl;
}
tids.push_back(tid);
}
void Master_awake(std::vector<pthread_t> &tids)
{
int count = 6;
std::cout << "Master begin work" << std::endl;
while (count--)
{
sleep(1);
pthread_mutex_lock(&g_mutex);
pthread_cond_signal(&g_cond);
pthread_mutex_unlock(&g_mutex);
}
pthread_mutex_lock(&g_mutex);
stop_threads = true;
pthread_cond_broadcast(&g_cond);
pthread_mutex_unlock(&g_mutex);
std::cout << "Master work done " << std::endl;
}
void MasterJoin(std::vector<pthread_t> &tids)
{
for (auto &tid : tids)
{
pthread_join(tid, nullptr);
std::cout << "join success" << std::endl;
}
}
int main()
{
std::vector<pthread_t> tid;
MasterStart(tid);
Master_awake(tid);
MasterJoin(tid);
return 0;
}
上述代码简单地演示了条件变量地使用方式,需要注意的是,条件变量一定要在加锁后进行等待。
2、生产消费模型
为了更好地理解条件变量地具体作用,这里介绍一下生产消费模型。
<1>概念的引入
其实我们这个生产消费模型在日常生活中其实挺常见,超市就是典型的生产消费模型。
上图的厂商和消费者我们均可以看成是线程,而超市就是临界资源。这里的商品就是数据,超市实际就是用来存数据品的一段内存空间(数据结构)。这里多个消费者和厂商之间关系,就涉及了多线程的同步和互斥问题。在这个模型中存在着三种关系:生产者和生产者、消费者和消费者(厂商就是生产者)、消费者和生产者、。
<1>生产者和生产者 - 互斥(+同步)
生产者之间的关系必然是互斥的关系。举个生活中的例子,当超市在进货时,不可能会让不同的厂商在同一时间,在同一货架上摆货,这样可能会造成货架上的货物混乱。所以生产者之间一定得是互斥关系。为了避免一个商家一直摆货,我们也可以加上同步关系。
<2>消费者和消费者 - 互斥 (+同步)
消费者之间的关系必然是互斥的。举个生活中的例子,假设现在超市里只有一瓶水,此时有两个想喝水的人同时进入了超市,在这种竞争条件下,两个人都无法获得水。为了避免一个人买完水后,又去拿水结账,一直重复该动作,导致其他人压根无法结账购买货物。我们可以加上同步的关系。
<3>消费者和生产者 - 互斥 + 同步
消费者和生产者之间必须同时拥有这两种关系。这其实很好理解,厂商在上架时,消费者不能去取货物,否则可能会导致货物录入超市系统时出现数目对不上的情况。而厂商不能一直摆货,消费者也不能一直消费货物,这两者的行为必须是同步的。
总结一下,这个超市本质上就是商品的一个缓冲地带。生产消费模型能够为我们提供更好的并发度,将生产和消费的过程进行解耦合。
<2>基于条件变量的生产消费模型的示例代码
在正式编写代码前,我们需要知道的是,了解一下阻塞队列的概念。阻塞队列是一种特殊的队列,当队列为空时,获取队列元素的操作将会被阻塞。当队列满了时,向队列中存放数据的操作也会被阻塞。(这里的阻塞队列就对应着超市)为了实现这种操作,我们就需要应用到条件变量。
<1>单生产单消费模型
ThreadMode.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T)>;
// typedef std::function<void(const T&)> func_t;
template<typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func, T data, const std::string name="none-name")//右值
: _func(func), _data(data), _threadname(name), _stop(true)
{}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
T& Data()
{
return _data;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T _data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
}
#endif
BlockQueue.hpp
#include <iostream>
#include <vector>
#include <queue>
template <typename T>
class blockqueue
{
typedef int data;
private:
bool isfull()
{
return self.size() == capacity;
}
public:
blockqueue(int cap = 20) : capacity(cap)
{
pthread_mutex_init(&_glock, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consume_cond, nullptr);
}
void Enqueue(const T& in)
{
// 1.加锁
pthread_mutex_lock(&_glock);
while(isfull())
{
_product_wait_num++;
pthread_cond_wait(&_product_cond, &_glock);//存在伪唤醒情况(消费一个数据,却唤醒了多个生产线程),可以将外部判断语句变为while
_product_wait_num--;
}
self.push(std::move(in));
//1.1唤醒消费线程
if(_consume_wait_num > 0)//有消费者在等,才需要唤醒
pthread_cond_signal(&_consume_cond);
// 2.解锁
pthread_mutex_unlock(&_glock);
}
void Popqueue(T* out)
{
// 1.加锁
pthread_mutex_lock(&_glock);
while(self.empty())//避免多个线程在被唤醒时,只有一个线程持有锁,其他线程在锁下等待失败,继续向下执行时,出现问题。
{
_consume_wait_num++;
pthread_cond_wait(&_consume_cond, &_glock);//1.让线程进入休眠,当被唤醒后,需要重新持有锁后方能从该位置继续执行。
_consume_wait_num--;
}
*out = self.front();
self.pop();
//1.1唤醒生产的线程
if(_product_wait_num > 0)//有生产者在等,才需要唤醒
pthread_cond_signal(&_product_cond);
// 2.解锁
pthread_mutex_unlock(&_glock);
}
~blockqueue()
{
pthread_mutex_destroy(&_glock);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consume_cond);
}
private:
int capacity; // 容量
std::queue<T> self;
pthread_mutex_t _glock; // 互斥锁
pthread_cond_t _product_cond; // 生产者条件变量
pthread_cond_t _consume_cond; // 消费者条件变量
int _product_wait_num;//等待线程数
int _consume_wait_num;
};
Main.cc
#include <iostream>
#include <string>
#include <vector>
#include "ThreadMode.hpp"
#include "BlockQueue.hpp"
#include "unistd.h"
using namespace ThreadModule;
typedef int data;
using Blockqueue_t = blockqueue<data>*;
pthread_mutex_t _data_lock = PTHREAD_MUTEX_INITIALIZER;
data cnt = 100;
void Product(Blockqueue_t ptr)
{
while (1)
{
sleep(1);
pthread_mutex_lock(&_data_lock);
ptr->Enqueue(cnt);
std::cout << "product data is :" << cnt << std::endl;
cnt--;
pthread_mutex_unlock(&_data_lock);
}
}
void Consume(Blockqueue_t ptr)
{
while (1)
{
sleep(2);
data sum;
ptr->Popqueue(&sum);
std::cout << "comsume data is : " << sum << std::endl;
}
}
void StartComm(std::vector<Thread<Blockqueue_t>> &thread, int num, Blockqueue_t ptr, func_t<Blockqueue_t> fun)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread - " + std::to_string(i + 1);
thread.emplace_back(fun, ptr, name);
thread.back().Start();
}
}
void StartProducter(std::vector<Thread<Blockqueue_t>> &thread, int num, Blockqueue_t ptr)
{
StartComm(thread, num, ptr, Product);
}
void StartConsumer(std::vector<Thread<Blockqueue_t>> &thread, int num, Blockqueue_t ptr)
{
StartComm(thread, num, ptr, Consume);
}
void Wait(std::vector<Thread<Blockqueue_t>> &threads)
{
for (auto &e : threads)
{
e.Join();
}
}
int main()
{
blockqueue<data> *ptr = new blockqueue<data>;
std::vector<Thread<Blockqueue_t>> threads;
StartProducter(threads, 1, ptr);//第二个参数确定生产者数量
StartConsumer(threads, 1, ptr);//第二个参数确定消费者数量
Wait(threads);
return 0;
}
运行结果
<3>多生产多消费模型
在实际场景下,单生产单消费的模型其实并不多见,绝大部分是多生产多消费模型。多生产多消费模型的代码其实和上面单消费单生产模型代码一致,我们只需要修改main函数中的参数即可。这里我在阻塞队列中存放的均是int类型的数据,正常情况下,这里面存放的是一个一个的任务。由消费者执行任务,生产者获取任务,这里就不演示具体做法了。
<4>小结
这个模型的优点在于能够提供较好的并发度,虽然临界资源只能由一个线程进行访问,但是拿到任务以后,不同的线程可以并发的处理任务。
3、基于环形队列的生产消费模型
<1>概念引入
上面我们介绍了普通的生产消费模型,在上一个模型中。我们其实可以发现,对于队列的使用(临界资源)其实是不怎么高效的,因为我把队列看成一个整体对数据进行保存。前面我们提到过信号量的概念,该篇文章中,我们提到过临界资源可以被划分称多块使用,也可以被整体使用。既然这里使用的是将临界资源看成整体的方法,那么也应该可以将该资源看成多块进行使用。下面介绍一下基于环形队列的生产消费模型。
<2>具体介绍
环形队列本质上是一个数组(也可以是其他数据结构),这里不对它的具体实现作描述,简单叙述一下原理即可。环形队列将整体资源分成多块,不用像上面一样进行整体访问,可以做到生产和消费并发执行。
在环形队列中,我们需要使用两个指针,一个指向消费者下标,一个指向生产者下标。在这两个下标指向同一个位置时,可能会存在两种情况:
1、环形队列为满
在这种情况下,我们就要确保消费者先走,不能让生产者先动,否则会造成数据覆盖的现象。
2、环形队列为空
在这种情况下,我们需要确保生产者先走,只有生产者走了,消费者才有东西可以消费。
除开两个下标在同一个位置的情况下,其余情况都是两个指正指向不同的位置。在这种情况下,我们就可以让生产者和消费者并发执行。不过需要注意的是,生产者一定不能超过消费者一圈,并且消费者不能超过生产者,否则会出现问题。
<3>具体实现方法 - 信号量
为了实现上述功能,我们需要引入信号量。前面我们提到信号量就是用来描述临界资源多少的计数器,当信号量大于零时,我们申请临界资源就肯定可以成功,如果等于零,就肯定无法申请成功。这个特性可以让我们省去很多对临界资源的判断。
下面我使用伪代码来简单描述一下环形队列的具体实现过程。
对于生产者来说,空间是比较重要的,而对于消费者来说,数据是比较重要的,而空间和数据都属于资源。所以这里我们需要申请两个信号量,一个用于描述空间资源,一个用于描述数据资源。对于生产者来说,我们首先就要申请空间信号量,由于我们将空间信号量设置成了大于零的初始值,而数据信号量设置成了等于零的初始值。所以在这种情况下,生产者肯定比消费者先走,而消费者只能阻塞在P操作中。在生产者执行完相关代码后,会对数据信号量做V操作,此时消费者申请数据信号量成功,开始向下执行。当执行完所有代码后,会对空间信号量做V操作(这个过程其实就是生产者生产完后,提醒消费者,消费者消费完后提醒生产者,这个过程形成完美闭环)。当两个信号量都不等于0或10时,消费者和生产者就可以同步运行。
<4>基于信号量的环形队列生产消费模型的代码示例
下面对引用上一个模型的代码,简单写一个基于信号量的环形队列生产消费模型。在此之前,我们需要介绍几个信号量的相关接口,方便大家理解
1、sem_init
首先我们需要定义一个sem_t变量,将该变量地址用sem_init进行初始。pshared为零表示,信号量只在线程之间进行共享,如果大于零表示能在进程之间共享。第三个参数表示信号量需要设定的值。
2、sem_destroy
当我们不需要使用信号量时,就可以使用sem_destroy进行销毁。成功返回0,错误返回-1,错误码被设置。
3、sem_wait
sem_wait用于减少信号量,当信号量减少到小于等于零,线程会阻塞。上图第二个接口在出现信号量小于等于零时,不会阻塞,会直接出错返回,第三个接口则是在特定时间内返回。这里我们一般就使用第一个接口,模拟P操作。成功返回0,失败返回-1.
4、sem_post
sem_post用于增加信号量,用于模拟V操作。成功返回0,失败返回-1.
需要注意的是,以上的所有接口,均需链接pthread原生线程库
示例代码:
ThreadMode.hpp(该文件较上文有些许修改)
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T,std::string name)>;
// typedef std::function<void(const T&)> func_t;
template<typename T>
class Thread
{
public:
void Excute()
{
_func(_data,_threadname);
}
public:
Thread(func_t<T> func, T data, const std::string name="none-name")//右值
: _func(func), _data(data), _threadname(name), _stop(true)
{}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread<T> *self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
T& Data()
{
return _data;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T _data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
}
#endif
RingQueue.hpp
#include <vector>
#include <iostream>
#include <string>
#include <semaphore.h>
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &lock)
{
pthread_mutex_lock(&lock);
}
void Unlock(pthread_mutex_t &lock)
{
pthread_mutex_unlock(&lock);
}
public:
RingQueue(int cap = 10) : _cap(cap)//初始值默认设置成10
{
pthread_mutex_init(&_productor_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr);
sem_init(&room, 0, 10);
sem_init(&data, 0, 0);
_product_index = 0;
_consume_index = 0;
queue.resize(cap);
}
void Enqueue(const T &date)
{
//申请信号量
P(room);//由于PV操作本身就是原子的,所以这里是不用加锁
//加锁,保护临界资源
Lock(_productor_mutex);
queue[_product_index++] = date;
_product_index %= _cap;
Unlock(_productor_mutex);
V(data);
}
void Popqueue(T *date)
{
P(data);
Lock(_consumer_mutex);
*date = queue[_consume_index];
queue[_consume_index++] = T();
(_consume_index) %= _cap;
Unlock(_consumer_mutex);
V(room);
}
~RingQueue()
{
sem_destroy(&room);
sem_destroy(&data);
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
int _cap;
std::vector<T> queue;
// 空间信号量
sem_t room;
sem_t data;
// 空间信号量下标
int _product_index;
int _consume_index;
// 多线程需要加锁
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
Main.cc(较上个示例代码有一定改动)
#include <iostream>
#include <string>
#include <vector>
#include "ThreadMode.hpp"
#include "unistd.h"
#include "RingQueue.hpp"
using namespace ThreadModule;
typedef int data;
using Ringqueue_t = RingQueue<data>*;
pthread_mutex_t _data_lock = PTHREAD_MUTEX_INITIALIZER;
data cnt = 100;
void Product(Ringqueue_t ptr,std::string name)
{
while (1)
{
sleep(1);
pthread_mutex_lock(&_data_lock);
ptr->Enqueue(cnt);
std::cout << "product data is :" << cnt << "---[" << name << "]" << std::endl;
cnt--;
pthread_mutex_unlock(&_data_lock);
}
}
void Consume(Ringqueue_t ptr,std::string name)
{
while (1)
{
sleep(2);
data sum;
ptr->Popqueue(&sum);
std::cout << "comsume data is : " << sum << "---[" << name << "]" << std::endl;
}
}
void StartComm(std::vector<Thread<Ringqueue_t>> &thread, int num, Ringqueue_t ptr, func_t<Ringqueue_t> fun,const std::string cname)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1) + " + " + cname;
thread.emplace_back(fun, ptr, name);
}
}
void StartProducter(std::vector<Thread<Ringqueue_t>> &thread, int num, Ringqueue_t ptr)
{
StartComm(thread, num, ptr, Product, "Productor");
}
void StartConsumer(std::vector<Thread<Ringqueue_t>> &thread, int num, Ringqueue_t ptr)
{
StartComm(thread, num, ptr, Consume,"Consumer");
}
void Wait(std::vector<Thread<Ringqueue_t>> &threads)
{
for (auto &e : threads)
{
e.Join();
}
}
void StartAll(std::vector<Thread<Ringqueue_t>>& thread)
{
for(auto& e : thread)
{
e.Start();
}
}
int main()
{
RingQueue<data> *ptr = new RingQueue<data>;
std::vector<Thread<Ringqueue_t>> threads;
StartProducter(threads, 4, ptr);
StartConsumer(threads, 5, ptr);
StartAll(threads);
Wait(threads);
return 0;
}
运行结果:
以上就是所有内容,文中如有不对之处,还望各位大佬指正,谢谢!!!