前言
上节我们学习了线程的同步与互斥,学习了互斥锁和条件变量的使用。本章我们将学习编程的一个重要模型,生产者消费者模型,并且运用之前学的线程同步和互斥的相关接口来实现阻塞队列和环形队列,最后再来实现一个简易的线程池。
目录
- 1.生产者消费者模型
- 1.1 基于 阻塞队列(BlockingQueue) 的生产者消费者模型
- 1.2 基于 环形队列 的生产者消费者模型
- 1.2.1 POSIX信号量(可用于线程同步)
- 1.2.2 基于环形队列的生产消费模型
- 1.3 两种实现的区别
- 2 .线程池的实现(懒汉模式)
- 2.1 单例模式复习:
- 2.2 线程池成员变量:
- 2.3 构造和析构函数:
- 2.4 线程池启动:
- 2.5 Pop和Push Task任务:
- 2.6 Task类:
- 2.7 Main.cc的实现:
- 3. STL,智能指针和线程安全
1.生产者消费者模型
生产者消费者模型是同步与互斥的最典型的应用场景。
3 2 1 原则:
- 生产者和生产者(互斥)消费者和消费者(互斥)生产者和消费者( 互斥 / 同步(非常重要)): 3种关系
- 生产者和消费者: 线程承担的2种角色
- 超市:内存中特定的一种内存结构(数据结构): 1个交易场所
1.1 基于 阻塞队列(BlockingQueue) 的生产者消费者模型
设计的这个队列要保证,队列元素如果为满的情况下,就不能让生产者生产了,如果为空的情况下,就不能让消费者来消费了,那么这个的队列就称作为阻塞队列。
- 成员变量:
- 需要一个队列来将对象存入(队列就好比是超市)。
- 我们还需要用于访问控制的互斥锁,在同一时刻只能有一个线程访问队列。
- 我们需要两个用户线程同步的条件变量,因为我们需要在不同的条件下通知的线程(生产者or消费者)。
代码演示:
#pragma once
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
// 默认容量大小
const uint32_t gDefaultCap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap)
: cap_(cap)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&full, nullptr);
pthread_cond_init(&empty, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
private:
uint32_t cap_; // 容量
std::queue<T> bq_; // blockqueue
pthread_mutex_t lock; // 保护阻塞队列的互斥锁
pthread_cond_t full; // 让消费者等待的条件变量
pthread_cond_t empty; // 让生产者等待的条件变量
};
- 插入队列和出队列
void PushData(const int &data)
{
LockQueue();
while(IsFull()){
std::cout << "queue full, notify consume data, product stop." << std::endl;
ProductWait();
}
q.push(data);
//生产完了,就要去唤醒消费者
NotifyConsume();
UnLockQueue();
}
补充解释:
- bq是否为满,程序员视角的条件
(1)满(不生产)(2)不满(生产) - if(满)不生产(不仅仅要不生产),休眠(更要休眠),休眠期间消费线程就可以去申请锁了。
- else if(不满)生产,唤醒消费者。
为什么要用while判断而不用if判断:
- 为了防止有一些误唤醒的条件触发
- 等待条件变量前:当我等待的时候,会自动释放mutex_(因为不能拿着锁去等)
- 使用While条件的话,会重新判断isfull()条件是否满足,重新进入线程等待中去。
出队(pop):
void PopData(int &data)
{
LockQueue();
while(IsEmpty()){
std::cout << "queue empty, notify product data, consume stop." << std::endl;
ConsumeWait();
}
data = q.front();
q.pop();
NotifyProduct();
UnLockQueue();
}
补充解释:
- bq是否为空,程序员视角的条件:
(1)空(不消费)(2)有(消费) - if(空)不消费,休眠。
- else if(有)消费,唤醒生产者。
对其中一些加锁及条件变量等进行封装:
private:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&lock);
}
void ProductWait()
{
pthread_cond_wait(&full, &lock);
}
void ConsumeWait()
{
pthread_cond_wait(&empty, &lock);
}
void NotifyProduct()
{
pthread_cond_signal(&full);
}
void NotifyConsume()
{
pthread_cond_signal(&empty);
}
bool IsEmpty()
{
return ( q.size() == 0 ? true : false );
}
bool IsFull()
{
return ( q.size() == cap ? true : false );
}
- Task任务类
我们可以给队列分配Task对象(任务)
#pragma once
#include <iostream>
#include <string>
using namespace std;
class Task
{
public:
Task() : elemOne_(0), elemTwo_(0), operator_('0')
{
}
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
cout << "div zero, abort" << endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
cout << "非法操作: " << operator_ << endl;
break;
}
return result;
}
// 输出型参数
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
- 生产和消费任务
- 生产任务
void *productor(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1. 制作任务 --- 要不要花时间?? -- 网络,磁盘,用户
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 2. 生产任务
bqp->push(t);
cout << "producter[" << pthread_self() << "] "
<< (unsigned long)time(nullptr) << " 生产了一个任务: "
<< one << op << two << "=?" << endl;
sleep(1);
}
}
- 消费任务
const std::string ops = "+-*/%";
void *consumer(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t = bqp->pop(); // 消费任务
int result = t(); // 处理任务 --- 任务也是要花时间的!
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer[" << pthread_self() << "] "
<< (unsigned long)time(nullptr) << " 消费了一个任务: "
<< one << op << two << "=" << result << endl;
}
}
main任务:
int main()
{
// 生产者用来生产计算任务,消费者用来消费计算任务
BlockQueue<Task> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
补充解释:
- 生产者生产任务的时候和消费者消费任务的时候是并发执行的
- 并发并不是在交易场所中并发。
- 在消费的同时也在制作任务,并发体现就在这里。
1.2 基于 环形队列 的生产者消费者模型
生产消费模型用上了循环队列之后,就会有一个很大的优势:
- 因为生产者和消费者访问的(假设是数组实现的循环队列)是不同下标位置
- 这二者访问的并非同一块内存空间,所以这就实现了同时访问
- 可以实现生产和消费过程并发。。
对比之前的阻塞队列实现:
- 之前学的queue是整体被使用的,没法被切割。
- 这个临界资源可以被划分成不同的区域,要用信号量将这些区域保护起来。
- 要写一个基于固定大小的环形队列, 多线程情况下根本就不用考虑队列为满还是为空,因为信号量帮我们考虑
此时就相当于把循环队列这个临界资源分成了一小块一小块,只有满或空的时候,头指针和尾指针才会指向同一块数组空间,其他时间都是不冲突的!
1.2.1 POSIX信号量(可用于线程同步)
信号量本质上是一个计数器,是一个描述临界资源数量的计数器。
保证不会在限有资源的情况下让多的线程进入到临界区对临界资源的访问。通过信号量来限制进入临界资源当中的线程的个数
- P操作:申请资源(原子的)
- V操作:归还资源(原子的)
持有0和1的信号量叫做,二元信号量
- 初始化一个未命名的信号量:
销毁信号量:
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
1.2.2 基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
成员变量
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#define NUM 16
class RingQueue{
private:
std::vector<int> q; 环形队列
int cap;
sem_t data_sem; // 衡量空间计数器,productor
sem_t space_sem; // 衡量数据计数器,consumer
int consume_step; // 当前生产者写入的位置
int product_step; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
public:
RingQueue(int _cap = NUM):q(_cap),cap(_cap)
{
sem_init(&data_sem, 0, 0);
sem_init(&space_sem, 0, cap);
consume_step = 0;
product_step = 0;
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
~
RingQueue()
{
sem_destroy(&data_sem);
sem_destroy(&space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
}
- 生产和消费函数
void push(const T &in)
{
// 1. 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
// 2. 什么时候用锁,什么时候用sem?你对应的临界资源,是否被整体使用!
P(_space_sem); // P()
Lock(_p_mutex); //? 1
// 一定有对应的空间资源给我!不用做判断,是哪一个呢?
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mutex);
V(_data_sem);
}
// 消费
void pop(T *out)
{
P(_data_sem);
Lock(_c_mutex); //?
*out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);
}
生产者和消费者都为空的时候,一定能保证生产线程先运行,因为一开始消费线程的数据信号量一开始为0,sem_wait(&dataSem_)函数一开始要阻塞等待。
环形队列的使用:(重点)
-
生产者生产时:空间多了一个,申请了一个空间(空间信号量 - 1),数据信号量 + 1。
-
消费者消费时:空间少了一个,释放了一个空间(空间信号量 + 1),数据信号量 - 1。
-
有信号量帮我们做了访问控制,所以我们不需要判断循环队列什么时候为满,什么时候为空
-
Task任务类
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
- 生产和消费处理函数
void *consumerRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
}
}
void *productorRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
}
}
- Main主函数
int main()
{
RingQueue rq;
pthread_t c,p;
pthread_create(&c, NULL, consumer, (void*)&rq);
pthread_create(&p, NULL, producter, (void*)&rq);
pthread_join(c, NULL);
pthread_join(p, NULL);
}
1.3 两种实现的区别
- 基于阻塞队列实现的生产消费模型和环形队列的实现,最大的区别是否让生产和消费的过程并发起来。
2 .线程池的实现(懒汉模式)
线程池是基于阻塞队列实现的。
- 我们只需要把任务交到这个线程的池子里面,其就能帮我们多线程执行任务,计算出结果。
- 当任务来时才创建线程,这个成本有点高,如果提前先把各种池化的东西准备好,等任务来的时候,直接把任务指派给某个线程。
- 无论是进程池还是线程池,本质上都是一种对于执行流的预先分配,当有任务时,直接指定,而不需要创建进程/线程来处理任务
线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着
监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利
用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量
线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技 术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个
Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情 况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误.
2.1 单例模式复习:
在我们之前学过的单例模式分为两种,一种是懒汉模式,一种是饿汉模式 [传送门] 。
- 懒汉:刚开始先不创建对象,等第一次使用的时候再去创建。
缺点:是第一次创建对象需要等待。
优点:是程序启动快。 - 饿汉:在main函数之前就将对象创建出来。
缺点:是程序启动会比较慢。
优点:是启动之后获取对象会比较快。
2.2 线程池成员变量:
#pragma once
#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Log.hpp"
#include "Lock.hpp"
using namespace std;
const static int N = 5;
template <class T>
class ThreadPool
{
private:
int _num; //线程池中线程的个数
bool isStart_; // 表示是否已经启动
std::queue<T> _tasks; // 使用stl的自动扩容的特性 基于阻塞队列进行实现的(里面是放置Task的)
pthread_mutex_t _lock;
pthread_cond_t _cond;
static ThreadPool<T> *instance; //懒汉模式的实例
static pthread_mutex_t instance_lock;//懒汉模式的锁
};
static变量我们需要在类外初始化,模板类型还需要带上template关键字:
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::instance_lock = PTHREAD_MUTEX_INITIALIZER;
2.3 构造和析构函数:
private:
ThreadPool(int num = N) : _num(num), isStart_(false)
{
assert(threadNum_ > 0);
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &tp) = delete;
void operator=(const ThreadPool<T> &tp) = delete;
public:
~ThreadPool()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
封装加锁/解锁/通知线程等操作:
private:
void LockQueue() {
pthread_mutex_lock(&_lock);
}
void UnLockQueue() {
pthread_mutex_unlock(&_lock);
}
void WakeUpOne() {
pthread_cond_signal(&_cond);
}
void WakeUpAll() {
pthread_cond_broadcast(&_cond);
}
void ThreadQuit() {
_thread_cur--;
UnLockQueue();
pthread_exit(NULL);
}
void ThreadWait(){
pthread_cond_wait(&_cond, &_lock);
}
bool IsEmpty() {
return _task_queue.empty();
}
因为是懒汉模式的单例,提供一个指针作为单例,不对外开放构造函数。
同时,用delete关键字,禁止拷贝构造和赋值重载
public:
static ThreadPool<T> *getinstance()
{
if(nullptr == instance) // 为什么要这样?提高效率,减少加锁的次数!
{
LockGuard lockguard(&instance_lock);
if (nullptr == instance)
{
instance = new ThreadPool<T>();
instance->start();
}
}
return instance;
}
2.4 线程池启动:
static void threadRoutine(void *args)
{
// pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
T t;//定义的地方
{
tp->LockQueue();
while (tp->isEmpty())
{
tp->threadWait();
}
t = tp->popTask(); // 从公共区域拿到私有区域
}
tp->UnLockQueue();
// for test
// for debug
int one, two;
char oper;
t.get(&one, &two, &oper);
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
// t.run(); // 处理任务,应不应该在临界区中处理?1,0
}
}
void start()
{
// 作为一个线程池,不能被重复启动
assert(!isStart_);
for (int i = 0; i < _num; i++)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
2.5 Pop和Push Task任务:
void PushTask(const T &t) {
LockQueue();
_tasks.push(t);
WakeUpOne();
UnLockQueue();
}
T PopTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
2.6 Task类:
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
2.7 Main.cc的实现:
int main()
{
const string operatorsZZ = "+/*/%";
// unique_ptr<ThreadPool<Task> > tp(new ThreadPool<Task>());// 懒汉模式之后这个就不能用了
unique_ptr<ThreadPool<Task>> tp(ThreadPool<Task>::getInstance());
srand((unsigned long)time(nullptr));
// 派发任务的线程
while (true)
{
int one = rand() % 50;
int two = rand() % 10;
char oper = operatorsZZ [rand() % operatorsZZ .size()];
Log() << "主线程派发计算任务: " << one << oper << two << "=?"
<< "\n";
Task t(one, two, oper);
tp->push(t);
sleep(1);
}
return 0;
}
3. STL,智能指针和线程安全
- STL中的容器是否是线程安全的?
不是. 原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
而且对于不同的容器,加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用,往往需要调用者自行保证线程安全.
- 智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.(因为不能拷贝和赋值,只能自己用)
对于 shared_ptr,多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这 个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数
尾声
看到这里,相信大家对这个Linux有了解了。
如果你感觉这篇博客对你有帮助,不要忘了一键三连哦