POSIX信号量
- 📖1. 信号量的定义
- 📖2. 二值信号量
- 📖3. 用信号量作为条件变量
- 📖4. 基于环形队列的生产者消费者模型
我们知道,需要锁和条件变量来解决各种相关的,有趣的并发问题, Dijkstra及其同事发明了信号量,作为与同步有关的所有工作的唯一原语,可以使用信号量作为锁和条件变量.
📖1. 信号量的定义
信号量是一个有整数值的对象,可以用两个函数来操作它在POSIX
标准中,是sem_wait()
和sem_post()
. 因为信号量的初始值能够决定其行为,所以首先要初始化信号量,才能调用函数与之交互.
#include<semaphore.h>
sem_t s;
sem_init(&s, 0, 1);
我们定义了一个信号量,通过第三个参数,将它的值初始化为1.
sem_init()
的第二个参数,设置为0,表示信号量是在同一进程的多个线程共享的.可以参考man手册,了解信号量的其他用法(如何用于跨不同进程的同步访问).
信号量初始化之后,我们可以调用sem_wait()
和sem_post()
与之交互:
int sem_wait(sem_t* s)
{
//将信号量的值减1, 如果信号量s的值为负, 则等待
}
int sem_post(sem_t* s)
{
//将信号量s的值加1, 如果有一个或多个线程在等待, 唤醒一个线程
}
📖2. 二值信号量
现在我们开始使用信号量,我们可以用信号量作为锁,直接把临界区用一对sem_wait()/sem_post()
环绕:
int main()
{
sem_t m;
sem_init(&m, 0, 1);
sem_wait(&m);
//临界区
sem_post(&m);
return 0;
}
接下来,我们来分析分析,它是如何实现锁的功能的:
我们可以使用信号量来实现锁,因为锁只有两个状态(持有或没持有),所以这种用法有时也叫作二值信号量.
📖3. 用信号量作为条件变量
信号量也可以用在一个线程暂停执行,等待某一条件成立的场景. 在这种场景下,通常一个线程等待条件成立,另外一个线程修改条件并发信号给等待线程,从而唤醒等待线程,因为等待线程在等待某些条件发生变化,所以我们将信号量作为条件变量.
下面是一个简单例子,假设一个线程创建另一个线程,并且等待它结束:
sem_t s;
void* child(void* args)
{
cout << "child" << endl;
sem_post(&s);
return nullptr;
}
int main()
{
sem_init(&s, 0, 0);
cout << "parent: begin" << endl;
pthread_t t;
pthread_create(&t, nullptr, child, nullptr);
sem_wait(&s);
cout << "parent: end" << endl;
return 0;
}
由于两个线程执行时的时序问题,所以我们需要分析两种情况:
📖4. 基于环形队列的生产者消费者模型
代码实现如下:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gCap = 10;
template<class T>
class RingQueue
{
public:
RingQueue(int cap = gCap) : ringQueue_(cap), pIndex_(0), cIndex_(0)
{
//生产
sem_init(&roomSem_, 0, ringQueue_.size());
//消费
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pmutex_, nullptr);
pthread_mutex_init(&cmutex_, nullptr);
}
//生产
void push(const T& in)
{
sem_wait(&roomSem_);
pthread_mutex_lock(&pmutex_);
ringQueue_[pIndex_] = in;
pIndex_++;
pIndex_ %= ringQueue_.size();
pthread_mutex_unlock(&pmutex_);
sem_post(&dataSem_);
}
//消费
T pop()
{
sem_wait(&dataSem_);
pthread_mutex_lock(&cmutex_);
T temp = ringQueue_[cIndex_];
cIndex_++;
cIndex_ %= ringQueue_.size();
pthread_mutex_unlock(&cmutex_);
sem_post(&roomSem_);
return temp;
}
~RingQueue()
{
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
vector<T> ringQueue_; //环形队列
sem_t roomSem_;
sem_t dataSem_;
uint32_t pIndex_; //当前生产者写入的位置
uint32_t cIndex_; //当前消费者读取的位置
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
};
测试代码:
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
#include <pthread.h>
void* productor(void* args)
{
RingQueue<int>* rqp = static_cast<RingQueue<int>* >(args);
while(true)
{
int data = rand() % 10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
sleep(1);
}
}
void* consumer(void* args)
{
RingQueue<int>* rqp = static_cast<RingQueue<int>* >(args);
while(true)
{
int data = rqp->pop();
cout << "pthread[" << pthread_self() <<"]" << " 消费了一个数据: " << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
RingQueue<int> rq;
pthread_t c1, c2, c3;
pthread_t p1, p2, p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
测试结果如下: