【Linux】系统编程基于阻塞队列生产者消费者模型(C++)

news2025/1/8 11:48:10

目录

【1】生产消费模型

【1.1】为何要使用生产者消费者模型

【1.2】生产者消费者模型优点

【2】基于阻塞队列的生产消费者模型

【2.1】生产消费模型打印模型

【2.2】生产消费模型计算公式模型

【2.3】生产消费模型计算公式加保存任务模型

【2.3】生产消费模型多生产多消费


【1】生产消费模型

        生产消费模型的321原则(便于记忆)

【解释】

  • 3种关系:生产者和生产者(互斥)、消费者和消费者(互斥)、生产者和消费者(互斥|[保证共享资源的安全性]|同步)。

  • 2种角色:生产者线程、消费者线程。

  • 1种交易场所:一段特定结构的缓冲区。

【1.1】为何要使用生产者消费者模型

        生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

【1.2】生产者消费者模型优点

  • 生产线程和消费线程进行解耦。

  • 支持并发。

  • 提高效率。

  • 支持生产和消费的一段时间的忙闲不均的问题。

【2】基于阻塞队列的生产消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

【2.1】生产消费模型打印模型

【makefile文件】

# 创建关联关系
cc=g++
standard=-std=c++11

# 创建依赖关系
myBlockQueue:BlockQueue.cc 
	$(cc) -o $@ $^ $(standard) -l pthread

# 创建删除关系
.PHONY:clean
clean:
	rm -rf myBlockQueue

【BlockQueue.hpp文件】

#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:
    /* 构造函数 */
    BlockQueue(const int maxCapacity = g_maxCapacity)
        : _maxCapacity(maxCapacity)
    {
        // 初始化锁
        pthread_mutex_init(&_mutex, nullptr);
        // 初始化生产者信号量
        pthread_cond_init(&_pCond, nullptr);
        // 初始化消费者信号量
        pthread_cond_init(&_cCond, nullptr);

    }

    /* 析构函数 */
    ~BlockQueue() 
    {
        // 销毁锁
        pthread_mutex_destroy(&_mutex);
        // 销毁生产者信号量
        pthread_cond_destroy(&_pCond);
        // 销毁消费者信号量
        pthread_cond_destroy(&_cCond);
    }

public:
    /* 新增任务 */
    void PushTask(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否满
        // 细节二:充当条件判断的语法必须是while,不能是if  
        while(IsFull()) 
        {
            // 如果容器满了,生产者就不能继续生产了!
            // 细节一:
            // pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!
            // pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!
            // pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!
            pthread_cond_wait(&_pCond, &_mutex);
        }

        // 程序走到这里,容器一定是没有满的!
        _q.push(in);

        // 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
        pthread_cond_signal(&_cCond);   // 通知消费者已经生产了!
        // 解锁
        pthread_mutex_unlock(&_mutex);

    }

    /* 执行任务 */
    void PopTask(T* out) 
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否空
        // 细节二:与PushTask一致
        while(IsEmpty()) 
        {   
            // 如果容器空了,消费者就不能继续消费了!
            // 细节一:与PushTask一致
            pthread_cond_wait(&_cCond, &_mutex);
        }

        // 程序走到这里,容器一定是没有空的!
        *out = _q.front();
        _q.pop();

        // 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
        pthread_cond_signal(&_pCond);   // 通知生产者已经消费了!
        // 解锁
        pthread_mutex_unlock(&_mutex);
    }

private:
    /* 判断容器满 */
    bool IsFull() 
    {
        return _q.size() == _maxCapacity;
    }

    /* 判断容器空 */
    bool IsEmpty() 
    {
        return _q.empty();
    }

private:
    std::queue<T> _q;       // 存储容器
    int _maxCapacity;       // 标识存储重启最大容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _pCond;  // 生产者条件变量
    pthread_cond_t _cCond;  // 消费者条件变量
};

【BlockQueue.cc文件】

#include "BlockQueue.hpp"

/* 定义生产者线程 */
void *Producer(void *args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    // 生产者进行生产
    while(true) 
    {
        int num = rand() % 10 + 1;
        bq->PushTask(num);
        std:: cout << "生产者在生产:" << num << std::endl;
        sleep(1);
    }
}

/* 定义消费者线程 */
void *Consumer(void *args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    // 消费者进行消费
    while(true) 
    {
        int num = 0;
        bq->PopTask(&num);
        std:: cout << "消费者在消费:" << num << std::endl;
        sleep(2);
    }
}


/* 入口函数 */
int main()
{
    // 随机数种子
    srand((unsigned int)time(nullptr) ^ getpid());
    // 共享资源
    BlockQueue<int> *bqs = new BlockQueue<int>();

    pthread_t tP;
    pthread_t tC;
    pthread_create(&tP, nullptr, Producer, (void *)bqs);
    pthread_create(&tC, nullptr, Consumer, (void *)bqs);

    pthread_join(tP, nullptr);
    pthread_join(tC, nullptr);
    return 0;
}

【2.2】生产消费模型计算公式模型

【makefile文件】

# 创建关联关系
cc=g++
standard=-std=c++11

