文章目录
- 📖 前言
- 1. 生产消费模型
- 2. 阻塞队列
- 2.1 成员变量:
- 2.2 入队(push)和出队(pop):
- 2.3 封装与测试运行:
- 2.3 - 1 对代码进一步封装
- 2.3 - 2 分配运算任务
- 2.3 - 3 测试与运行
- 3. 循环阻塞队列
- 3.1 POSIX信号量:
- 3.1 - 1 sem_init / sem_destroy
- 3.1 - 2 sem_wait
- 3.1 - 3 sem_post
- 3.2 成员变量:
- 3.3 生产消费:
- 3.4 构造与析构:
- 3.5 测试:
- 4. 线程池(懒汉模式)
- 4.1 单例模式复习:
- 4.2 成员变量:
- 4.3 构造和析构:
- 4.4 两次nullptr判断:
- 4.5 线程池启动:
- 4.6 封装加锁/解锁/通知线程等操作:
- 4.7 测试:
📖 前言
上一章节我们学习了线程的同步与互斥,学习了互斥锁和条件变量的使用。本章我们将学习编程的一个重要模型,生产者消费者模型,并且运用之前学的线程同步和互斥的相关接口来实现阻塞队列和环形队列,最后再来实现一个简易的线程池。
目标已经确定,准备开讲啦……
1. 生产消费模型
生产者消费者模型是同步与互斥的最典型的应用场景:(重新认识条件变量)
- 消费者在消费期间,不妨碍工厂去生产,工厂在生产期间不影响消费者消费。
- 因为超市的存在,消费者和工厂间不再是强耦合的关系,而是一种解耦的关系。
1.消费者有多个,消费者之间是什么关系呢?
- 竞争关系 —— 互斥
2.供应商有多个,供应商之间是什么关系呢?
- 竞争关系 —— 互斥
3.消费者和供应商之间又是什么关系呢?
- 互斥关系,同步关系
除了要保证临界资源的安全性之外,还要保证生产消费过程中的合理性。
- 如果只有互斥的情况,那么生产者、消费者都要来轮询检测。
- 通过互斥的方式,效率太低了,不合理。
- 生产和消费应该要有一定的顺序,消费完了再生产,生产满了再消费。
3 2 1 原则:
- 生产者和生产者(互斥)消费者和消费者(互斥)生产者和消费者(互斥 / 同步):
3种关系
- 生产者和消费者:
线程承担的2种角色
- 超市:内存中特定的一种内存结构(数据结构):
1个交易场所
2. 阻塞队列
基于生产者和消费者模型的阻塞队列。
设计的这个队列要保证,队列元素如果为满的情况下,就不能让生产者生产了,如果为空的情况下,就不能让消费者来消费了,那么这个的队列就称作为阻塞队列。
生产接口:
- 纯互斥的话,先进行加锁,再判断队列满了没。
- 如果满了就不生产,然后解锁,之后退出。
- 只是接口调用完成了,但是这个线程下次会又跑过来了,就又先加锁,再判断满不满足生产,如果满了就不生产,然后解锁,之后退出。
- 因为优先级比较高,导致了就在这里重复,不断地申请锁,释放锁,导致消费者申请不到锁。
这就是纯互斥,生产者一直在抢占锁,而导致消费线程的饥饿。同样的道理,消费线程也是如此。
这种场景没错,但是不合理:
- 我们是需要有一个条件变量的方式,让双方能够进行在特定条件不满足的时候,进入不生产并且还休眠的状态。
- 同样的让消费者在消费的时候,不满足消费条件时,也进行休眠, 让双方彼此唤醒对方。
这种就叫做同步式的阻塞队列。
2.1 成员变量:
既然是阻塞队列,再结合线程互斥与同步来维护该队列:
- 首先我们需要一个队列来将对象存入(队列就好比是超市)。
- 我们还需要用于访问控制的互斥锁,在同一时刻只能有一个线程访问队列。
- 我们需要两个用户线程同步的条件变量,因为我们需要在不同的条件下通知的线程(生产者or消费者)。
代码演示:
#pragma once
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
// 默认容量大小
const uint32_t gDefaultCap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap)
: cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&conCond_, nullptr);
pthread_cond_init(&proCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&conCond_);
pthread_cond_destroy(&proCond_);
}
private:
uint32_t cap_; // 容量
queue<T> bq_; // blockqueue
pthread_mutex_t mutex_; // 保护阻塞队列的互斥锁
pthread_cond_t conCond_; // 让消费者等待的条件变量
pthread_cond_t proCond_; // 让生产者等待的条件变量
};
我们这里用的是C++的类模板,阻塞队列里的内容就可以相对灵活一些了。
2.2 入队(push)和出队(pop):
阻塞队列类内函数:
入队(push):
// 生产接口
void push(const T &in) // const &: 纯输入
{
// 先把队列锁住
lockQueue();
while (isFull()) // ifFull就是我们在临界区中设定的条件
{
proBlockWait();
}
// 条件满足,可以生产
pushCore(in); // 生产完成
// wakeupCon(); // 唤醒消费者
// 把队列解锁
unlockQueue();
wakeupCon(); // 生产完了,生产者就要唤醒消费者
}
生产之前要判断判断,是否适合生产:
- bq是否为满,程序员视角的条件
-
- 满(不生产)
-
- 不满(生产)
if(满)
不生产(不仅仅要不生产),休眠(更要休眠),休眠期间消费线程就可以去申请锁了。else if(不满)
生产,唤醒消费者。
为什么要用while
判断而不用if
判断:
- 等待条件变量前:当我等待的时候,会自动释放
mutex_
(因为不能拿着锁去等)。 - 阻塞等待,等待被唤醒。
- 被唤醒 != 条件被满足 (概率虽然很小),要做到:被唤醒 && 条件被满足
因为一些原因导致了被伪唤醒了:
-
- 有可能是系统的问题,也有可能是代码本身有问题。
-
- 但是要保证代码的健壮性,继续向后执行。
- 当我醒来的时候,我是在临界区里醒来的!!
-
- 当线程被伪唤醒后,它会重新参与调度并尝试获取锁。
-
- 如果其他线程已经持有了锁,并且没有释放,那么伪唤醒的线程将无法获得锁,它需要继续等待或者重新检查条件是否满足。
- 从哪里被阻塞,就要从哪里醒来,醒来之后就是相当于没有锁就访问临界资源了。
先解锁还是先唤醒,以生产者为例:
当消费者在解锁之前被唤醒时:
-
- 生产完成,把消费者唤醒了,然后生产者被切走并且没有释放锁。
-
- 消费者会在条件变量里被唤醒,然后去争锁,但是争不到,要等待(因为生产者的锁没解锁)。
-
- 这次等待不在条件变量下去等了,而是在申请互斥锁上等。
-
- 一旦生产者切回来,解锁以后,消费者直接会竞争锁成功。
当消费者在解锁之后被唤醒时:
-
- 一旦解锁了,唤醒消费者,消费者就会立马能够从
pthread_cond_wait
里返回并且把锁重新持有,接下来进行后续操作,进行消费。
- 一旦解锁了,唤醒消费者,消费者就会立马能够从
-
- 如果当刚解锁时,还没有唤醒消费者。
-
- 那么此时其他消费者可能把锁拿走了,该消费者线程竞争锁失败了(在申请锁当中去等了)。
-
- 其他消费者忙自己的事情,会自己释放锁的。
出队(pop):
// 消费接口
T pop()
{
// 先把队列锁住
lockQueue();
while (isEmpty())
{
conBlockwait(); // 阻塞等待,等待被唤醒,?
}
// 条件满足,可以消费
T tmp = popCore();
// 把队列解锁
unlockQueue();
wakeupPro(); // 消费完了,消费者就要唤醒生产者
return tmp;
}
消费之前要判断是否适合消费:
- bq是否为空,程序员视角的条件:
-
- 空(不消费)
-
- 有(消费)
if(空)
不消费,休眠。else if(有)
消费,唤醒生产者。
消费接口唤醒生产者和解锁顺序同上生产者操作。
2.3 封装与测试运行:
2.3 - 1 对代码进一步封装
为了代码的可读性,也是为了以后能够修改方便,我们对加锁,条件变量等进行了封装:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool isEmpty()
{
return bq_.empty();
}
bool isFull()
{
return bq_.size() == cap_;
}
// 生产者进行阻塞等待
void proBlockWait() // 生产者一定是在临界区中的!
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&proCond_, &mutex_);
}
// 消费者进行阻塞等待
void conBlockwait() // 阻塞等待,等待被唤醒
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&conCond_, &mutex_);
// 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回
}
// 唤醒生产者
void wakeupPro()
{
// 一定要在生产者所在的条件变量下唤醒
pthread_cond_signal(&proCond_);
}
// 唤醒消费者
void wakeupCon()
{
// 一定要在消费者所在的条件变量下唤醒
pthread_cond_signal(&conCond_);
}
// 生产完成
void pushCore(const T &in)
{
bq_.push(in);
}
// 消费
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
2.3 - 2 分配运算任务
因为阻塞队列我们实现的时候是用了类模版,所以我们可以给队列分配Task对象(任务)
#pragma once
#include <iostream>
#include <string>
using namespace std;
class Task
{
public:
Task() : elemOne_(0), elemTwo_(0), operator_('0')
{
}
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
cout << "div zero, abort" << endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
cout << "非法操作: " << operator_ << endl;
break;
}
return result;
}
// 输出型参数
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
2.3 - 3 测试与运行
生产者生产任务并放入到阻塞队列当中:
void *productor(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1. 制作任务 --- 要不要花时间?? -- 网络,磁盘,用户
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 2. 生产任务
bqp->push(t);
cout << "producter[" << pthread_self() << "] "
<< (unsigned long)time(nullptr) << " 生产了一个任务: "
<< one << op << two << "=?" << endl;
sleep(1);
}
}
消费者从队列里拿任务,并执行任务:
const std::string ops = "+-*/%";
void *consumer(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t = bqp->pop(); // 消费任务
int result = t(); // 处理任务 --- 任务也是要花时间的!
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer[" << pthread_self() << "] "
<< (unsigned long)time(nullptr) << " 消费了一个任务: "
<< one << op << two << "=" << result << endl;
}
}
int main()
{
// 生产者用来生产计算任务,消费者用来消费计算任务
BlockQueue<Task> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
于是就实现了,生产一个任务消费一个任务:
生产者生产任务的时候和消费者消费任务的时候是并发
执行的:(重点)
- 并发并不是在交易场所中并发。
- 并不是在临界区中并发(一般而言),而是生产前(before blockqueue),消费后(after blockqueue)对应的并发。
- 在消费的同时也在制作任务,并发体现就在这里。
消费者必须按照生产的节奏来走,和管道一样,写得慢那么读的也慢(得有访问控制,互斥同步机制)。
解耦体现在生产者生产的任务,可以通过阻塞队列派发给消费者。
生产和消费的速度不一致,如何理解?
- 并不仅仅指的是,在阻塞或者环形队列,里忙闲不均速度不均。
- 更重要指的是,生产者生产制作一个任务,和消费者把一个任务全都处理完。
- 这两个的时间是不一样的。
生产消费的交易场所就是一个内存,这个内存具体呈现的是:队列、双端队列、环形队列、可能是其他结构用来资源数据或者任务交换的。
3. 循环阻塞队列
在我们之前学习数据结构的时候,我们学习过环形队列,【环形队列复习】。
生产消费模型用上了循环队列之后,就会有一个很大的优势:
- 因为生产者和消费者访问的(假设是数组实现的循环队列)是不同下标位置
- 这二者访问的并非同一块内存空间,所以这就实现了同时访问
- 这样就更加体现了生产消费的
并发
属性
对比与需求:
- 之前学的
queue
是整体被使用的,没法被切割。 - 现在想做一个公共的临界资源,但是这个临界资源可以被划分成不同的区域,要用信号量将这些区域保护起来。
- 所以要写一个基于固定大小的环形队列。
此时就相当于把循环队列这个临界资源分成了一小块一小块,只有满或空的时候,头指针和尾指针才会指向同一块数组空间,其他时间都是不冲突的!
访问同一个位置有可能吗?答:有可能!什么时候会发生呢?
- 两个指针指向同一个位置的时候,只有满或空的时候!(互斥和同步)
- 其他时候,都指向的是两个不同的位置!(并发)
- 让生产者和消费者同时访问数组的不同区域就可以让它俩同时进行生产和消费。
多线程情况下根本就不用考虑队列为满还是为空,因为信号量帮我们考虑。
3.1 POSIX信号量:
在之前的共享内存的学习中,我们简单的提到过信号量 传送门,信号量本质上是一个计数器,是一个描述临界资源数量的计数器。
保证不会在限有资源的情况下让多的线程进入到临界区对临界资源的访问。通过信号量来限制进入临界资源当中的线程的个数。
- P操作:申请资源(原子的)
- V操作:归还资源(原子的)
- 临界资源可以当成整体,可以不可以看做一小部分一小部分呢?
- 可以,是由应用场景决定的。
- 信号量申请成功了,就一定能保证你会拥有一部分临界资源吗?
- 只要信号量申请成功,那么一定会获得指定的资源。
持有0和1的信号量叫做,二元信号量 == 互斥锁
信号量:1
- P —
1->0
— 加锁 - V —
0->1
— 释放锁
小结:
- 如果保证了信号量是多个,那么就可以保证临界资源被划分了不同的区域。
- 所以此时每个线程想进入这个区域,就得先申请信号量,只要申请成功了,这个资源一定给你了。
- 信号量是个计数器,用来衡量临界资源当中,资源数目的,申请信号量的本质叫做预定某种资源。
- 当申请信号量成功的时候,这个信号量对应的资源才可以被唯一的使用。
3.1 - 1 sem_init / sem_destroy
初始化一个未命名的信号量:
销毁信号量:
3.1 - 2 sem_wait
介绍:
sem_wait
是一个信号量操作函数,用于请求和等待信号量的可用性。- 它的作用是尝试获取信号量,如果信号量的值大于0,则将信号量的值减1,并立即返回。
- 如果信号量的值为0,则当前线程会被阻塞,直到有其他线程释放信号量。
这个接口和锁 / 条件变量那里的等待是一样的,可以简单理解为,这个接口就是让信号量减减。
3.1 - 3 sem_post
介绍:
sem_post
是一个信号量操作函数,用于释放或增加信号量的值。- 它的作用是将信号量的值加1,并唤醒可能因为等待信号量而被阻塞的线程。
sem_post
和sem_wait
是一对重要的信号量操作函数,用于实现并发控制和临界区的进入与退出。
通过调用sem_post
来释放信号量,可以让其他线程获取信号量进入临界区,从而实现资源的共享和同步。
3.2 成员变量:
有了上述知识,我们就能可以来着手实现了:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
// 默认容量
const int gCap = 10;
template <class T>
class RingQueue
{
private:
vector<T> ringqueue_; // 环形队列
sem_t roomSem_; // 衡量空间计数器,productor
sem_t dataSem_; // 衡量数据计数器,consumer
uint32_t pIndex_; // 当前生产者写入的位置,如果是多线程,pIndex_也是临界资源
uint32_t cIndex_; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
};
除了两个信号量,生产消费的时候,还需要操生产和消费这两个指针,指向队列正确的位置。
3.3 生产消费:
操作的基本原则:
- 空:消费者不能超过生产者,【生产者先行】:
-
- 消费者前面的数据根本没有,如果超过去读取读到的全都是废弃的数据。
- 满:生产者不能把消费者套一个圈,继续在往后写入,【消费者先行】:
-
- 就会把曾经生产出来的,消费者还没来得及消费的数据就覆盖掉了。
生产者:最关心的是什么资源?
- 空间
N:[N,0] 从N到0的过程
消费者:最关心的是什么资源?
- 数据
N:[0,N] 从0到N的过程
代码演示:
// 生产 -- 先申请信号量
void push(const T &in)
{
// 申请信号量在锁前面的话,如果是多线程,那么多个线程都可以申请到资源
// 然后再去争锁
sem_wait(&roomSem_); // 如果锁加在前面的话,信号量就无法被多次的申请(P操作)
// 在锁这里等时,每个线程都是拿着信号量去等
pthread_mutex_lock(&pmutex_);
ringqueue_[pIndex_] = in; // 生产的过程,有线程安全的问题
pIndex_++; // 写入位置后移
pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征
pthread_mutex_unlock(&pmutex_);
sem_post(&dataSem_); // V操作
}
// 消费
T pop()
{
sem_wait(&dataSem_); // 申请数据资源
pthread_mutex_lock(&cmutex_);
T temp = ringqueue_[cIndex_]; // 消费
cIndex_++;
cIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征
pthread_mutex_unlock(&cmutex_);
sem_post(&roomSem_); // 数据已拿走,空间就露出来了,空间多了一个
return temp;
}
生产者和消费者都为空的时候,一定能保证生产线程先运行,因为一开始消费线程的数据信号量一开始为0,sem_wait(&dataSem_)
函数一开始要阻塞等待。
两个线程各自申请各自所关心的资源,各自释放对方所关心的资源,那么此时这两个就可以互相的互调,协同起来了。
环形队列的使用:(重点)
- 生产者生产时:空间多了一个,申请了一个空间(空间信号量 - 1),数据信号量 + 1。
- 消费者消费时:空间少了一个,释放了一个空间(空间信号量 + 1),数据信号量 - 1。
因为有信号量帮我们做了访问控制,所以我们不需要判断循环队列什么时候为满,什么时候为空:
- 队列为满时,空间信号量为0,生产者无法申请空间。
- 生产者无法生产,会在空间信号量里面等待,不会继续生产,消费者继续消费。
- 队列为空的时候,空间信号量为满,数据信号量为0,没有可以消费的数据。
- 消费者无法消费,会在数据信号量里面等待,不会继续消费,生产者继续生产。
3.4 构造与析构:
RingQueue(int cap = gCap)
: ringqueue_(cap), pIndex_(0), cIndex_(0)
{
// 生产(空间信号量)
sem_init(&roomSem_, 0, ringqueue_.size());
// 消费(数据信号量)
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pmutex_ ,nullptr);
pthread_mutex_init(&cmutex_ ,nullptr);
}
~RingQueue()
{
// 销毁信号量计数器
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
3.5 测试:
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
void *productor(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while (true)
{
int data = rand()%10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
// sleep(1);
}
}
void *consumer(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while (true)
{
int data = rqp->pop();
cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;
sleep(1);
}
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
RingQueue<int> rq;
pthread_t c1,c2,c3, p1,p2,p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
环形队列允许生产和消费同时进入临界区,没问题,只要不同时访问同一个位置就可以,但是如果是多生产多消费,那么就必须维护生产者和生产者之间,消费者和消费者之间的互斥关系。
生产者和生产者之间争一个出来访问环形队列, 消费者和消费者之间争一个出来访问环形队列。
只允许一个线程进入临界资源写入,只允许一个线程从临界资源当中读取。
4. 线程池(懒汉模式)
我们只需要把任务交到这个线程的池子里面,其就能帮我们多线程执行任务,计算出结果。
当任务来时才创建线程,这个成本有点高,如果提前先把各种池化的东西准备好,等任务来的时候,直接把任务指派给某个线程。
无论是进程池还是线程池,本质上都是一种对于执行流的预先分配,当有任务时,直接指定,而不需要创建进程/线程来处理任务。
4.1 单例模式复习:
在我们之前学过的单例模式分为两种,一种是懒汉模式,一种是饿汉模式 [传送门] 。
- 懒汉:刚开始先不创建对象,等第一次使用的时候再去创建。
-
- 缺点:是第一次创建对象需要等待。
-
- 优点:是程序启动快。
- 饿汉:在main函数之前就将对象创建出来。
-
- 缺点:是程序启动会比较慢。
-
- 优点:是启动之后获取对象会比较快。
4.2 成员变量:
用懒汉模式实现一个线程池:
#pragma once
#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Log.hpp"
#include "Lock.hpp"
using namespace std;
int gThreadNum = 5;
template <class T>
class ThreadPool
{
private:
bool isStart_; // 表示是否已经启动
int threadNum_;
queue<T> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
// 改成懒汉模式
static ThreadPool<T> *instance;
const static int a = 100;
};
因为不用关心线程的退出信息,也不需要对线程进行管理,在创建好线程之后,直接detach
分离即可。
static
变量我们需要在类外初始化,模板类型还需要带上template
关键字:
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
4.3 构造和析构:
构造:
private:
ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false)
{
assert(threadNum_ > 0);
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
// 将拷贝构造和赋值重载删掉
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T>&) = delete;
因为是懒汉模式的单例,提供一个指针作为单例,不对外开放构造函数。
同时,用delete
关键字,禁止拷贝构造和赋值重载。
析构:
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
4.4 两次nullptr判断:
static ThreadPool<T> *getInstance()
{
static Mutex mutex;
if (nullptr == instance) // 仅仅是过滤重复的判断
{
LockGuard lockguard(&mutex); // 进入代码块,加锁。退出代码块,自动解锁。
if (nullptr == instance)
{
instance = new ThreadPool<T>();
}
}
return instance;
}
- 第一个判断是为了保证单例,只要单例对象存在了,就不再创建单例对象了。
- 第二个判断是保证线程安全,可能会出现线程A在创建单例,线程B在申请锁中等待的情况。
- 此时如果不进行第二次nullptr判断,线程B从锁中被唤醒后,又会继续执行,多创建了一个单例对象!
4.5 线程池启动:
处理任务:
static void *threadRoutine(void *args) // args收到了类内指针
{
pthread_detach(pthread_self());
// 此时就拿到了线程池对象指针
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
prctl(PR_SET_NAME, "follower");
while (1)
{
tp->lockQueue();
// 处理任务
while (!tp->haveTask())
{
tp->waitForTask();
}
// 这个任务就被拿到了线程的上下文中
T t = tp->pop();
tp->unlockQueue();
// for debug
int one, two;
char oper;
t.get(&one, &two, &oper);
// 规定,所有的任务都必须有一个run方法
Log() << "新线程完成计算任务: " << one << oper << two << "=" << t.run() << "\n";
}
}
void start()
{
// 作为一个线程池,不能被重复启动
assert(!isStart_);
for (int i = 0; i < threadNum_; i++)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
- 类内成员,成员函数,都有默认参数this,类内要是想把线程搞起来,只能是static。
- static成员函数,无法访问类内成员函数和成员变量,只能通过接口来访问。
4.6 封装加锁/解锁/通知线程等操作:
private:
void lockQueue() { pthread_mutex_lock(&mutex_); }
void unlockQueue() { pthread_mutex_unlock(&mutex_); }
bool haveTask() { return !taskQueue_.empty(); }
void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
T pop()
{
T temp = taskQueue_.front();
taskQueue_.pop();
return temp;
}
4.7 测试:
makefile
中新的用法,可以更加泛型编程:
CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=threadpool
src=ThreadPoolTest.cc
$(bin):$(src)
$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:
rm -f $(bin)
测试代码:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <thread>
// 如何对一个线程进行封装, 线程需要一个回调函数,支持lambda
// class tread{
// };
int main()
{
// 给线程改名字
prctl(PR_SET_NAME, "master");
const string operators = "+/*/%";
// unique_ptr<ThreadPool<Task> > tp(new ThreadPool<Task>());// 懒汉模式之后这个就不能用了
unique_ptr<ThreadPool<Task>> tp(ThreadPool<Task>::getInstance());
tp->start();
srand((unsigned long)time(nullptr));
// 派发任务的线程
while (true)
{
int one = rand() % 50;
int two = rand() % 10;
char oper = operators[rand() % operators.size()];
Log() << "主线程派发计算任务: " << one << oper << two << "=?"
<< "\n";
Task t(one, two, oper);
tp->push(t);
sleep(1);
}
return 0;
}