生产者消费者模型 C++ 版

news2025/1/15 9:12:22

基础生产者消费者模型

网上一般教程是使用std::queue,定义消费者 Consumer ,定义Producter类,在main函数里面加锁保证线程安全。
本片文章,实现一个线程安全的队列 threadsafe_queue,只在队列内部加锁。如此可适配,多生产者多消费者的场景

线程安全的队列 threadsafe_queue

#pragma once
#include<mutex>
#include <condition_variable>
#include<queue>
//最大产品数量
#define MAX_SIZE 20
#include <iostream>
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}
    
    void wait_and_pop(T& value)  // 2
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
        std::cout << "Consumer pop : " << value << std::endl;
        data_cond.notify_all();
    }
    std::shared_ptr<T> wait_and_pop()  // 3
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty(); });  // 4
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        data_cond.notify_all();
        return res;
    }
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        data_cond.notify_all();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();  // 5
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        data_cond.notify_all();
        return res;
    }
    void push(T new_value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] {return data_queue.size() < MAX_SIZE; });
        std::cout << "Producter push : " << new_value << std::endl;
        data_queue.push(std::move(new_value));

        data_cond.notify_all();  // 1
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};


消费者

Consumer 头文件

#pragma once
#include"threadsafe_queue.h"
/*
基础版 消费者生产者模型
此文件为 消费者
*/

class Consumer
{
public:
	Consumer(threadsafe_queue<int>& queue);
	~Consumer();
	void start();

private:
	threadsafe_queue<int>& m_pQueue;
};

cpp文件

#include "Consumer.h"
#include <iostream>
#include<windows.h>
Consumer::Consumer(threadsafe_queue<int> &queue)
	: m_pQueue(queue)
{

}

Consumer::~Consumer()
{
}


void Consumer::start()
{
	while (true)
	{
		int value;
		m_pQueue.wait_and_pop(value);
		Sleep(10);
	}
}
 

生产者

#pragma once
#include"threadsafe_queue.h"
class Producter
{
public:
	Producter(threadsafe_queue<int>& queue,int i);
	~Producter();
	void start();

private:
	threadsafe_queue<int>& m_pQueue;
	int index;
};
#include "Producter.h"
#include<stdlib.h>
#include <iostream>
#include<windows.h>
Producter::Producter(threadsafe_queue<int>& queue, int i)
	: m_pQueue(queue),index(i)
{

}

Producter::~Producter()
{
}


void Producter::start()
{
	int i = 0;//为了测试是哪个线程,加入了哪些数据用的
	while (true)
	{
		//int data = rand();
		m_pQueue.push(i*2+ index);// index =0 为偶数生产者,只生产偶数,index =1则是生产奇数
		Sleep(100);
		i++;
	}
}

运行结果如下:
在这里插入图片描述

控制消费者有序消费

优先队列做缓存
头文件

#pragma once
#include<mutex>
#include <condition_variable>
#include <iostream>
#include <algorithm>
#include <vector>
#include <queue>

//最大产品数量
#define MAX_SIZE 20
/*


*/
template<typename T, class _Container = std::vector<T>, class _Pr = std::less<typename _Container::value_type>>
class threadsafe_priority_queue
{
public:
    threadsafe_priority_queue();
    ~threadsafe_priority_queue();

    void push(T new_value);
    bool empty() const;

    void wait_and_pop(T& value);

    std::shared_ptr<T> wait_and_pop();

    bool try_pop(T& value);

    std::shared_ptr<T> try_pop();


    void wait_and_pop(T& value, bool (*func)(const T&, int), int curIndex);

    std::shared_ptr<T> wait_and_pop(bool (*func)(const T&, int), int curIndex);

    bool try_pop(T& value, bool (*func)(const T&, int), int curIndex);

    std::shared_ptr<T> try_pop( bool (*func)(const T&, int), int curIndex);


private:
    mutable std::mutex mut;
    std::priority_queue<T, _Container, _Pr> data_queue;
    std::condition_variable data_cond;

};

实现

#include "threadsafe_priority_queue.h"
#include <iomanip>
template<typename T, class _Container, class _Pr >
threadsafe_priority_queue<T, _Container, _Pr>::threadsafe_priority_queue()
{}