# 创建依赖关系
myBlockQueue:BlockQueue.cc
	$(cc) -o $@ $^ $(standard) -l pthread

# 创建删除关系
.PHONY:clean
clean:
	rm -rf myBlockQueue

【BlockQueue.hpp文件】

#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:
    /* 构造函数 */
    BlockQueue(const int maxCapacity = g_maxCapacity)
        : _maxCapacity(maxCapacity)
    {
        // 初始化锁
        pthread_mutex_init(&_mutex, nullptr);
        // 初始化生产者信号量
        pthread_cond_init(&_pCond, nullptr);
        // 初始化消费者信号量
        pthread_cond_init(&_cCond, nullptr);

    }

    /* 析构函数 */
    ~BlockQueue() 
    {
        // 销毁锁
        pthread_mutex_destroy(&_mutex);
        // 销毁生产者信号量
        pthread_cond_destroy(&_pCond);
        // 销毁消费者信号量
        pthread_cond_destroy(&_cCond);
    }

public:
    /* 新增任务 */
    void PushTask(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否满
        // 细节二:充当条件判断的语法必须是while,不能是if  
        while(IsFull()) 
        {
            // 如果容器满了,生产者就不能继续生产了!
            // 细节一:
            // pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!
            // pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!
            // pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!
            pthread_cond_wait(&_pCond, &_mutex);
        }

        // 程序走到这里,容器一定是没有满的!
        _q.push(in);

        // 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
        pthread_cond_signal(&_cCond);   // 通知消费者已经生产了!
        // 解锁
        pthread_mutex_unlock(&_mutex);

    }

    /* 执行任务 */
    void PopTask(T* out) 
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否空
        // 细节二:与PushTask一致
        while(IsEmpty()) 
        {   
            // 如果容器空了,消费者就不能继续消费了!
            // 细节一:与PushTask一致
            pthread_cond_wait(&_cCond, &_mutex);
        }

        // 程序走到这里,容器一定是没有空的!
        *out = _q.front();
        _q.pop();

        // 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
        pthread_cond_signal(&_pCond);   // 通知生产者已经消费了!
        // 解锁
        pthread_mutex_unlock(&_mutex);
    }

private:
    /* 判断容器满 */
    bool IsFull() 
    {
        return _q.size() == _maxCapacity;
    }

    /* 判断容器空 */
    bool IsEmpty() 
    {
        return _q.empty();
    }

private:
    std::queue<T> _q;       // 存储容器
    int _maxCapacity;       // 标识存储重启最大容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _pCond;  // 生产者条件变量
    pthread_cond_t _cCond;  // 消费者条件变量
};

【Task.hpp文件】

#pragma once
#include <iostream>
#include <string>
#include <functional>

/* 仿函数类 */
class Task
{
public:
    using func_t = std::function<int(int, int, char)>;

public:
    /* 构造函数 */
    Task() {}

    /* 构造函数 */
    Task(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callBalk(func)
    {
    }

public:
    /* 仿函数 */
    std::string operator()()
    {
        int result = _callBalk(_x, _y, _op);

        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);
        return buffer;
    }

public:
    /* 返回打印公式 */
    std::string ToTaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callBalk;
};

/* 任务执行的种类 */
int MyCalculate(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;

    case '-':
        result = x - y;
        break;

    case '*':
        result = x * y;
        break;

    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
        {
            result = x / y;
        }
        break;
    }

    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
        {
            result = x % y;
        }
        break;
    }

    default:
        break;
    }

    return result;
}

【BlockQueue.cc文件】

#include "BlockQueue.hpp"
#include "Task.hpp"
class Task;
int MyCalculate(int x, int y, char op);


const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{
    BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);
    // 生产者进行生产
    while(true) 
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        int op = rand() % oper.size();

        Task t(x, y, oper[op], MyCalculate);
        calBq->PushTask(t);

        std:: cout << "生产任务:" << t.ToTaskString() << std::endl;
        sleep(2);
    }
}

/* 定义消费者线程 */
void *Consumer(void *args)
{
    BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);
    // 消费者进行消费
    while(true) 
    {
        Task t;
        calBq->PopTask(&t);

        std:: cout << "消费任务:" << t() << std::endl;
        sleep(1);
    }
}


/* 入口函数 */
int main()
{
    // 随机数种子
    srand((unsigned int)time(nullptr) ^ getpid());
    // 共享资源
    BlockQueue<Task> *bqs = new BlockQueue<Task>();
    
    pthread_t tP;
    pthread_t tC;
    pthread_create(&tP, nullptr, Producer, (void *)bqs);
    pthread_create(&tC, nullptr, Consumer, (void *)bqs);

    pthread_join(tP, nullptr);
    pthread_join(tC, nullptr);
    return 0;
}

【2.3】生产消费模型计算公式加保存任务模型

【makefile文件】

# 创建关联关系
cc=g++
standard=-std=c++11

# 创建依赖关系
myBlockQueue:BlockQueue.cc
	$(cc) -o $@ $^ $(standard) -l pthread

# 创建删除关系
.PHONY:clean
clean:
	rm -rf myBlockQueue

【BlockQueue.hpp文件】

