目录
1.回忆
Task.hpp
1. #pragma once
2. 头文件和命名空间
3. 类 CallTask
4. 操作符字符串
5. 回调函数 mymath
阻塞队列 BlockQueue 的实现
BlockQueue
生产者和消费者线程
生产者productor
消费者 consumer
主函数 main
代码整体说明
2. 信号量
2.1 回忆:多线程对锁的凌乱竞争✔️
2.2 信号量函数
1. 初始化信号量
2. 销毁信号量
3. 等待信号量(P操作)
4. 发布信号量(V操作)
CP 模型中的锁与信号量使用总结
3.基于环形队列的生产者消费者模型
1.回忆
生产的数据从哪里来?
用户,网络等
生产者生产的数据也是要花时间获取的!
- 获取数据
- 还有 生产数据到队列
同样的,消费者
- 消费数据
- 还有 加工处理数据
真正的高效:非临界区(获取/加工)之间高并发的同时进行
实现派发任务的代码的代码细节讲解
我们先把上一篇文章的代码拷过来解释一下,思路如下
下面这段代码定义了类 CallTask,
并且展示了如何使用这些类来处理计算任务和保存任务。下面会逐行解释代码的各个部分。
Task.hpp
1. #pragma once
#pragma once
- 这是一个预处理指令,用来防止头文件被多次包含,避免重复定义。
2. 头文件和命名空间
#include <iostream>
#include <functional>
#include <string>
using namespace std;
- 包含了标准输入输出库、函数对象库和字符串库。
- 使用
std
命名空间,以便在代码中直接使用标准库中的类型和函数。
3. 类 CallTask
class CallTask
{
typedef function<int(int, int, char)> func_t;
public:
CallTask() {}
CallTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
{
}
string operator()()
{
int result = _callback(_x, _y, _op);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
return buffer;
}//调用()实现对结果的打印
string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback;
};
CallTask
类定义了一个计算任务。func_t
是一个typedef
,定义了一个函数类型,它接受三个参数(两个整数和一个字符)并返回一个整数。CallTask
的构造函数可以初始化两个整数_x
和_y
,一个字符操作符_op
,以及一个回调函数_callback
。operator()
重载了()
操作符,使得对象可以像函数一样被调用。调用时,它会执行_callback
函数(即用户定义的计算逻辑),并返回计算结果的字符串形式。toTaskString()
返回一个描述任务的字符串,但不包含实际计算结果,仅展示操作符和操作数。
4. 操作符字符串
string oper = "+-*/%";
- 定义了一个包含常见数学操作符的字符串。
5. 回调函数 mymath
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
cout << "div zero error" << endl;
result = -1;
}
else
{
result = x / y;
}
}
break;
case '%':
{
if (y == 0)
{
cout << "mod zero error" << endl;
result = -1;
}
else
{
result = x % y;
}
}
break;
default:
break;
}
return result;
}
mymath
函数根据op
操作符执行不同的数学运算。- 如果
op
是/
或%
并且y
为 0,会输出错误信息并返回-1
以表示出错。
阻塞队列 BlockQueue
的实现
进入线程,准备工作做完后,对线程进行判断处理
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
const int maxcapacity = 5;
- 这部分代码是
BlockQueue
类的定义。#pragma once
防止头文件被多次包含。 maxcapacity
定义了队列的最大容量。
BlockQueue
这两部分代码分别展示了一个多线程生产者-消费者模型的实现,其中使用了一个阻塞队列 BlockQueue
来管理生产者和消费者之间的任务流。下面我将详细解释每一部分的代码。
template <class T>
class BlockQueue
{
public:
BlockQueue(const int& capacity = maxcapacity)
: _capacity(capacity)
{
// 构造时初始化
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_pcond, nullptr);
pthread_cond_init(&_ccond, nullptr);
}
void push(const T& in)
{
pthread_mutex_lock(&_mutex);
while (is_full())
{
pthread_cond_wait(&_pcond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
while (is_empty())
{
pthread_cond_wait(&_ccond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
bool is_full()
{
return _q.size() == _capacity;
}
bool is_empty()
{
return _q.empty();
}
private:
queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _pcond;
pthread_cond_t _ccond;
};
BlockQueue
是一个模板类,使用标准库的queue
来存储队列元素。_mutex
是一个互斥锁,用于保护队列的并发访问。_pcond
和_ccond
是条件变量,用于协调生产者和消费者的等待与唤醒。push
方法用于将元素放入队列。如果队列满了,生产者会等待,直到队列有空余位置。pop
方法用于从队列中取出元素。如果队列为空,消费者会等待,直到队列有元素可取。- 析构函数负责销毁互斥锁和条件变量。
生产者和消费者线程
#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
#include "Task.hpp"
- 这部分代码包含了阻塞队列
BlockQueue
和任务类CallTask
的头文件,并引入了ctime
用于时间相关操作和unistd.h
用于线程的休眠。
生产者productor
void* productor(void* args)
{
BlockQueue<Task>* _c_bq = static_cast<BlockQueue<CallTask>*>(args);
while (true)
{
// 生产活动
int x = rand() % 10 + 1;
int y = rand() % 5;
char op = oper[rand() % oper.size()];
Task t(x, y, op, mymath);
_c_bq->push(t);
cout << "productor thread, 生产计算任务: " << t.toTaskString() << endl;
sleep(1); // 生产的慢一些
}
}
- 该函数是生产者线程的执行体。
- 生产者从
args
参数中获得阻塞队列_c_bq
的指针,然后进入一个无限循环。 - 在循环中,生产者随机生成两个整数
x
和y
以及一个操作符op
,创建一个CallTask
对象t
,并将其推入阻塞队列。 - 每生成一个任务后,生产者线程输出一个消息,显示任务的描述,并且线程会休眠1秒,以降低生产速度。
消费者 consumer
void* consumer(void* args)
{
BlockQueue<Task>* _c_bq = static_cast<BlockQueue<CallTask>*>(args);
while (true)
{
// 消费活动
Task t;
_c_bq->pop(&t);
cout << "cal thread, 完成计算任务: " << t() << endl;
}
}
- 该函数是消费者线程的执行体。
- 消费者从
args
参数中获得阻塞队列_c_bq
的指针(是从生产者中传入的),然后进入一个无限循环。 - 在循环中,消费者从阻塞队列中弹出一个
CallTask
对象t
,并执行该任务(通过t()
调用),然后输出计算结果。
主函数 main
队列类当中的函数类 类型
int main()
{
srand((unsigned int)time(nullptr));
BlockQueue<Task>* bq = new BlockQueue<Task>();
pthread_t p[3], c[2];
for (int i = 0; i < 3; ++i)
{
pthread_create(p + i, nullptr, productor, bq);
}
for (int i = 0; i < 2; ++i)
{
pthread_create(c + i, nullptr, consumer, bq);
}
for (int i = 0; i < 3; ++i)
{
pthread_join(p[i], nullptr);
}
for (int i = 0; i < 2; ++i)
{
pthread_join(c[i], nullptr);
}
return 0;
}
埋种子:srand((unsigned int)time(nullptr));
main
函数中首先设置了随机数种子,然后创建一个BlockQueue<CallTask>
对象,用于存放生产者生成的任务。- 创建了3个生产者线程和2个消费者线程,所有线程都使用同一个阻塞队列对象
bq
。 - 主线程等待所有生产者和消费者线程结束,确保所有任务都被正确处理。
代码整体说明
- 生产者-消费者模型:这是一个经典的多线程同步问题,生产者生成任务并放入队列,消费者从队列中取出任务并执行。
- 阻塞队列:通过互斥锁和条件变量,阻塞队列保证了生产者和消费者在多线程环境下的正确协调,防止竞争条件和资源浪费。生产者在队列满时等待,消费者在队列空时等待,从而保证了任务的有序处理。
💡小注意:
- 在 Visual Studio Code (VSCode) 中进行代码的批量替换操作非常简单,可以通过以下步骤完成:按下
Ctrl + H
快捷键 - 如果确认无误,点击“Replace All”按钮来一次性替换所有匹配项。
为什么要先加锁?
因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过再临界资源内部判断的。
为何什么要用 while?解决伪唤醒 ✔️if->while 等待的实现原理
while 一直判断情况,充当休眠等待
- 当前判断生产条件不满足就把自己挂起,但是这有个问题pthread_cond_wait这是一个函数,只要是函数就有调用失败的可能。
- 另外还存在伪唤醒的情况,假设只有一个消费者,十个生产者。只消费了一个但是却唤醒了一批。但是你这里是if判断,都去push肯定是有问题的。
- 因此充当条件判断的语法必须是while,不能用if
单线程模型->多线程模型的思考✔️
多线程对生产和消费队列加了锁的管控,也是只能有一个人的进/出,但是对于非共享区的准备工作,是可以高并发进行的,真正的优势在于:对于非共享的部分可以多线程的提前执行好
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
std::queue<int> buffer;
const int BUFFER_SIZE = 10;
std::mutex mtx;
std::condition_variable cv;
bool done = false; // 生产者是否完成标志
// 生产者
void producer() {
for (int i = 0; i < 20; ++i) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return buffer.size() < BUFFER_SIZE; });
buffer.push(i);
std::cout << "Produced: " << i << std::endl;
lock.unlock();
cv.notify_one();
}
done = true; // 生产者完成
cv.notify_one(); // 通知消费者
}
// 消费者
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !buffer.empty() || done; });
if (done && buffer.empty()) break; // 如果生产者完成且缓冲区为空,则退出
int item = buffer.front();
buffer.pop();
std::cout << "Consumed: " << item << std::endl;
lock.unlock();
cv.notify_one();
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
2. 信号量
例如:电影院买票是一种对座位的预定机制
信号量:原子性进行 P(- -)V(++) 操作的计数器
这把计数器的本质是什么?
计数器用来描述资源数目的,把资源是否就绪放在了临界区之外
申请信号量的时候,其实就已经间接的再做判断了
P()---访问资源-- V()
2.1 回忆:多线程对锁的凌乱竞争✔️
解释
当多个线程试图同时访问一个共享资源(如全局变量)时,通常会使用锁(mutex)来确保一次只有一个线程能够访问该资源。然而,如果没有正确的同步机制,线程可能会以一种不可预测的顺序尝试获取锁,导致以下问题:
- 竞态条件:多个线程几乎同时尝试获取同一把锁,但只有一个线程能够成功获取。其他线程需要等待锁释放才能继续执行,这可能导致资源访问的顺序混乱。
- 饥饿:某些线程可能长时间无法获取锁,因为其他线程总是抢先一步获取锁。
- 死锁:两个或多个线程相互等待对方释放锁,导致所有线程都被阻塞。
- 活锁:线程不断尝试获取锁但始终失败,浪费计算资源。
解决方案
为了防止这种“凌乱竞争”,可以采取以下措施:
- 使用原子操作:确保对共享资源的访问是原子的,即不可分割的。
- 使用互斥锁:确保一次只有一个线程可以访问共享资源。
- 使用条件变量:结合互斥锁使用,允许线程在特定条件下等待,直到满足条件后再继续执行。
- 使用读写锁:允许多个线程同时读取共享资源,但一次只能有一个线程写入。
- 使用信号量:控制多个线程对有限资源的访问。
例如对互斥锁进行回忆:
#include <pthread.h>
#include <stdio.h>
int x = 0;
pthread_mutex_t lock;
void* incrementX(void* arg) {
while (1) {
pthread_mutex_lock(&lock); // 获取锁
x++;
printf("Thread incremented x: %d\n", x);
pthread_mutex_unlock(&lock); // 释放锁
usleep(100000); // 等待一段时间
}
return NULL;
}
int main() {
pthread_t thread1, thread2;
pthread_mutex_init(&lock, NULL);
pthread_create(&thread1, NULL, incrementX, NULL);
pthread_create(&thread2, NULL, incrementX, NULL);
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
pthread_mutex_destroy(&lock);
return 0;
}
在这个示例中,每个线程在递增 x
的值之前先获取锁,完成递增后释放锁。这样可以确保任何时候只有一个线程能够修改 x
的值,避免了竞态条件的发生。
总结
- 并发问题:多个线程同时访问同一变量可能导致竞态条件。
- 解决方案:使用互斥锁、原子操作、条件变量等同步机制来保护共享资源。
2.2 信号量函数
信号量是一种用于同步和互斥的机制,在多线程或多进程编程中非常关键。以下是一些常用的信号量操作函数。
1. 初始化信号量
- 原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 参数:
-
sem
:需要初始化的信号量。pshared
:传入0表示线程间共享,传入非0表示进程间共享。value
:信号量的初始值,即计数器的初始值。
- 返回值:成功返回0,失败返回-1。
2. 销毁信号量
- 原型:
int sem_destroy(sem_t *sem);
- 参数:
-
sem
:需要销毁的信号量。
- 返回值:成功返回0,失败返回-1。
3. 等待信号量(P操作)
- 原型:
int sem_wait(sem_t *sem);
- 参数:
-
sem
:需要等待的信号量。
- 返回值:成功返回0,信号量的值减一;失败返回-1,信号量的值保持不变。
4. 发布信号量(V操作)
- 原型:
int sem_post(sem_t *sem);
- 参数:
-
sem
:需要发布的信号量。
- 返回值:成功返回0,信号量的值加一;失败返回-1,信号量的值保持不变。
#include <iostream>
#include <thread>
#include <semaphore.h>
// 定义信号量
sem_t sem;
// 生产者线程函数
void producer() {
for (int i = 0; i < 5; ++i) {
// 发布信号量(V操作)
sem_post(&sem);
std::cout << "Producer incremented semaphore." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
// 消费者线程函数
void consumer() {
for (int i = 0; i < 5; ++i) {
// 等待信号量(P操作)
sem_wait(&sem);
std::cout << "Consumer decremented semaphore." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main() {
// 初始化信号量
if (sem_init(&sem, 0, 0) == -1) {
std::cerr << "Failed to initialize semaphore" << std::endl;
return 1;
}
// 创建生产者和消费者线程
std::thread t1(producer);
std::thread t2(consumer);
// 等待线程结束
t1.join();
t2.join();
// 销毁信号量
sem_destroy(&sem);
return 0;
}
CP 模型中的锁与信号量使用总结
多生产多消费的意义
-
生产与消费的本质:
-
- 生产:将私有的任务转移到公共空间中。
- 消费:从公共空间中取出任务进行处理。
- 意义:多生产多消费模型不仅仅局限于将任务或数据放入交易场所,而是包含了任务生产前和消费后的处理过程,这两个阶段往往是最耗时的。
信号量的本质与意义
- 信号量:本质上是一把计数器。
- 计数器的意义:
-
- 与互斥量相比,信号量可以预设临界资源的情况。
- 在执行 PV(Proberen/Vergrendelen,即测试/锁定)操作过程中,可以在不进入临界区的情况下得知资源情况。
- 这样可以减少临界区内部的判断,提高系统效率。
3.基于环形队列的生产者消费者模型
常见的环形,例如基于%实现
空和满的时候,tail 和 head 指向的是同一个位置,无法判断是空还是满
解决:
- 空一个位置(temp=head+1)来判断 temp 及下一个位置是不是==tail
- 交给信号量来处理
生产和消费没有指向同一个格子,就可以运行下去,遵循着三个原则
例如:我围着圆桌进行放苹果,你跟在我后面拿
- 指向同一个位置的时候,只能一个人访问
- 你不能超过我
- 我不能把你套个圈
正常追逐游戏,必须满足这三个条件
我们两个什么情况才会指向同一个位置?空或满
空:我,生产者执行
满:你,消费者执行
添加信号量
- P 关注什么资源呢?还有多少剩余空间 SpaceSem:N
- C 关注什么资源呢?还有多少剩余数据 DataSem:0
生产者执行时:P(SpaceSem) V(DataSem)
消费者执行时:P(DataSem) V(SpaceSem)
下篇文章将继续讲解~