文章目录
- 生产者消费者 定义
- 代码实现 / 思路
- 完整代码
- 执行逻辑 / 思路
- 局部具体分析
- model.cc
- func(消费者线程)
- 执行结果
生产者消费者 定义
生产者消费者模型 是一种常用的 并发编程模型 ,用于解决多线程或多进程环境下的协作问题。该模型包含两类角色:生产者和消费者。
生产者负责生成数据,并将数据存放到共享的缓冲区中。消费者则从缓冲区中获取数据并进行处理。生产者和消费者之间通过共享的缓冲区进行数据交互。
为了确保线程安全,生产者和消费者需要遵循一些规则:
- 如果缓冲区已满,则生产者需要等待直到有空间可用。
- 如果缓冲区为空,则消费者需要等待直到有数据可用。
- 生产者和消费者都不能访问缓冲区的内部结构,只能通过特定的接口进行操作。
代码实现 / 思路
完整代码
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>
// 生产者消费者模型
using namespace std;
#define TNUM 4 // 定义将使用的线程数
typedef void (*func_t)(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);
volatile bool quit = false; // 退出信号,默认为false
// 定义一个具有名称、函数和同步机制(互斥锁和条件变量)的线程数据结构
// 用于传递线程相关的信息和共享资源给不同的线程,实现线程间的通信和同步
class ThreadData
{
public:
ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
: _name(name), _func(func), _pmtx(pmtx), _pcond(pcond) {}
public:
// 成员变量
string _name; // 线程名
func_t _func; // 函数指针
pthread_mutex_t* _pmtx; // 互斥锁指针
pthread_cond_t* _pcond; // 条件变量指针
};
void func1(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
// wait 需要在加锁和解锁之间
pthread_mutex_lock(pmtx); // 加锁
//
pthread_cond_wait(pcond, pmtx); // 默认该线程在执行时,wait 代码被执行,当前线程会被立即阻塞
cout << name << " running <-> 播放" << endl;
pthread_mutex_unlock(pmtx); // 解锁
}
}
void func2(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
// 加锁 等待 解锁
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx);
cout << name << " running <-> 下载" << endl;
pthread_mutex_unlock(pmtx);
}
}
void func3(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
// 加锁 等待 解锁
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx);
cout << name << " running <-> 刷新" << endl;
pthread_mutex_unlock(pmtx);
}
}
void func4(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
// 加锁 等待 解锁
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx);
cout << name << " running <-> 扫码用户信息" << endl;
pthread_mutex_unlock(pmtx);
}
}
// 线程入口函数
void* Entry(void *args)
{
ThreadData* td = (ThreadData*)args; // 获取线程所需的数据
td->_func(td->_name, td->_pmtx, td->_pcond);
delete td;
return nullptr;
}
int main()
{
// 初始化互斥锁mtx 和 条件变量cond
pthread_mutex_t mtx;
pthread_cond_t cond;
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
// 创建 TNUM 个线程,并将每个线程相关的函数和共享的互斥锁、条件变量传递给线程的入口函数 Entry。
// 每个线程都有一个不同的名称和要执行的函数(func)
pthread_t tids[TNUM];
func_t funcs[TNUM] = {func1, func2, func3, func4};
for (int i = 0; i < TNUM; i++)
{
string name = "Thread ";
name += to_string(i+1);
ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);
pthread_create(tids + i, nullptr, Entry, (void*)td); // 创建线程
}
// 调用 pthread_cond_signal 函数向条件变量发送信号,通知等待该条件的线程可以继续运行
int cnt = 20;
while(cnt)
{
cout << "resume thread run code ...." << cnt-- << endl << endl; // 打印输出当前计数器的值,并将计数器减一
pthread_cond_signal(&cond); // 恢复线程
sleep(1);
}
// 代码设置 quit 标志为 true,
// 调用 pthread_cond_broadcast 函数向所有等待该条件的线程广播信号
cout << "ctrl done" << endl;
quit = true;
pthread_cond_broadcast(&cond); // 唤醒所有等待在条件变量 cond 上的线程
// 使用 pthread_join 等待所有线程的完成,然后销毁互斥锁和条件变量
for(int i = 0; i < TNUM; i++)
{
pthread_join(tids[i], nullptr);
cout << "thread: " << tids[i] << "quit" << endl;
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
- 定义了4个线程函数
func1、func2、func3、func4
,分别代表4个线程的执行逻辑。 - 定义了一个
ThreadData
类,用于封装线程相关的信息和共享资源。 - 在主函数中,创建了4个线程,并将每个线程的名称、函数指针、互斥锁和条件变量传递给ThreadData对象,然后通过
pthread_create
函数创建线程。 - 主线程通过循环调用
pthread_cond_signal
函数向条件变量发送信号,唤醒一个等待该条件的线程,然后休眠1秒钟。 - 当计数器cnt减为0时,主线程设置quit标志为true,并通过
pthread_cond_broadcast
函数向所有等待该条件的线程广播信号,通知它们可以退出。 - 使用
pthread_join
函数等待所有线程的完成,然后销毁互斥锁和条件变量。
其中,在整段代码中,func1、func2、func3和func4
函数分别代表消费者,而主函数中通过循环调用pthread_cond_signal
函数唤醒等待条件变量的线程部分代表生产者。
具体来说:
func1
函数代表一个消费者,它的执行逻辑是"播放"。func2
函数代表另一个消费者,它的执行逻辑是"下载"。func3
函数代表第三个消费者,它的执行逻辑是"刷新"。func4
函数代表第四个消费者,它的执行逻辑是"扫描用户信息"。
而在主函数中的循环调用pthread_cond_signal
函数,将信号发送给条件变量cond,可以唤醒等待该条件的线程。这里的循环调用部分代表生产者,通过不断唤醒等待的消费者线程来模拟生产者产生了数据(信号)。
执行逻辑 / 思路
-
首先,主函数开始执行。在主函数中,初始化了互斥锁
mtx
和条件变量cond
。 -
接下来,使用循环创建了4个线程,并将每个线程对应的名称、函数指针、互斥锁和条件变量传递给
ThreadData
对象,然后通过pthread_create
函数创建线程。这样就创建了4个消费者线程。 -
主线程进入一个循环,循环执行20次。在每次循环中,输出当前计数器的值,并将计数器减一。然后通过
pthread_cond_signal
函数向条件变量发送信号,唤醒一个等待该条件的线程。主线程休眠1秒钟,再进行下一次循环。这部分模拟了生产者产生数据的过程。 -
当计数器cnt减为0时,主线程将
quit
标志设置为true
,表示停止生产数据。 -
主线程调用pthread_cond_broadcast函数向所有等待条件变量的线程广播信号,通知它们可以退出。这部分模拟了生产者通知消费者停止消费的过程。
-
最后,主线程通过
pthread_join
函数等待所有线程的完成。每个消费者线程会不断地等在条件变量上,在接收到信号后执行相应的操作,直到收到停止信号。 -
当所有线程完成后,主线程销毁互斥锁和条件变量,程序结束。
总结起来,这段代码的逻辑是创建了4个消费者线程,每个线程都等待条件变量的信号,然后执行相应的操作。主线程作为生产者,通过发送信号唤醒消费者线程来模拟生产数据的过程。最后,当需要停止生产数据时,主线程发送停止信号给消费者线程,消费者线程收到信号后执行完当前操作后退出。整个过程实现了一个简单的生产者消费者模型。
局部具体分析
model.cc
正常编写代码时,为了不污染命名空间,避免命名冲突,一般不会直接进行 using namespcade std;
这里为了方便,直接进行引用。
#define TNUM 4 // 定义将使用的线程数
typedef void (*func_t)(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);
volatile bool quit = false; // 退出信号,默认为false
// 定义一个具有名称、函数和同步机制(互斥锁和条件变量)的线程数据结构
// 用于传递线程相关的信息和共享资源给不同的线程,实现线程间的通信和同步
class ThreadData
{
public:
ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
: _name(name), _func(func), _pmtx(pmtx), _pcond(pcond) {}
public:
// 成员变量
string _name; // 线程名
func_t _func; // 函数指针
pthread_mutex_t* _pmtx; // 互斥锁指针
pthread_cond_t* _pcond; // 条件变量指针
};
解释:
func_t
是一个函数指针类型,可以指向一个接受const string&
类型参数、pthread_mutex_t*
类型参数和pthread_cond_t*
类型参数的函数,返回类型为 void。 用于后续对接线程的功能函数。ThreadData
是 一个具有名称、函数和同步机制(互斥锁和条件变量)的线程数据结构。用于传递线程相关的信息和共享资源给不同的线程,实现线程间的通信和同步
func(消费者线程)
void func1(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
// wait 需要在加锁和解锁之间
pthread_mutex_lock(pmtx); // 加锁
//
pthread_cond_wait(pcond, pmtx); // 默认该线程在执行时,wait 代码被执行,当前线程会被立即阻塞
cout << name << " running <-> 播放" << endl;
pthread_mutex_unlock(pmtx); // 解锁
}
}
- 以
func1
为例:
- 进入一个无限循环,直到全局变量
quit
为true
才退出。 - 在循环内部,首先使用
pthread_mutex_lock
加锁,保证线程独占互斥锁。 - 调用
pthread_cond_wait
等待条件变量,当前线程会被阻塞并释放互斥锁,直到其他线程调用pthread_cond_signal
或pthread_cond_broadcast
来发送信号唤醒该线程。 - 当线程被唤醒后,输出名称和"running <-> 播放"的信息。
- 最后使用
pthread_mutex_unlock
解锁互斥锁。
执行结果
在linux下,可以看出来:
当我们执行程序后,四个线程会不断地执行四种操作,并且在一个线程结束当前任务之前,其他线程会进行等待,最后输出线程退出信息。