#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 500;
template <class T>
class BlockQueue
{
public:
    /* 构造函数 */
    BlockQueue(const int maxCapacity = g_maxCapacity)
        : _maxCapacity(maxCapacity)
    {
        // 初始化锁
        pthread_mutex_init(&_mutex, nullptr);
        // 初始化生产者信号量
        pthread_cond_init(&_pCond, nullptr);
        // 初始化消费者信号量
        pthread_cond_init(&_cCond, nullptr);

    }

    /* 析构函数 */
    ~BlockQueue() 
    {
        // 销毁锁
        pthread_mutex_destroy(&_mutex);
        // 销毁生产者信号量
        pthread_cond_destroy(&_pCond);
        // 销毁消费者信号量
        pthread_cond_destroy(&_cCond);
    }

public:
    /* 新增任务 */
    void PushTask(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否满
        // 细节二:充当条件判断的语法必须是while,不能是if  
        while(IsFull()) 
        {
            // 如果容器满了,生产者就不能继续生产了!
            // 细节一:
            // pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!
            // pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!
            // pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!
            pthread_cond_wait(&_pCond, &_mutex);
        }

        // 程序走到这里,容器一定是没有满的!
        _q.push(in);

        // 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
        pthread_cond_signal(&_cCond);   // 通知消费者已经生产了!
        // 解锁
        pthread_mutex_unlock(&_mutex);

    }

    /* 执行任务 */
    void PopTask(T* out) 
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否空
        // 细节二:与PushTask一致
        while(IsEmpty()) 
        {   
            // 如果容器空了,消费者就不能继续消费了!
            // 细节一:与PushTask一致
            pthread_cond_wait(&_cCond, &_mutex);
        }

        // 程序走到这里,容器一定是没有空的!
        *out = _q.front();
        _q.pop();

        // 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!
        pthread_cond_signal(&_pCond);   // 通知生产者已经消费了!
        // 解锁
        pthread_mutex_unlock(&_mutex);
    }

private:
    /* 判断容器满 */
    bool IsFull() 
    {
        return _q.size() == _maxCapacity;
    }

    /* 判断容器空 */
    bool IsEmpty() 
    {
        return _q.empty();
    }

private:
    std::queue<T> _q;       // 存储容器
    int _maxCapacity;       // 标识存储重启最大容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _pCond;  // 生产者条件变量
    pthread_cond_t _cCond;  // 消费者条件变量
};

【Task.hpp文件】

#pragma once
#include <iostream>
#include <string>
#include <functional>

/* 计算任务 */
class CalTask
{
public:
    using func_t = std::function<int(int, int, char)>;

public:
    /* 构造函数 */
    CalTask() {}

    /* 构造函数 */
    CalTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callBalk(func)
    {
    }

public:
    /* 仿函数 */
    std::string operator()()
    {
        int result = _callBalk(_x, _y, _op);

        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);
        return buffer;
    }

public:
    /* 返回打印公式 */
    std::string ToTaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callBalk;
};

/* 执行计算的方法 */
int MyCalculate(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;

    case '-':
        result = x - y;
        break;

    case '*':
        result = x * y;
        break;

    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
        {
            result = x / y;
        }
        break;
    }

    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
        {
            result = x % y;
        }
        break;
    }

    default:
        break;
    }

    return result;
}

/* 保存任务 */
class SaveTask
{
public:
    using func_t = std::function<void(const std::string &)>;

public:
    /* 构造函数 */
    SaveTask() {}
    /* 构造函数 */
    SaveTask(const std::string &message, func_t func)
        : _message(message), _callBalk(func)
    {
    }

public:
    /* 仿函数 */
    void operator()()
    {
        _callBalk(_message);
    }

private:
    std::string _message;
    func_t _callBalk;
};

/* 保存方法 */
void Save(const std::string& massage){
    std::string target = "./log.txt";
    FILE *fp = fopen(target.c_str(), "a+");
    if(fp == NULL){
        std::cerr << "fopen fail!" << std::endl;
        return;
    }

    fputs(massage.c_str(), fp);
    fputs("\n", fp);
    fclose(fp);
}

【BlockQueue.cc文件】

#include "BlockQueue.hpp"
#include "Task.hpp"


/* 共享资源Queue封装 */
template <class C, class S>
class BlockQueues
{
public:
    BlockQueue<C> *c_bq;
    BlockQueue<S> *s_bq;
};



const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{
    BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;
    // 生产者进行生产
    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        int op = rand() % oper.size();

        CalTask t(x, y, oper[op], MyCalculate);
        calBq->PushTask(t);

        std::cout << "Producer-生产任务:" << t.ToTaskString() << std::endl;
        sleep(2);
    }

    return nullptr;
}

/* 定义消费者线程 */
void *Consumer(void *args)
{
    BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;
    BlockQueue<SaveTask> *serverBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;
    // 消费者进行消费
    while (true)
    {
        CalTask t;
        calBq->PopTask(&t);
        std::string messgae = t();
        std::cout << "Consumer-消费任务:" << messgae << std::endl;

        SaveTask server(messgae, Save);
        serverBq->PushTask(server);
        std::cout << "Consumer-推送保存完成..." << std::endl;
    }   

    return nullptr;
}

