目录
- 1.线程互斥它是对的吗?合理吗?(任何场景)
- 2.怎么解决饥饿问题?
- 3.条件编译
- 1.生产者和消费者模型
- 2.编写代码实现一个基于堵塞队列的生产者消费者模型
- 4.POSIX信号量
- 5.环形队列
1.线程互斥它是对的吗?合理吗?(任何场景)
线程互斥能防止抢占锁,它是对的,但是它不一定合理,以现实为例,现在有一个图书馆,他有个奇怪的规定,他只能一间自习室有一个人自习,墙上有个锁,你抢到了,去自信30分钟,学不了下去了,出去玩之类的,别人是进不了来的,因为锁在你哪里,到了明天,因为锁在你哪里,你能实现开门,学习了一个小时,学不下去了,出去玩,把钥匙挂墙上,过了一会你又想自习了,因为你离墙进,所以你拿到锁比别人快(等于优先级高),
在上帝视角里面,在你在自习的时候外面一大堆人在等着你抢钥匙去自习,你把钥匙放墙上想了想我不能这么颓废又拉回了钥匙,因为你离钥匙进,优先级高,其他人没有抢到钥匙,那么你又开始学不下去,又把钥匙放墙上,想了想又想继续学,其他人在哪里骂骂咧咧的,但是你不以为然,因为你没错,你遵守了规定,一间自习室只能有一个人,但是他没规定那一个人,你是对的,但是因为这样子,你会导致其他人拿不到图书资源,这样子造成了一种饥饿问题,所以互斥是对的,但是它不合适,会导致饥饿问题:一个执行流,长时间得不到资源
2.怎么解决饥饿问题?
游戏规则和之前一样,一间自习室有一个人自习,但是加上了一个新规定,你出去了,把钥匙挂墙上,你不能马上申请进入自习室,你要开始重新排队排到队列的尾部,这样子就让它具有顺序性
在保证临界资源安全的前提下(互斥等),让线程访问某种资源,具有一定的顺序性,我们称之为同步
保证同步是为了防止饥饿性,也为了实现线程协同,也是用来实现合理性
3.条件编译
1.生产者和消费者模型
商品是超市制造的吗?答案是,不是的,商品是由供应商制造的,超市只是负责售卖东西,那么问题又来了,为什么消费者不直接向供应商直接购买,比如你要买一盒方便面,供应商会给你吗?答案是不会的,因为它觉得太少了,那么供应商为什么也不直接卖给消费者,因为消费者太分散了,超市不一样,因为大家有个具体地点能去超市买东西,比较集中
优点
1.提高效率
2.解耦(供应商下班了,消费者要买东西,超市能直接给你,因为供应商会生产一定的量,给超市存着,所以消费者买东西不会因为供应商的问题买不了东西)
而上面的东西其实就是缓冲区,消费者是消费线程,超市是临界资源,供应商是生产线程
1.消费者有多个,消费者之间是什么关系呢?
竞争关系,因为商品只有1个的时候就是竞争关系了,用计算机来讲就是互斥
2.供应商有多个,供应商之间是什么关系呢?
也是竞争关系,比如我生产一个品牌的商品,那么你肯定不希望超市售卖其他品牌的商品,用计算机来讲就是互斥
3.消费者和供应商之间又是什么关系呢?
有个消费者要买东西,但是供应商这个时候还没生产完,或者说生产完了,还没放进货架里面,那么消费者还不能购买这个东西,你要等供应商把数据给写入了,才能购买,所以它们要保证互斥关系,供应商要么生产完,要么没生产,但是可以看到你要等供应商生产了,消费者才能购买,这不就是会出现,我们前面讲的合理性的问题吗?所以,要保持同步关系
2.编写代码实现一个基于堵塞队列的生产者消费者模型
BlockQueueTest.hpp
#pragma once
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
const uint32_t gDefaultCap = 5;//容量
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap)
: cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&conCond_, nullptr);
pthread_cond_init(&proCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&conCond_);
pthread_cond_destroy(&proCond_);
}
public:
//生产接口
void push(const T &in)
{
// 加锁
// 判断->是否适合生产->bq是否为满->程序员视角的条件->1. 满(不生产) 2. 不满(生产)
// if(满) 不生产,休眠
// else if(不满) 生产,唤醒消费者
// 解锁
lockQueue();
if(isFull()) // ifFull就是我们在临界区中设定的条件
{
proBlockWait(); //阻塞等待,等待被唤醒。
}
// 条件满足,可以生产
pushCore(in); //生产完成
unlockQueue();
wakeupCon(); // 唤醒消费者
}
//消费接口
T pop()
{
// 加锁
// 判断->是否适合消费->bq是否为空->程序员视角的条件->1. 空(不消费) 2. 有(消费)
// if(空) 不消费,休眠
// else if(有) 消费,唤醒生产者
// 解锁
lockQueue();
while (isEmpty())
{
conBlockwait(); //阻塞等待,等待被唤醒
}
// 条件满足,可以消费
T tmp = popCore();
unlockQueue();
wakeupPro(); // 唤醒生产者
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool isEmpty()
{
return bq_.empty();
}
bool isFull()
{
return bq_.size() == cap_;
}
void proBlockWait()
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&proCond_, &mutex_);
}
void conBlockwait() //阻塞等待,等待被唤醒
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&conCond_, &mutex_);
}
void wakeupPro() // 唤醒生产者
{
pthread_cond_signal(&proCond_);
}
void wakeupCon() // 唤醒消费者
{
pthread_cond_signal(&conCond_);
}
void pushCore(const T &in)
{
bq_.push(in); //生产完成
}
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
private:
uint32_t cap_; //容量
queue<T> bq_; // blockqueue
pthread_mutex_t mutex_; //保护阻塞队列的互斥锁
pthread_cond_t conCond_; // 让消费者等待的条件变量
pthread_cond_t proCond_; // 让生产者等待的条件变量
};
BlockQueueTest.cc
#include "BlockQueueTest.hpp"
#include <ctime>
void *consumer(void *args)
{
BlockQueue<int> *bqp = static_cast<BlockQueue<int> *>(args);
while (true)
{
int data = bqp->pop(); // 消费任务
cout << "consumer 消费了一个任务: " << data << endl;
sleep(1);
}
}
void *productor(void *args)
{
BlockQueue<int> *bqp = static_cast<BlockQueue<int> *>(args);
while (true)
{
// 1. 制作数据
int data = rand() % 10;
// 2. 生产数据
bqp->push(data);
cout << "producter 生产了一个任务:" << data << endl;
sleep(1);
}
}
int main()
{
BlockQueue<int> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
运行结果
可以看到是生产一个消费一个,因为我们都是sleep1秒哪怕是他们生产和消费速度不同也没事了,因为我们的类写了判断,如果是满的就不生产,如果是空的就不消费
lockQueue();
if(isFull()) // ifFull就是我们在临界区中设定的条件
{
proBlockWait(); //阻塞等待,等待被唤醒。
}
// 条件满足,可以生产
pushCore(in); //生产完成
unlockQueue();
wakeupCon(); // 唤醒消费者
但是上面代码其实还有点问题,就是wait等待的时候,虽然他是抱着锁等待,但是在堵塞进程的时候他会自动释放锁,但是释放锁的时候就有个问题,他是从哪里被释放锁就从哪里醒来,当我被唤醒的时候我是在临界区醒来的,那么我是没有锁的,所以这个时候就有了安全问题,如果你有很多线程的话,那么就会有线程去竞争锁,如果你只有一把锁,但是你又有很多线程,那么只有一个线程能有锁,其他的线程只能干等被阻塞,而且被唤醒了不一定等于条件被满足,可能是伪唤醒,就是条件不满足,但是他被唤醒了,一样往下执行,虽然概率很小,解决办法也很简单,把if换成while就行了
我们把他升级一下
Task.hpp
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task()
: elemOne_(0), elemTwo_(0), operator_('0')
{
}
Task(int one, int two, char op)
: elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
BlockQueueTest.cc
#include"Task.hpp"
#include "BlockQueueTest.hpp"
#include <ctime>
const std::string ops = "+-*/%";
void *consumer(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t = bqp->pop(); // 消费任务
int result = t(); //处理任务 --- 任务也是要花时间的!
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;
}
}
void *productor(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1. 制作任务
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 2. 生产任务
bqp->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;
sleep(1);
}
}
int main()
{
BlockQueue<int> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
运行结果,确实可以看到,制造了一些任务,并处理掉任务,而且是多线程
可以看到消费者和生产者模型,看到了体现了互斥,但是并不只有互斥,还有并发,但是并发,并不是在临界区并发,而是在生产前(before blockqueue),消费后(after blockqueue)对应的并发
4.POSIX信号量
信号量是一个计数器,描述临界资源数量的计数器
–p = 原子性 = 申请资源
++v = 原子的 = 归还资源
信号量申请成功了就一定保证你会拥有一部分的临界资源,只要信号量申请成功,就申请锁成功。那么你被切走了也不怕,因为只要我拿到锁,临界资源就是我的,所以这是一种资源预定机制,只要我申请锁成功,那么我一定可以获得指定的资源
5.环形队列
环形队列
我们生产线程和消费线程会访问同一个位置吗?
有可能的,我们二个指向同一个位置只有满或者为空的时候(互斥和同步),其他的时候,都指向的是不同的位置(并发)
后续操作的基本原则:
1.空,消费者并不能超过生产者 = 生产者先运行
2.满,生产者不能把消费者套成一个圈继续往后写入 (因为这样会把消费者还没拿到的数据给清理掉) = 消费者先运行 而这些谁能保证信号量来保证
生产者最关心的是空间资源,空间有多少
消费者最关心的是数据资源,数据有多少
代码实现
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gCap = 10;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = gCap): ringqueue_(cap), pIndex_(0), cIndex_(0)
{
// 生产
sem_init(&roomSem_, 0, ringqueue_.size());
// 消费
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pmutex_ ,nullptr);
pthread_mutex_init(&cmutex_ ,nullptr);
}
// 生产
void push(const T &in)
{
sem_wait(&roomSem_); //无法被多次的申请
pthread_mutex_lock(&pmutex_);
ringqueue_[pIndex_] = in; //生产的过程
pIndex_++; // 写入位置后移
pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征
pthread_mutex_unlock(&pmutex_);
sem_post(&dataSem_);
}
// 消费
T pop()
{
sem_wait(&dataSem_);
pthread_mutex_lock(&cmutex_);
T temp = ringqueue_[cIndex_];
cIndex_++;
cIndex_ %= ringqueue_.size();// 更新下标,保证环形特征
pthread_mutex_unlock(&cmutex_);
sem_post(&roomSem_);
return temp;
}
~RingQueue()
{
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
vector<T> ringqueue_; // 唤醒队列
sem_t roomSem_; // 衡量空间计数器,productor
sem_t dataSem_; // 衡量数据计数器,consumer
uint32_t pIndex_; // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源
uint32_t cIndex_; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
};
RingQueueTest.cc
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
void *productor(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while(true)
{
int data = rand()%10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
sleep(1);
}
}
void *consumer(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while(true)
{
//sleep(10);
int data = rqp->pop();
cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
RingQueue<int> rq;
pthread_t c1,c2,c3, p1,p2,p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}