template<typename T, class _Container, class _Pr >
threadsafe_priority_queue<T, _Container, _Pr>::~threadsafe_priority_queue()
{
}


template<typename T, class _Container, class _Pr >
void threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop(T& value)  // 2
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });
    value = std::move(data_queue.front());
    data_queue.pop();
    std::cout << "Consumer pop : " << value << std::endl;
    data_cond.notify_all();
}

template<typename T, class _Container, class _Pr >
std::shared_ptr<T> threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop()  // 3
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

template<typename T, class _Container, class _Pr >
bool threadsafe_priority_queue<T, _Container, _Pr>::try_pop(T& value)
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
        return false;
    value = std::move(data_queue.front());
    data_queue.pop();
    data_cond.notify_all();
    return true;
}

template<typename T, class _Container, class _Pr >
std::shared_ptr<T> threadsafe_priority_queue<T, _Container, _Pr>::try_pop()
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
        return std::shared_ptr<T>();  // 5
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

template<typename T, class _Container, class _Pr >
void threadsafe_priority_queue<T, _Container, _Pr>::push(T new_value)
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] {return data_queue.size() < MAX_SIZE; });
    std::cout.flags(std::ios::left);  //左对齐
    std::cout << "Producter push : " << new_value << std::endl;
    data_queue.push(std::move(new_value));

    data_cond.notify_all();  // 1
}

template<typename T, class _Container, class _Pr >
bool threadsafe_priority_queue<T, _Container, _Pr>::empty() const
{
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
}



template<typename T, class _Container, class _Pr >
void threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop(T& value, bool (*func)(const T&, int), int curIndex )
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this, func, curIndex] {
        if (!data_queue.empty() && func(data_queue.top(), curIndex))
        {
            return true;
        }
        if (!data_queue.empty())
            std::cout << "top value: " << data_queue.top() << " ,current index : " << curIndex << std::endl;
        return false;
        });
    value = std::move(data_queue.top());
    data_queue.pop();
   //右对齐
    std::cout << std::setw(100) << setiosflags(std::ios::right)<< "Consumer pop : " << value << std::endl;
    data_cond.notify_all();
}

template<typename T, class _Container, class _Pr >
std::shared_ptr<T> threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop(bool (*func)(const T&, int), int curIndex)
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this, func, curIndex] {
        if (!data_queue.empty() && func(data_queue.front(), curIndex))
        {
            return true;
        }
        return false; 
        });  // 4
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

template<typename T, class _Container, class _Pr >
bool threadsafe_priority_queue<T, _Container, _Pr>::try_pop(T& value, bool (*func)(const T&, int), int curIndex)
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty() || func(data_queue.front(), curIndex))
    {
        return std::shared_ptr<T>();
    }
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}


template<typename T, class _Container, class _Pr >
std::shared_ptr<T>  threadsafe_priority_queue<T, _Container, _Pr>::try_pop(bool (*func)(const T&, int), int curIndex)
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty() || func(data_queue.front(), curIndex))
    {
        return std::shared_ptr<T>();
    }
       
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

生产者

#include "threadsafe_priority_queue.h"
class Producter2
{
public:
	Producter2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue, int i);
	~Producter2();
	void start();

private:
	threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& m_pQueue;
	int index;
};

Producter2::Producter2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue, int i)
	: m_pQueue(queue), index(i)
{

}

Producter2::~Producter2()
{
}


void Producter2::start()
{
	int i = 0;
	while (i < 200)
	{
		//int data = rand();
		m_pQueue.push(i * 3 + index);
		Sleep(30);
		i++;
	}
}

消费者

 #include "threadsafe_priority_queue.h"

class Consumer2
{
public:
	Consumer2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue);
	~Consumer2();
	void start();

private:
	threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& m_pQueue;
	int m_curIndex=-1;
};

Consumer2::Consumer2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue)
	: m_pQueue(queue)
{

}

Consumer2::~Consumer2()
{
}

bool compare( const int & nextIndex,int curindex)
{
	return nextIndex == (curindex + 1);
}

void Consumer2::start()
{
	int i = 0;
	while (i < 200)
	{
		int value;
		m_pQueue.wait_and_pop(value, compare, m_curIndex);
		m_curIndex++;
		Sleep(10);
		i++;
	}
}

main函数


