目录
0.上篇
1. 线程同步概念
2.认识条件变量
2.1条件变量的概念
2.2认识接口
2.3写一个测试代码
3.生产者消费者模型
3.1概念部分
1.基本概念
2.主要问题
3.优点
4.思考切入点(321原则)
3.2编写基于BlockingQueue的生产者消费者模型(代码)
3.2.1 概念准备
3.2.2 编写代码(C++ queue模拟阻塞队列的生产消费模型))
0.上篇
Linux--线程互斥(加锁)-CSDN博客
1. 线程同步概念
线程同步同样是需要互斥锁来实现的,但不同的是:线程同步,程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息或状态。当某个线程需要等待另一个线程完成某项任务后才能继续执行时,就需要进行线程同步。同步的目的是确保线程之间按照一定的顺序或规则来访问共享资源或执行特定操作。
线程同步是要强调顺序性的,而非单一的竞争,这就为访问临界资源提供了一定的合理性,不会让同一个线程连续多次的访问临界资源。这就与线程互斥有区别了,线程互斥只完成了排他性,但同步在排他的基础上,完成了线程访问资源的顺序性。同步的概念更为广泛,它不仅包括互斥,还包括其他形式的线程间协作和顺序控制。可以说互斥是同步的一种特殊形式。
2.认识条件变量
2.1条件变量的概念
条件变量是一种同步原语,它提供了一种线程间通信的方式。当线程需要等待某个条件成立时,它可以使用条件变量将自己挂起并进入等待状态。一旦条件成立,另一个线程会通知条件变量,从而唤醒等待的线程。
条件变量必须与互斥锁结合使用,以确保线程在检查条件和等待条件变量时的原子性。这意味着,在调用条件变量的等待函数之前,线程必须已经持有与条件变量关联的互斥锁。当线程被条件变量唤醒后,它会重新获取互斥锁,并再次检查条件是否真正满足。
条件变量提供了两种基本操作:
- 等待(wait):线程调用条件变量的等待函数时,会释放已持有的互斥锁并进入等待状态。此时,线程不再消耗CPU资源,直到被其他线程唤醒。唤醒后,线程会重新获取互斥锁,并继续执行后续操作。
- 通知(notify/notifyAll):当条件满足时,另一个线程会调用条件变量的通知函数来唤醒一个或所有等待的线程。通知操作必须在持有互斥锁的情况下进行,以确保线程同步的正确性。
条件变量通常是与线程队列相关联的,因为可能有多个线程等待同一个条件,条件满足时,条件变量会从队列中唤醒一个或多个线程,使它们能够继续执行。
2.2认识接口
接口:pthread_cond_init函数是用于初始化一个条件变量
cond
是指向条件变量对象的指针,该对象将被初始化。attr
是指向条件变量属性的指针,用于指定条件变量的属性。如果此参数为NULL
,则使用默认属性。在大多数应用中,通常传递NULL
。函数成功时返回
0
;出错时返回错误码。在实际使用中,静态初始化的方式(如示例中
PTHREAD_COND_INITIALIZER
和PTHREAD_MUTEX_INITIALIZER
)通常用于全局或静态的条件变量和互斥锁。对于局部变量或需要动态配置属性的情况,应使用pthread_cond_init
和pthread_mutex_init
函数进行动态初始化。pthread_cond_t,该类型用于表示条件变量。
pthread_cond_t
的使用中,通常需要与互斥锁(pthread_mutex_t
)一起工作,以确保对共享数据的访问是同步的。条件变量本身不直接管理或保护任何数据;它们必须与互斥锁结合使用,以确保在检查条件(即“等待”条件)和修改条件(即“通知”或“广播”条件)时,数据的完整性得到保护。
接口:pthread_cond_wait,用于使线程在条件变量上等待,直到该条件变量被另一个线程的信号(
pthread_cond_signal
)或广播(pthread_cond_broadcast
)唤醒。
cond
是指向条件变量对象的指针。mutex
是指向互斥锁对象的指针,该互斥锁必须在调用pthread_cond_wait
之前被当前线程持有(即锁定状态)。
接口:pthread_cond_signal(唤醒一个线程)和pthread_cond_broadcast(唤醒所有线程),
是用于唤醒等待条件变量的线程的两个函数。这两个函数通常与pthread_cond_wait
或pthread_cond_timedwait
一起使用,以实现线程间的同步。
cond
是指向要唤醒线程的条件变量的指针。成功时返回
0
;失败时返回错误码。
2.3写一个测试代码
eg:让主线程定期的唤醒新线程
我们对临界资源进行加锁,并给了条件变量,有主线程一一唤醒。
pthread_cond_signal一个一个的进行唤醒
#include <iostream> #include <string> #include <unistd.h> #include <pthread.h> const int num = 5; pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; void *Wait(void *args) { std::string name = static_cast<const char *>(args); while (true) { pthread_mutex_lock(&gmutex); pthread_cond_wait(&gcond, &gmutex /*?*/); // 这里就是线程等待的位置 usleep(10000); std::cout << "I am : " << name << std::endl; pthread_mutex_unlock(&gmutex); // usleep(100000); } } int main() { pthread_t threads[num]; for (int i = 0; i < num; i++) { char *name = new char[1024]; snprintf(name, 1024, "thread-%d", i + 1); pthread_create(threads + i, nullptr, Wait, (void *)name); usleep(10000); } sleep(1); // 唤醒其他线程 while (true) { pthread_cond_signal(&gcond); //pthread_cond_broadcast(&gcond); //std::cout << "唤醒所有线程...." << std::endl; std::cout << "唤醒一个线程...." << std::endl; sleep(2); } for (int i = 0; i < num; i++) { pthread_join(threads[i], nullptr); } return 0; }
我们发现线程唤醒后执行,并且是按顺序唤醒,按顺序执行。
下面是pthread_cond_broadcast唤醒一批线程的效果:没顺序,但每批线程的每个线程都是执行一次。
3.生产者消费者模型
3.1概念部分
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
1.基本概念
- 生产者:负责生成数据或任务的实体。它执行某些操作,如读取文件、计算数据或接收输入,然后将生成的数据放入某个中间存储位置(如队列、缓冲区等)。
- 消费者:负责处理数据的实体。它从中间存储位置取出数据,进行进一步处理,如显示、存储到数据库或发送给另一个系统。
- 缓冲区(阻塞队列):用于存储生产者生成而消费者尚未处理的数据的临时存储区。它解决了生产者和消费者之间速度不匹配的问题。
2.主要问题
- 同步:确保生产者和消费者不会同时访问缓冲区,以避免数据竞争和不一致。
- 互斥:确保在任一时刻只有一个生产者或消费者能够访问缓冲区。
- 死锁:防止两个或多个线程永久阻塞,每个线程都在等待其他线程释放资源。
- 饥饿:确保每个消费者都有机会从缓冲区中获取数据,防止某些消费者因为某些原因(如优先级、调度策略等)而得不到服务。
3.优点
解耦:
生产者和消费者之间的解耦是模型的核心优势之一。生产者不需要知道消费者的具体实现细节,同样,消费者也不需要知道生产者的具体实现。它们之间通过共享的缓冲区进行通信,这种解耦使得系统的各个部分可以独立地发展和优化。提高效率和吞吐量:
生产者可以在没有消费者立即处理数据的情况下继续生产,而消费者也可以在生产者没有新数据时继续处理已有数据。这种并行处理能力可以显著提高系统的整体效率和吞吐量。平衡负载:
在生产者-消费者模型中,可以通过调整生产者和消费者的数量来平衡系统的负载。如果生产者生成数据的速度超过了消费者的处理能力,可以添加更多的消费者来分担负载。反之,如果消费者的处理能力超过生产者,则可以减少消费者的数量以节省资源。灵活性:
模型具有很高的灵活性,可以根据需要轻松地添加或删除生产者和消费者。此外,缓冲区的大小也可以根据需要进行调整,以适应不同的工作负载和数据流量。简化并发控制:
通过使用锁、条件变量或其他同步机制,生产者消费者模型可以简化并发控制。这些机制确保了生产者和消费者之间的正确同步,并防止了数据竞争和条件竞争等并发问题。增强系统的可扩展性:
由于生产者和消费者之间的解耦和并行处理能力,生产者消费者模型可以很容易地扩展到处理更多的数据或更复杂的任务。通过增加生产者和消费者的数量,系统可以处理更高的负载并保持高效的性能。4.思考切入点(321原则)
1.一个仓库(一段内存空间,如队列)
2.两种角色(生产线程,消费线程)
3.三种关系(生产和生产(互斥关系),消费和消费(互斥关系),生产和消费(互斥关系&&同步关系))
3.2编写基于BlockingQueue的生产者消费者模型(代码)
3.2.1 概念准备
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
3.2.2 编写代码(C++ queue模拟阻塞队列的生产消费模型))
代码:
BlockingQueue.hpp
#pragma once #include <iostream> #include <string> #include <queue> #include <pthread.h> template<typename T> class BlockQueue { private: bool IsFull() { return _block_queue.size() == _max_cap; } bool IsEmpty() { return _block_queue.empty(); } public: BlockQueue(int cap = defaultcap) : _max_cap(cap) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_p_cond, nullptr); pthread_cond_init(&_c_cond, nullptr); } // 假设:2个消费者 void Pop(T *out) { pthread_mutex_lock(&_mutex); while (IsEmpty()) { // 添加尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒! pthread_cond_wait(&_c_cond, &_mutex); // 两个消费者都在这里等待了 } // 1. 没有空 || 2. 被唤醒了 *out = _block_queue.front(); _block_queue.pop(); pthread_mutex_unlock(&_mutex); //由生产者唤醒 pthread_cond_signal(&_p_cond); } // 一个生产者 void Equeue(const T &in) { pthread_mutex_lock(&_mutex); while (IsFull()) // if ? { // 满了,生产者不能生产,必须等待 // 可是在临界区里面啊!!!pthread_cond_wait // 被调用的时候:除了让自己继续排队等待,还会自己释放传入的锁 // 函数返回的时候,不就还在临界区了! // 返回时:必须先参与锁的竞争,重新加上锁,该函数才会返回! pthread_cond_wait(&_p_cond, &_mutex); } // 1. 没有满 || 2. 被唤醒了 _block_queue.push(in); // 生产到阻塞队列 pthread_mutex_unlock(&_mutex); // 让消费者消费,由消费者唤醒 pthread_cond_signal(&_c_cond); // pthread_cond_broadcast : 一种场景 } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_p_cond); pthread_cond_destroy(&_c_cond); } private: std::queue<T> _block_queue;//临界资源 int max_cap;//最大容量 pthread_mutex_t _mutex;//锁 pthread_cond_t _p_cond;//生产者的条件变量 pthread_cond_t _c_cond; // 消费者条件变量 };
构造函数 (
BlockQueue
): 初始化队列的最大容量、互斥锁和两个条件变量(一个用于生产者,一个用于消费者)。
Pop
方法: 供消费者线程调用,从队列中移除并返回队列前端的元素。如果队列为空,则消费者线程将阻塞在pthread_cond_wait
调用上,直到生产者向队列中添加了元素并通知了消费者。
Equeue
方法: 供生产者线程调用,向队列中添加一个新元素。如果队列已满,则生产者线程将阻塞在pthread_cond_wait
调用上,直到消费者从队列中移除了元素并通知了生产者。析构函数 (
~BlockQueue
): 清理资源,销毁互斥锁和条件变量注意:
1.这两个的顺序是没有关系,因为无论是消费过程,还是生产过程,都是要持有锁的,唤醒后是要重新申请锁的。
pthread_mutex_unlock(&_mutex); //由生产者唤醒 pthread_cond_signal(&_p_cond);
2.在判断的时候不用1f而用while,如果有两个消费者A和B,A竞争锁成功了,他就会执行临界区代码,先判断商品是否为空,如果空了就要wait;此时B可以持有锁,B也可以执行临界区代码,因为商品空了,所有B也要等待。
此时生产者生产了一个商品,使用 pthread_cond_broadcast把A和B都唤醒,那么如果A被唤醒拿走了一个商品,B也被唤醒,B现在锁处等待A释放锁,B拿到锁后从wait处返回,继续执行下面代码,去拿商品的时候却没有商品:添加尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒!为了避免这种情况的发生,使用while判断商品是否为空,如果为空那么重新等待,对于生产者同样如此!
while (IsEmpty()) while (IsFull())
要执行的任务:task.hpp
生产者提供算术题,消费者完成算术题
#pragma once #include<iostream> // 要做加法 class task { public: task() { } task(int x, int y) : _x(x), _y(y) { } void Excute() { _result = _x + _y; } void operator ()() { Excute(); } std::string debug() { std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?"; return msg; } std::string result() { std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result); return msg; } private: int _x; int _y; int _result; };
main.cc:
这里创建了多生产者,多消费者,生产者提供10以内的加法题, 消费者负责运算。
#include "BlockingQueue.hpp" #include "task.hpp" #include <pthread.h> #include <ctime> #include <unistd.h> void *Consumer(void *args) { BlockQueue<task> *bq = static_cast<BlockQueue<task> *>(args); while(true) { // 1. 获取数据 task t; bq->Pop(&t); // 2. 处理数据 t.Excute(); std::cout << "Consumer -> " << t.result() << std::endl; } } void *Productor(void *args) { srand(time(nullptr) ^ getpid()); BlockQueue<task> *bq = static_cast<BlockQueue<task> *>(args); while(true) { // 1. 构建数据/任务 int x = rand() % 10 + 1; // [1, 10] usleep(x * 1000); int y = rand() % 10 + 1; // [1, 10] task t(x, y); // 2. 生产数据 bq->Equeue(t); std::cout << "Productor ->" <<t.debug()<< std::endl; sleep(1); } } int main() { BlockQueue<task> *bq = new BlockQueue<task>(); pthread_t c1,c2, p1,p2,p3; pthread_create(&c1, nullptr, Consumer, bq); pthread_create(&c2, nullptr, Consumer, bq); pthread_create(&p1, nullptr, Productor, bq); pthread_create(&p2, nullptr, Productor, bq); pthread_create(&p3, nullptr, Productor, bq); pthread_join(c1, nullptr); pthread_join(c2, nullptr); pthread_join(p1, nullptr); pthread_join(p2, nullptr); pthread_join(p3, nullptr); return 0; }
生产者生产到仓库,消费者查看到仓库不为空则进行消费,生产者生产的过程是并发的,消费者消费的过程也是并发的,这就提高了解决任务的效率。