/* 定义保存线程 */
void *Saver(void *args)
{
    BlockQueue<SaveTask> *saveQ = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;
    while(true){
        SaveTask t;
        saveQ->PopTask(&t);

        t();
        std::cout << "SAVER-保存任务完成..." << std::endl;
    }
    return nullptr;
}



#define T_PRODUCER 5
#define T_CONSUMER 10
/* 入口函数 */
int main()
{
    // 随机数种子
    srand((unsigned int)time(nullptr) ^ getpid());
    // 共享资源
    BlockQueues<CalTask, SaveTask> bqs;
    bqs.c_bq = new BlockQueue<CalTask>();
    bqs.s_bq = new BlockQueue<SaveTask>();

    // pthread_t tP;
    // pthread_t tC;
    // pthread_t tS;
    // pthread_create(&tP, nullptr, Producer, (void *)&bqs);
    // pthread_create(&tC, nullptr, Consumer, (void *)&bqs);
    // pthread_create(&tS, nullptr, Saver, (void *)&bqs);

    // pthread_join(tP, nullptr);
    // pthread_join(tC, nullptr);
    // pthread_join(tS, nullptr);

    // 创建生产者线程
    pthread_t tP[T_PRODUCER];
    for (int i = 0; i < T_PRODUCER; i++)
    {
        pthread_create(&tP[i], nullptr, Producer, (void *)&bqs);
    }

    // 创建消费者线程
    pthread_t tC[T_CONSUMER];
    for (int i = 0; i < T_CONSUMER; i++)
    {
        pthread_create(&tC[i], nullptr, Consumer, (void *)&bqs);
    }

    // 创建保存线程
    pthread_t tS;
    pthread_create(&tS, nullptr, Saver, (void *)&bqs);

    // 等待生产者线程回收
    for(int i = 0; i < T_PRODUCER; i++)
    {
        pthread_join(*(tP + 1), nullptr);
    }

    // 等待消费者线程回收
    for(int i = 0; i < T_CONSUMER; i++)
    {
        pthread_join(*(tC + 1), nullptr);
    }
    
    // 等待保存线程回收
    pthread_join(tS, nullptr);

    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

【2.3】生产消费模型多生产多消费

【Makefile文件】

# 定义变量与参数字符串进行关联
cc=g++
standard=-std=c++11

# 定义编译关系
myBackQueue: ThreadBackQueue.cc 
	$(cc) -o $@ $^ $(standard) -lpthread

# 定义命令
clean:
	rm -rf myBackQueue

# 配置指令与系统指令分离
.PHONY: clean

【ThreadBackQueue.hpp文件】

#pragma once 
#include <iostream>
#include <queue>
#include <pthread.h>
#include "ThreadMutex.hpp"

const int g_capacityMax = 100;

/* 生产消费者模型封装类 */
template<class T>
class ThreadBackQueue
{
public:
    /* - 构造函数
     */
    ThreadBackQueue(const int& capacity = g_capacityMax)
        : _qCapacity(capacity)
    {
        // 初始化互斥锁
        pthread_mutex_init(&_mutex, nullptr);
        // 初始化生产者条件变量
        pthread_cond_init(&_pCond, nullptr);
        // 初始化消费者条件变量
        pthread_cond_init(&_cCond, nullptr);
    }

    /* - 析构函数
     */
    ~ThreadBackQueue() 
    {
        // 释放互斥锁
        pthread_mutex_destroy(&_mutex);
        // 释放生产者条件变量
        pthread_cond_destroy(&_pCond);
        // 释放消费者条件变量
        pthread_cond_destroy(&_cCond);
    }

public:
    /* - 增加任务
     */
    void Push(const T& in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // LockGuardMutex(&_mutex);
        // 判断是否满
        while(IsFull())
            pthread_cond_wait(&_pCond, &_mutex); // 去等待

        // 一定有空位置
        _q.push(in);

        // 一定有任务
        pthread_cond_signal(&_cCond);

        pthread_mutex_unlock(&_mutex);
    }

    /* - 处理任务
     */
    void Pop(T* out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // LockGuardMutex(&_mutex);
        // 判断是否空
        while(IsEmpty())
            pthread_cond_wait(&_cCond, &_mutex); // 去等待

        // 一定有任务
        *out = _q.front(); 
        _q.pop();

        // 一定有空位置
        if(GetTaskSize() == 1)
            pthread_cond_signal(&_pCond);
        
        pthread_mutex_unlock(&_mutex);
    }

public: 
    /* - 判断队列满
     */
    bool IsFull()
    {
        return _q.size() == _qCapacity;
    }

    /* - 判断队列空
     */
    bool IsEmpty()
    {
        return _q.empty();
    }

    /* - 获取队列中任务个数
     */
    size_t GetTaskSize()
    {
        return _q.size();
    }

private:
    std::queue<T>       _q;             // 消息队列缓冲区
    size_t              _qCapacity;     // 消息队列容量

    pthread_mutex_t     _mutex;         // 互斥锁
    pthread_cond_t      _pCond;         // 生产者条件变量
    pthread_cond_t      _cCond;         // 消费者条件变量
};

【ThreadTask.hpp文件】

#pragma once
#include <cstdio>
#include <iostream>
#include <string>
#include <functional>
#include "ThreadBackQueue.hpp"
class TaskCalculate;
class TaskSave;



template<class C, class S>
class ThreadBackQueues
{
public:
    ThreadBackQueue<C>* _cTask;
    ThreadBackQueue<S>* _sTask;
};



class TaskCalculate
{
private:
    // 定义仿函数
    using func_t = std::function<int(const int, const int, const char)>;

public:
    /* - 无参构造函数 
     */
    TaskCalculate()
    {}

    /* - 带参数的构造函数
     */
    TaskCalculate(func_t func, const int x, const int y, const char op)
        : _func(func)
        , _x(x)
        , _y(y)
        , _op(op)
    {}

public:
    /* - ()运算符重载
     */
    std::string operator()()
    {
        int result = _func(_x, _y, _op);

        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

public:
    std::string TaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int     _x;    // 第一个计算值
    int     _y;    // 第二个计算值
    char    _op;   // 第三个计算值
    func_t  _func; // 仿函数类型
};


int Calculate(const int x, const int y, const char op)
{
    int calRet = 0;

    switch(op)
    {
        case '+':
        {
            calRet = x + y;
            break;
        }
        case '-':
        {
            calRet = x - y;
            break;
        }
        case '*':
        {
            calRet = x * y;
            break;
        }
        case '/':
        {
            if (y == 0)
            {
                std::cerr << "div zero error!" << std::endl;
                calRet = -1;
            }
            else
            {
                calRet = x / y;
            }

            break;
        }
        case '%':
        {
            if (y == 0)
            {
                std::cerr << "mod zero error!" << std::endl;
                calRet = -1;
            }
            else
            {
                calRet = x % y;
            }

            break;            
        }
        default:
        {
             break;
        }
    }

    return calRet;
};



class TaskSave
{
private:
    using func_t = std::function<void(const std::string&)>;

public:
    /* - 无参构造函数 
     */
    TaskSave() 
    {}

    /* - 带参构造函数 
     */
    TaskSave(func_t func, const std::string& msg) 
        : _func(func)
        , _msg(msg)
    {}

public:
    /* - ()运算符重载
     */
    void operator()()
    {
        _func(_msg);
    }

private:
    std::string _msg;   
    func_t      _func;
};

void FileSave(const std::string& msg)
{
    // 创建打开文件目录
    std::string target = "./Log.txt";

    // 打开文件
    FILE* fpath = fopen(target.c_str(), "a+");
    if(fpath == nullptr)
    {
        std::cerr << "fopen fail!" << std::endl;
        return;
    }

    // 写入文件
    fputs(msg.c_str(), fpath);
    fputs("\n", fpath);

    // 关闭文件
    fclose(fpath);
}

【ThreadBase.hpp文件】

#pragma once  
#include <cstdio>
#include <cassert>
#include <iostream>
#include <functional>
#include <string>

#include <pthread.h>
class ThreadBase;

/* 线程上下文数据封装类 */
class ThreadBaseConnectText
{
public:
    ThreadBaseConnectText()
        : _textThis(nullptr)
        , _textArgs(nullptr)
    {}
public: 
    ThreadBase*  _textThis;
    void*        _textArgs;
};


/* 基于原生线程库的线程封装类 */
class ThreadBase
{
private:
	const int ctNum = 64;

public:
    // 定义仿函数
    using func_t = std::function<void*(void*)>;

public:

public:
    /* - 构造函数
     * - func: 线程回调函数
     * - args:线程回调函数参数
     * - num : 编写线程名称设定的编号 
     */
    ThreadBase(func_t func, void* args = nullptr, const int& num = 1)
        : _threadCallBack(func)
        , _threadArgs(args)
    {   
        // 自定义线程名称
        char nameBuffer[ctNum];
        snprintf(nameBuffer, sizeof(nameBuffer), "thread-%d", num);
        _threadName = nameBuffer;

        // 创建线程连接上下文 - 手动释放内存 - 【01】
        ThreadBaseConnectText* connectText = new ThreadBaseConnectText();
        connectText->_textThis = this;
        connectText->_textArgs = _threadArgs;

        int state = pthread_create(&_threadId, nullptr, StartRoutine, (void*)connectText);
        assert(state == 0); (void)state;
    }

    /* - 析构函数
     */
    ~ThreadBase() 
    {}

public:
    /* - 线程等待
     */
    void Join()
    {
        int state = pthread_join(_threadId, nullptr);
        assert(state == 0); (void)state;   
    }

public: 
	/* - 获取线程名称
	*/
	std::string GetThreadName()
	{
		return _threadName;
	}

	/* - 获取线程Id
	*/
	std::string GetThreadId()
	{
		char buffer[ctNum];
		snprintf(buffer, sizeof(buffer), "0x%x", _threadId);
		return buffer;
	}

public:
    /* - 线程函数
     */
    static void* StartRoutine(void* args)
    {
        ThreadBaseConnectText* connectText = static_cast<ThreadBaseConnectText*>(args);
        void* retVal = connectText->_textThis->Run(connectText->_textArgs);

        // 释放内存 - 【01】
        delete connectText;
        // 返回
        return retVal;
    }

private:
	/* - StartRoutine专用函数(因为C/C++混编的原因)
	*/
    void* Run(void* args)
    {
        // 调用回调线程
        return _threadCallBack(args);
    }

private:
    std::string        _threadName;         // 线程名称
    pthread_t          _threadId;           // 线程Id
    func_t             _threadCallBack;     // 线程回调函数
    void*              _threadArgs;         // 线程回调函数参数
};

【ThreadMutex.hpp文件】

#pragma once 
#include <pthread.h>

/* 原生线程锁类封装 */
class Mutex
{
public:
    /* - 构造函数
     */
    Mutex(pthread_mutex_t* mutex)
        : _pMutex(mutex)
    {}

    /* - 析构函数
     */
    ~Mutex() 
    {}

public:
    /* - 加锁函数
     */
    void Lock() { pthread_mutex_lock(_pMutex); }
    /* - 解锁函数
     */
    void UnLock() { pthread_mutex_unlock(_pMutex); }

private:
    pthread_mutex_t*    _pMutex;    // 内部的线程锁
};

class LockGuardMutex
{
public:
    /* - 构造函数
     */
    LockGuardMutex(pthread_mutex_t* mutex)
        : _mutex(mutex)
    {
        _mutex.Lock();
    }
    
    /* - 析构函数
     */
    ~LockGuardMutex()
    {
        _mutex.UnLock();
    }
        
private:
    Mutex   _mutex;
};

【ThreadBackQueue.cc文件】

#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <unistd.h>
#include "ThreadBase.hpp"
#include "ThreadBackQueue.hpp"
#include "ThreadTask.hpp"

std::string g_ops = "+-*/%";

/* - 生产者线程函数
 */
void* ProducerThread(void* args)
{
    ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;
    // 生产
    while(true)
    {

        int x = rand() % 100;
        int y = rand() % 100;
        int o = rand() % g_ops.size();
        TaskCalculate task(Calculate, x, y, g_ops[o]);
        tBQ->Push(task);
        
        std::cout << "生产者在生产任务-> " << "[" << task.TaskString() << "] - - 队列任务个数为:" << tBQ->GetTaskSize()  << std::endl;
        sleep(1);
    }
    return nullptr;
}

/* - 消费者线程函数
 */
void* ConsumeThread(void* args)
{
    ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;
    ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;
    // 消费
    while(true)
    {
        TaskCalculate calculateTask;
        tBQ->Pop(&calculateTask);

        std::string strRet = calculateTask();
        std::cout << "消费者在消费任务-> " << "[" << strRet << "] - - 队列任务个数为:" << tBQ->GetTaskSize() << std::endl;

        TaskSave saveTask(FileSave, strRet);
        sBQ->Push(saveTask);
        std::cout << "消费者在保存任务-> " << "[" << strRet << "] - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;

        sleep(3);
    }
    return nullptr;
}

/* - 保存者线程函数
 */
void* SaveThread(void* args)
{
    ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;
    while(true)
    {
        // 消费保存任务
        TaskSave saveTask;
        sBQ->Pop(&saveTask);

        // 执行保存任务
        saveTask();

        // 保存完成任务
        std::cout << "保存任务完成"<< " - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;
    }
    return nullptr;
}

/* - 程序入口函数
 */
int main()
{
    // 创建随机数种子
    srand((unsigned int)time(nullptr));
    // 创建共享资源
    ThreadBackQueues<TaskCalculate, TaskSave>* pBQ = new ThreadBackQueues<TaskCalculate, TaskSave>();
    pBQ->_cTask = new ThreadBackQueue<TaskCalculate>;
    pBQ->_sTask = new ThreadBackQueue<TaskSave>;

    // 创建生产者线程
    std::unique_ptr<ThreadBase> ptr_pt1(new ThreadBase(ProducerThread, (void*)pBQ, 1));
    std::unique_ptr<ThreadBase> ptr_pt2(new ThreadBase(ProducerThread, (void*)pBQ, 2));
    std::unique_ptr<ThreadBase> ptr_pt3(new ThreadBase(ProducerThread, (void*)pBQ, 3));
    std::unique_ptr<ThreadBase> ptr_pt4(new ThreadBase(ProducerThread, (void*)pBQ, 4));
    std::unique_ptr<ThreadBase> ptr_pt5(new ThreadBase(ProducerThread, (void*)pBQ, 5));
    std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt1->GetThreadName() << " 线程Id:" << ptr_pt1->GetThreadId() << std::endl;
    std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt2->GetThreadName() << " 线程Id:" << ptr_pt2->GetThreadId() << std::endl;
    std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt3->GetThreadName() << " 线程Id:" << ptr_pt3->GetThreadId() << std::endl;
    std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt4->GetThreadName() << " 线程Id:" << ptr_pt4->GetThreadId() << std::endl;
    std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt5->GetThreadName() << " 线程Id:" << ptr_pt5->GetThreadId() << std::endl;
    sleep(10);

    // 创建消费者线程
    std::unique_ptr<ThreadBase> ptr_ct1(new ThreadBase(ConsumeThread, (void*)pBQ, 11));
    std::unique_ptr<ThreadBase> ptr_ct2(new ThreadBase(ConsumeThread, (void*)pBQ, 12));
    std::unique_ptr<ThreadBase> ptr_ct3(new ThreadBase(ConsumeThread, (void*)pBQ, 13));
    std::unique_ptr<ThreadBase> ptr_ct4(new ThreadBase(ConsumeThread, (void*)pBQ, 14));
    std::unique_ptr<ThreadBase> ptr_ct5(new ThreadBase(ConsumeThread, (void*)pBQ, 15));
    std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct1->GetThreadName() << " 线程Id:" << ptr_ct1->GetThreadId() << std::endl;
    std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct2->GetThreadName() << " 线程Id:" << ptr_ct2->GetThreadId() << std::endl;
    std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct3->GetThreadName() << " 线程Id:" << ptr_ct3->GetThreadId() << std::endl;
    std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct4->GetThreadName() << " 线程Id:" << ptr_ct4->GetThreadId() << std::endl;
    std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct5->GetThreadName() << " 线程Id:" << ptr_ct5->GetThreadId() << std::endl;
    sleep(10);
    
    // 创建保存者线程
    std::unique_ptr<ThreadBase> ptr_st1(new ThreadBase(SaveThread, (void*)pBQ, 21));
    std::unique_ptr<ThreadBase> ptr_st2(new ThreadBase(SaveThread, (void*)pBQ, 22));
    std::unique_ptr<ThreadBase> ptr_st3(new ThreadBase(SaveThread, (void*)pBQ, 23));
    std::unique_ptr<ThreadBase> ptr_st4(new ThreadBase(SaveThread, (void*)pBQ, 24));
    std::unique_ptr<ThreadBase> ptr_st5(new ThreadBase(SaveThread, (void*)pBQ, 25));
    std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st1->GetThreadName() << " 线程Id:" << ptr_st1->GetThreadId() << std::endl;
    std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st2->GetThreadName() << " 线程Id:" << ptr_st2->GetThreadId() << std::endl;
    std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st3->GetThreadName() << " 线程Id:" << ptr_st3->GetThreadId() << std::endl;
    std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st4->GetThreadName() << " 线程Id:" << ptr_st4->GetThreadId() << std::endl;
    std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st5->GetThreadName() << " 线程Id:" << ptr_st5->GetThreadId() << std::endl;

    // 等待线程结束
    ptr_pt1->Join();
    ptr_pt2->Join();
    ptr_pt3->Join();
    ptr_pt4->Join();
    ptr_pt5->Join();

    ptr_ct1->Join();
    ptr_ct2->Join();
    ptr_ct3->Join();
    ptr_ct4->Join();
    ptr_ct5->Join();

    ptr_st1->Join();
    ptr_st2->Join();
    ptr_st3->Join();
    ptr_st4->Join();
    ptr_st5->Join();

    delete pBQ->_cTask;
    delete pBQ->_sTask;
    delete pBQ;

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1039846.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

指针笔试题讲解

文章目录 题目答案与解析1、234、5、6、7、8、 题目 int main() {int a[5] { 1, 2, 3, 4, 5 };int *ptr (int *)(&a 1);printf( "%d,%d", *(a 1), *(ptr - 1));return 0; }//由于还没学习结构体&#xff0c;这里告知结构体的大小是20个字节 //由于还没学习结…

解答嵌入式和单片机的关系

嵌入式系统是一种特殊的计算机系统&#xff0c;用于特定任务或功能。而单片机则是嵌入式系统的核心部件之一&#xff0c;是一种在单个芯片上集成了处理器、内存、输入输出接口等功能的微控制器。刚刚好我这里有一套单片机保姆式教学&#xff0c;里面有编程教学、问题讲解、语言…

试图一文彻底讲清 “精准测试”

在软件测试中&#xff0c;我们常常碰到两个基本问题&#xff08;困难&#xff09;&#xff1a; 很难保障无漏测&#xff1a;我们做了大量测试&#xff0c;但不清楚测得怎样&#xff0c;对软件上线后会不会出问题&#xff0c;没有信心&#xff1b; 选择待执行的测试用例&#…

百胜中国,全面进击

“未来三年&#xff0c;每年净增约1800家新店。” 美股研究社关注到&#xff0c;2023年投资者日活动上&#xff0c;百胜中国根据2024至2026年的发展规划&#xff0c;启动了集团RGM2.0战略。 三年时间&#xff0c;门店数要达到20000家&#xff0c;平均每年新增门店约1800家&am…

【【萌新的SOC大学习之hello_world】】

萌新的SOC大学习之hello_world zynq本次hello world 实验需要 PS-PL Configuration 页面能够配置 PS-PL 接口&#xff0c;包括 AXI、HP 和 ACP 总线接口。 Peripheral IO Pins 页面可以为不同的 I/O 外设选择 MIO/EMIO 配置。 MIO Configuration 页面可以为不同的 I/O 外设具…

蓝牙核心规范(V5.4)11.2-LE Audio 笔记之LE Auido架构

专栏汇总网址&#xff1a;蓝牙篇之蓝牙核心规范学习笔记&#xff08;V5.4&#xff09;汇总_蓝牙核心规范中文版_心跳包的博客-CSDN博客 爬虫网站无德&#xff0c;任何非CSDN看到的这篇文章都是盗版网站&#xff0c;你也看不全。认准原始网址。&#xff01;&#xff01;&#x…

event.stopPropagation()

现在有如下 当点击子按钮的时候会触发子事件&#xff0c;同时也会触发父事件&#xff0c; 如何阻止呢 handleDownload(event) { event.stopPropagation(); 。。。。。。。。。。 },

积跬步致千里 || 可视化动图展示

可视化动图展示 目前只能在 jupyter notebook 中测试成功 %matplotlib notebook import numpy as np import matplotlib.pyplot as plt import timen 500 data np.random.normal(0,1,n)fig plt.figure() ax fig.add_subplot(111)fig.show() fig.canvas.draw()for i in ra…

【新版】系统架构设计师 - 案例分析 - 信息安全

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 案例分析 - 信息安全安全架构安全模型分类BLP模型Biba模型Chinese Wall模型 信息安全整体架构设计WPDRRC模型各模型安全防范功能 网络安全体系架构设计开放系统互联安全体系结构安全服务与安全机制…

LRU、LFU 内存淘汰算法的设计与实现

1、背景介绍 LRU、LFU都是内存管理淘汰算法&#xff0c;内存管理是计算机技术中重要的一环&#xff0c;也是多数操作系统中必备的模块。应用场景&#xff1a;假设 给定你一定内存空间&#xff0c;需要你维护一些缓存数据&#xff0c;LRU、LFU就是在内存已经满了的情况下&#…

go语言 rune 类型

ASCII 码只需要 7 bit 就能完整地表示&#xff0c;但只能表示英文字母在内的 128 个字符&#xff0c;为了表示世界上大部分的文字系统&#xff0c;发明了 Unicode &#xff0c;它是 ASCII 的超集&#xff0c;包含世界上书写系统中存在的所有字符&#xff0c;并且为每个代码分配…

排队工会模式:电商营销的新趋势,让你的平台月流水过亿

排队工会模式是一种新型的电商营销模式&#xff0c;它利用产品利润分红的方式来吸引用户购买和推广&#xff0c;从而实现平台的流量和销量的增长。这种模式的核心是建立一个分红池&#xff0c;平台从每个产品的利润中拿出一定比例来充值分红池&#xff0c;然后按照用户的购买顺…

【yolov5】原理

Focus操作 anchors 先验框 其它 Yolov5的模型主要由Backbone、Neck和Head三部分组成。 Backbone&#xff1a;负责提取输入图像的特征。在Yolov5中&#xff0c;常见的Backbone网络包括CSPDarknet53或ResNet。这些网络都是相对轻量级的&#xff0c;能够在保证较高检测精度的同…

前端项目练习(练习-005-webpack-03)

学习前&#xff0c;首先&#xff0c;创建一个web-005项目&#xff0c;内容和web-004一样。&#xff08;注意将package.json中的name改为web-005&#xff09; 前面的代码中&#xff0c;打包工作已经基本完成了&#xff0c;下面开始在本地启动项目。这里需要用到webpack-dev-serv…

如何通过Gunicorn和Niginx部署Django

本文主要介绍如何配置Niginx加载Django的静态资源文件&#xff0c;也就是Static 1、首先需要将Django项目中的Settings.py 文件中的两个参数做以下设置&#xff1a; STATIC_URL /static/ STATIC_ROOT os.path.join(BASE_DIR, static) 然后在宝塔面板中执行python manage.…

Simulink仿真模块 - Digital Clock

Digital Clock:以指定的采样间隔输出仿真时间 在仿真库中的位置为:Simulink / Sources 模型为: 说明 Digital Clock 模块仅以指定的采样间隔输出仿真时间。在其他时间,此模块保留输出的上一个值。要控制此模块的精度,请使用模块对话框中的 Sample time 参数。 当需要离散系…

S09-录入的数据快速分列

选中某一列数据&#xff0c;数据-》分列 确定分隔符

孜然单授权系统V1.0[免费使用]

您还在为授权系统用哪家而发愁&#xff1f;孜然单授权系统为您解决苦恼&#xff0c;本系统永久免费。 是的&#xff0c;还是那个孜然&#xff0c;消失了一年不是跑路了是没有空&#xff0c;但是这些都是无关紧要的&#xff0c;为大家带来的孜然单授权系统至上我最高的诚意&…

论文研究有哪些方法?

在写论文的的时候&#xff0c;选择合适的研究方法至关重要。好的研究方法会增加你论文的可信度和更具有科学性&#xff0c;为你的研究成果增添色彩。下面将介绍几种常用的研究方法&#xff0c;供大家参考学习。 1.文献综述法 多用于理论研究类论文写作。文献综述法是对某一领域…

微信小程序:uniapp解决上传小程序体积过大的问题

概述 在昨天的工作中遇到了一个微信小程序上传代码过大的情况&#xff0c;在这里总结一下具体的解决步骤&#xff0c;首先介绍一下&#xff0c;技术栈是使用uniapp框架HBuilderX的开发环境。 错误提示 真机调试&#xff0c;提示包提交过大&#xff0c;不能正常生成二维码&…