#include <iostream>
#include "Producter.h"
#include "Consumer.h"
# include<thread>
#include"threadsafe_queue.h"
#include"threadsafe_queue.cpp"
#include<windows.h>
#include "threadsafe_priority_queue.h"
#include "threadsafe_priority_queue.cpp"

int main()
{
	threadsafe_priority_queue<int, std::vector<int>, std::greater<int>> queue;
	Consumer2 aonsumer(queue);
	Producter2 producter(queue,0);
	Producter2 producter2(queue, 1);
	Producter2 producter3(queue, 2);
	std::thread tProducter3(&Producter2::start, &producter3);
	std::thread tProducter2(&Producter2::start, &producter2);
	std::thread tProducter1(&Producter2::start, &producter);
	std::thread tConsumer(&Consumer2::start, &aonsumer);
	
	tConsumer.join();
	tProducter3.join();
	tProducter2.join();
	tProducter1.join();
	return 0;

}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/858174.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

taro h5列表拖拽排序 --- sortablejs 和 react-sortable-hoc

描述&#xff1a;列表&#xff0c;拖拽排序&#xff0c;只测试了h5 一、sortablejs 文档&#xff1a;http://www.sortablejs.com/ 1.安装sortablejs 2、引入 import Sortable from sortablejs3、页面 const [list, setList] useState([{id: item-1,content: 选项1 }, {id…

什么情况,听说项目经理已经成为行业的最底层了

大家好&#xff0c;我是老原。 要说现在的职场人最关心什么&#xff0c;那肯定是自己所在的这个行业、工种市场行情如何&#xff0c;会不会慢慢没落了&#xff0c;或者被现在盛行的AI、ChatGpt给替代了。 今天这个标题的问题也是我在知乎上看到很多项目经理都在提问的问题之一…

创新引领城市进化:人工智能和大数据塑造智慧城市新面貌

人工智能和大数据等前沿技术正以惊人的速度融入智慧城市的方方面面&#xff0c;为城市的发展注入了强大的智慧和活力。这些技术的应用不仅令城市管理更高效、居民生活更便捷&#xff0c;还为可持续发展和创新奠定了坚实的基础。 在智慧城市中&#xff0c;人工智能技术正成为城市…

网络安全/黑客技术(经验分享)

一、为什么选择网络安全&#xff1f; 这几年随着我国《国家网络空间安全战略》《网络安全法》《网络安全等级保护2.0》等一系列政策/法规/标准的持续落地&#xff0c;网络安全行业地位、薪资随之水涨船高。 未来3-5年&#xff0c;是安全行业的黄金发展期&#xff0c;提前踏入…

WebSocket与消息推送

B/S结构的软件项目中有时客户端需要实时的获得服务器消息&#xff0c;但默认HTTP协议只支持请求响应模式&#xff0c;这样做可以简化Web服务器&#xff0c;减少服务器的负担&#xff0c;加快响应速度&#xff0c;因为服务器不需要与客户端长时间建立一个通信链接&#xff0c;但…

pytest结合 allure 打标记之的详细使用

前言 前面我们提到使用allure 可以生成漂亮的测试报告&#xff0c;下面就Allure 标记我们做详细介绍。 allure 标记 包含&#xff1a;epic&#xff0c;feature, story, title, testcase, issue, description, step, serverity, link, attachment 常用的标记 allure.feature…

汽车租赁小程序制作,如何开发一个微信租车平台“

随着移动互联网的快速发展&#xff0c;小程序已经成为了企业和个人开展业务的重要工具之一。微信小程序作为最为常见和使用最为广泛的一种小程序形式&#xff0c;具有便捷、灵活、高效的特点&#xff0c;受到了广大用户的喜爱。在本文中&#xff0c;我们将手把手教你如何创建一…

vue3使用pinia和pinia-plugin-persist做持久化存储

1、安装依赖 pnpm i pinia // 安装 pinia pnpm i pinia-plugin-persist // 安装持久化存储插件2、main.js引入 import App from ./App.vue const app createApp(App)//pinia import { createPinia } from pinia import piniaPersist from pinia-plugin-persist //持久化插件 …

关于selenium 元素定位的浅度解析

