序言
在上一篇的内容中,我们学习了使用互斥锁来保护共享资源,避免多个线程竞争,造成数据不一致等问题。在这一篇文章中,我们将继续深入,学习多线程同步以及生产消费者模型。
1. 线程同步
1.1 什么是线程同步
线程互斥只是解决了多个线程之间资源竞争的问题,但他不能保证线程之间的 执行顺序以及任务合理分配等问题
。就比如下段代码,具体的逻辑是多个线程进行抢票,记录每一个线程抢的票数(线程执行次数):
// 指定抢票的数量
int Tickets = 10000;
// 全局的锁,保证对共享资源安全的访问
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
......
static void *func_t(void *arg)
{
MyThread *Ptr = static_cast<MyThread *>(arg);
while (true)
{
pthread_mutex_lock(&gmutex);
if (Tickets > 0)
{
Tickets--;
std::cout << Ptr->getName() << " get a tickets there left " << Tickets << std::endl;
pthread_mutex_unlock(&gmutex);
Ptr->CountIncrease();
}
else
{
pthread_mutex_unlock(&gmutex);
break;
}
}
return nullptr;
}
......
在这里只是展示了核心的抢票代码部分,现在我们观察程序的输出结果:
我们一共运行了三次,这三次运行结果充分的向我们展示了什么是累的累死,闲的闲死。因为某些线程抢夺锁的能力会更强一些,所以说执行的次数也就越多,这就会造成 负载均衡不均,资源利用率低等问题
。
为了解决这个问题,我们就需要引进一个新的概念 线程同步:
线程同步是指在多线程环境中,
协调线程之间的执行顺序和数据访问
,以确保它们以一种一致的方式运行。这通常涉及到确保多个线程在执行某些操作时能够按照预期的顺序进行,从而避免出现竞态条件(race condition
)。线程同步的主要目的是为了保证程序的一致性和正确性
。
和线程互斥比起来,线程同步还需要兼顾多个线程之间的执行顺序。
1.2 线程同步 — 条件变量
条件变量是实现线程同步的重要工具之一,特别适用于 当线程需要在某种条件下等待或被唤醒的情况
。条件变量一般与互斥锁配合使用,以保护共享数据和同步线程的执行。
1.1 概念
条件变量允许线程在满足某些条件之前被阻塞(等待),并在条件满足时被唤醒。它主要用于以下场景:
- 生产者-消费者问题:生产者线程在数据缓冲区满时等待,消费者线程在数据缓冲区空时等待。
- 线程池:工作线程在任务队列为空时等待,新任务到来时唤醒线程。
1.2 条件变量的初始化
在使用条件变量之前,必须初始化互斥锁和条件变量(可以使用全局也可使用局部的):
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
// 或者使用 pthread_mutex_init 和 pthread_cond_init 函数进行初始化。
1.3 锁定互斥锁
在使用条件变量之前,线程必须先锁定互斥锁
:
pthread_mutex_lock(&mtx);
1.4 等待条件变量
当线程需要等待某个条件时,它 释放互斥锁并挂起自己(释放锁,避免发生死锁问题) ,等待条件变量的信号
:
// 进程会阻塞在此处,等待信号
pthread_cond_wait(&cv, &mtx);
1.5 发出信号
当线程修改了共享资源,使得等待的线程可以继续执行时,发出信号通知一个或所有等待的线程:
pthread_cond_signal(&cv); // 唤醒一个等待的线程
// 或
pthread_cond_broadcast(&cv); // 唤醒所有等待的线程
1.6 解锁互斥锁
在修改共享资源后,线程必须释放互斥锁,以便其他线程可以访问:
pthread_mutex_unlock(&mtx);
1.7 销毁
在程序结束时,需要销毁条件变量和互斥锁:
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cv);
1.3 实现线程同步
在了解了体了条件变量的基本使用后,我们对我们上述的代码进行改进,保证每个线程之间顺序执行:
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
void *Print(void *arg)
{
std::string name = static_cast<const char *>(arg);
while (true)
{
pthread_mutex_lock(&gmutex);
pthread_cond_wait(&gcond, &gmutex);
std::cout << "I am " << name << std::endl;
pthread_mutex_unlock(&gmutex);
}
}
const int num = 5;
int main()
{
pthread_t threads[num];
for (int i = 0; i < num; i++)
{
char *name = new char[64];
snprintf(name, 64, "thread_%d", i + 1);
pthread_create(threads + i, nullptr, Print, (void*)name);
}
while (true)
{
pthread_cond_signal(&gcond);
sleep(1);
}
for (int i = 0; i < num; i++)
{
pthread_join(threads[i], nullptr);
}
return 0;
}
在这段程序中就能有序的输出每个线程的结果,避免了一家独大的情况:
2. 生产消费者模型
生产消费者模型(Producer-Consumer Model
)是一种广泛应用于多线程编程中的设计模式,旨在通过引入一个缓冲区(或称为队列)来 解耦生产数据和消费数据的过程
,实现生产者和消费者之间的 平衡和协作
。
2.1 概念
该模型由三个对象所构成,分别是 生产者,缓冲区,消费者
:
生产者(Producer)
:负责生成数据并将其放入缓冲区。生产者可以是一个或多个线程/进程,它们不断地生成数据以供消费者处理。消费者(Consumer)
:负责从缓冲区中取出数据进行处理。消费者同样可以是一个或多个线程/进程,它们不断从缓冲区中获取数据并执行相应的处理操作。缓冲区(Buffer/Queue)
:作为生产者和消费者之间的中介,用于存储生产者生成的数据,供消费者取用。缓冲区的大小通常是有限的,以避免无限制地存储数据导致内存溢出。
在这里我们引入实际生活中的例子来帮助大家理解:
我们在生活中经常前往超市购物吧,此时工厂就充当生产者,超市就充当缓冲区,我们就充当消费者。当我们需要相应的商品时,会直接前往商店购物而不是工厂,商场会根据当前库存的量告诉工厂,最近消费多我需要多进货或者是最近消费低我进货速度稍微慢一点,这样就很好的调节了生产与消费相适应的速度。
2.2 特点
我们依旧使用 工厂 - 超市 - 消费者 来阐述该系统的特点:
并发性
:生产者和消费者可以并发执行,提高了程序的效率和响应速度。负载均衡
:货物摆放策略:超市可能会优化货架的摆放策略,以平衡顾客的购物需求和供应商的补货速度。解耦合
:货物补充和购物:顾客可以随时购物,而供应商则会在不同的时间点进行补货,顾客购物的过程不会阻碍供应商的补货工作。
2.3 工作原理
生产者流程
- 生产者生成数据。
- 检查缓冲区是否已满。
- 如果缓冲区未满,将数据放入缓冲区;如果缓冲区已满,则生产者可能需要等待。
- 通知等待的消费者缓冲区中有新数据可取。
消费者流程
- 消费者检查缓冲区是否为空。
- 如果缓冲区不为空,从缓冲区中取出数据进行处理;如果缓冲区为空,则消费者可能需要等待。
- 处理完数据后,根据需要通知生产者或其他消费者。
2.4 代码实现
在这里我们使用队列作为我们的缓冲区,在开始之前,现提出几个问题:
- 若缓冲区已满,则生产者不再生产,需要阻塞等待,那么谁唤醒他呢?
- 若缓冲区为空,则消费者不能获取,需要阻塞等待,这又谁唤醒他呢?
又从我们的商店视角来看,当货架满了,肯定就不能进货了,工厂停止生产,那什么时候告诉工厂可以生产呢?消费者获取数据时!消费者获取数据了则货架肯定不是满的,商店就可以告诉工厂你们可以生产啦!反之一样的,当货架为空,什么时候可以获取数据呢?工厂生产时!工厂生产了,则肯定上货架了,消费者就可以获取数据了。
费者和生产者通常 通过互相唤醒来协调操作
:
生产者
:在生产数据后可能会通知或唤醒消费者,特别是当缓冲区有新的数据时。消费者
:在处理完数据后,可能会通知或唤醒生产者,特别是当缓冲区有足够空间时。
这种互相唤醒机制 有助于保持系统的平衡,避免阻塞和空闲状态,从而提高整体效率。
构造函数和析构函数
template <class T>
class BlockQueue
{
BlockQueue(int Cap = DefalutCapacity)
: _Cap(Cap);
{
// 初始化锁
pthread_mutext_init(&_mutex, nullptr);
// 初始化条件变量
pthread_cond_init(&_p_cond, &_mutex);
pthread_cond_init(&_c_cond, &_mutex);
}
~BlockQueue()
{
// 删除锁
pthread_mutex_destroy(&_mutex);
// 删除条件变量
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _BlockQueue; // 缓冲区存储临界资源
int _Cap; // 缓冲区容量
pthread_mutex_t _Mutext; // 锁
pthread_cond_t _p_cond; // 生产者的条件变量
pthread_cond_t _c_cond; // 消费者的条件变量
};
- 在这里使用互斥锁
保证在多个线程同时访问时,队列的状态(如队列的大小、队列中的元素等)保持一致性。
- 准备两个条件变量,分别在特殊情况时让消费者和生产者阻塞等待
生产接口
所谓生产接口就是往队列中写入数据:
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
if(IsFull())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_BlockQueue.push(in);
pthread_mutex_unlock(&_mutex);
// 通知消费者可以消费了
pthread_cond_signal(&_c_cond);
}
- 对临界区进行访问,首先需要上锁
- 判断缓冲区(队列)是否已满
- 若已满则阻塞等待,反之写入数据
- 解锁,并且通知消费者消费
BUG警告:在这里乍一看是没什么问题的,但是我们需要思考特殊情况,加入现在有两个生产者 A,B 被阻塞等待,当一个消费者取出数据后,向生产者发出生产信号,其中 A 接受信号并且抢夺了锁往后执行,现在 A 写入数据,队列再次为满。A 执行完成后,释放了锁并且通知消费者消费,但是请注意
B 在之前已经被被释放了,所以也会参与抢锁的过程
,如果锁被 B 抢到了,那是不是就会有问题了?明明队列为满,生产者还是往里写数据,这是不是就会出错了 ! 这就是虚假唤醒
,线程在没有满足预期条件的情况下被唤醒,被唤醒的线程可能继续执行后续操作,而这些操作可能基于一个已经不再满足的条件。
解决方案也非常简单,因为 if
只能判断一次,所以我们换为 while
轮询的检查是否满足生产的条件:
while(IsFull())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
消费接口
消费接口就是消费者读取数据的接口:
// 消费接口
void pop(T &out)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
out = _BlockQueue.front();
_BlockQueue.pop();
pthread_mutex_unlock(&_mutex);
// 通知生产者可以生产了
pthread_cond_signal(&_p_cond);
}
逻辑和生产接口是相差不多的,在这里就不过多解释了哦。
3. 总结
在本文中我们介绍了线程同步的概念,以及基于线程同步的生产消费者模型,还是尝试实现了该模型的核心代码,希望大家有所收获!