一、By类单一属性定位 元素名称 描述 Webdriver API id id属性 driver.find_element(By.ID, "id属性值") name name属性 driver.find_element(By.NAME, "name属性值") class_name class属性 driver.find_element(By.CLASS_NAME, "class_na…

【计算机网络】应用层协议Http

文章目录 前言URL&#xff08;网址&#xff09;urlencode 和 urldecodeHttp格式方法Http状态码重定向Http常见报头会话保持结语 前言 通过前面的学习&#xff0c;我们已经知道了协议其实就是一种约定&#xff0c;要求双方都能理解对方的消息。应用层上的协议不属于操作系统&am…

七夕送什么礼物最有意义?可以试试潮趣数码好物!

​七夕送什么礼物最有意义&#xff1f;七夕节不想送常见物品&#xff0c;你可以送给他/她潮趣数码好物&#xff0c;不需要多么的贵重&#xff0c;最重要的是你的心意&#xff0c;只要你送的都喜欢。下面来安利几款值得入手的数码好物&#xff01; 推荐一&#xff1a;南卡00压开…

readMe、profile美化。

GitHub的profile美化&#xff0c;第一步&#xff1a;创建与个人账号一致的仓库。然后就会自动展示于个人首页、再新建readme.md文档&#xff0c;添加照片信息即可。&#xff08;动态效果是用svg实现的&#xff09; 一、上波浪svg代码 <svg xmlns"http://www.w3.org/2…

C# WPF 开源主题 HandyControl 的使用(一)

HandyControl是一套WPF控件库&#xff0c;它几乎重写了所有原生样式&#xff0c;同时包含80余款自定义控件&#xff08;正逐步增加&#xff09;&#xff0c;下面我们开始使用。 1、准备 1.1 创建项目 C# WPF应用(.NET Framework)创建项目 1.2 添加包 1.3 在App.xaml中引用…

15-矩阵转置的拓展延伸

&#x1f52e;矩阵的转置✨ 前言 在很多时候我们拿到的数据本身可能并不会把点的坐标按列的方向排列起来&#xff0c;对于我们人类来说&#xff0c;更方便的方式依然是把这个点的坐标按行的方向排列&#xff0c;我们比较熟悉把矩阵看作为一个数据&#xff0c;在这里&#xff0…

6款SSL证书实用工具,全都免费使用!

俗话说“工欲善其事&#xff0c;必先利其器”&#xff0c;SSL证书作为保护网站数据传输安全的重要部分&#xff0c;我们在申请、签发、部署安装SSL证书的时候&#xff0c;可能会涉及到CSR文件生成、获取证书链、转换证书格式等需求&#xff0c;这时候有对应的工具就可提高工作效…

SharePoint 审核和监控工具

审核在顺利的 SharePoint 管理中起着重要作用&#xff0c;尤其是在满足法规遵从性和取证要求方面。为避免数据泄露&#xff0c;必须了解谁来自哪个组访问了哪个文档&#xff0c;以及谁创建或删除了网站或网站集。 审核 SharePoint 服务器 SharePoint采用率的提高导致企业在其…

词性标记:了解使用维特比算法(2/2)

作者&#xff1a;Sachin Malhotra和Divya Godayal 来源&#xff1a; Viterbi Algorithm in Speech Enhancement and HMM 一、说明 早就想写一篇隐马尔可夫&#xff0c;以及维特比算法的文章&#xff1b;如今找了一篇&#xff0c;基本描述出隐马尔可夫的特点。 隐马尔可夫模型&a…

精彩回顾 | D-Day深圳 上海站:高频策略研发再提速

上周末&#xff0c;DolphinDB 分别在上海及深圳成功举办了两场 D-Day 分享会&#xff0c;来自国内头部券商、公募基金以及多家私募机构的数十位核心策略研发、数据分析专家们分享了 DolphinDB 在量化交易各个环节的使用经验&#xff0c;并基于与同类技术栈的优劣势对比&#xf…

java spring cloud 企业电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展 tbms

​ 项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以…

Java多线程(八)

目录 一、产生死锁的情况 1.1 一个线程多把锁 1.1.1 Java中可重入锁的实现机制 1.2 两个线程两把锁 1.3 N个线程M把锁 二、解决死锁的方案 2.1 死锁的必要条件 2.2 破除循环等待 一、产生死锁的情况 死锁是这样一种情形&#xff1a;多个线程同时被阻塞&#xff0c;它们中的一个…