C++多线程学习09:并发队列

news2024/12/24 14:17:53

参考

链接:恋恋风辰官方博客

并发队列&线程安全栈

代码结构:

并发队列ThreadSafeQueue.h:

#pragma once

#include <mutex>
#include <queue>

template<typename T>
class threadsafe_queue
{
private:

	mutable std::mutex mut;
	std::queue<T> data_queue;
	std::condition_variable data_cond;

public:
	threadsafe_queue(){}

	void push(T new_value)
	{
		std::lock_guard<std::mutex> lk(mut);
		data_queue.push(std::move(new_value));
		data_cond.notify_one();    //⇽-- - ①
	}

	void wait_and_pop(T& value)    //⇽-- - ②
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty(); });
		value = std::move(data_queue.front());
		data_queue.pop();
	}

	std::shared_ptr<T> wait_and_pop()   // ⇽-- - ③
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty(); });   // ⇽-- - ④
		std::shared_ptr<T> res(
			std::make_shared<T>(std::move(data_queue.front())));
		data_queue.pop();
		return res;
	}

	bool try_pop(T& value)
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return false;
		value = std::move(data_queue.front());
		data_queue.pop();
		return true;
	}

	std::shared_ptr<T> try_pop()
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return std::shared_ptr<T>();    //⇽-- - ⑤
		std::shared_ptr<T> res(
			std::make_shared<T>(std::move(data_queue.front())));
		data_queue.pop();
		return res;
	}

	bool empty() const
	{
		std::lock_guard<std::mutex> lk(mut);
		return data_queue.empty();
	}
};

template<typename T>
class threadsafe_queue_ptr
{
private:
	mutable std::mutex mut;
	std::queue<std::shared_ptr<T>> data_queue;
	std::condition_variable data_cond;
public:
	threadsafe_queue_ptr()
	{}
	void wait_and_pop(T& value)
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty(); });
		value = std::move(*data_queue.front());    //⇽-- - ①
		data_queue.pop();
	}
	bool try_pop(T& value)
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return false;
		value = std::move(*data_queue.front());   // ⇽-- - ②
		data_queue.pop();
		return true;
	}
	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty(); });
		std::shared_ptr<T> res = data_queue.front();   // ⇽-- - ③
		data_queue.pop();
		return res;
	}
	std::shared_ptr<T> try_pop()
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return std::shared_ptr<T>();
		std::shared_ptr<T> res = data_queue.front();   // ⇽-- - ④
		data_queue.pop();
		return res;
	}
	void push(T new_value)
	{
		std::shared_ptr<T> data(
			std::make_shared<T>(std::move(new_value)));   // ⇽-- - ⑤
		std::lock_guard<std::mutex> lk(mut);
		data_queue.push(data);
		data_cond.notify_one();
	}
	bool empty() const
	{
		std::lock_guard<std::mutex> lk(mut);
		return data_queue.empty();
	}
};

template<typename T>
class threadsafe_queue_ht
{
private:
	struct node
	{
		std::shared_ptr<T> data;
		std::unique_ptr<node> next;
	};

	std::mutex head_mutex;
	std::unique_ptr<node> head;
	std::mutex tail_mutex;
	node* tail;
	std::condition_variable data_cond;

	node* get_tail()
	{
		std::lock_guard<std::mutex> tail_lock(tail_mutex);
		return tail;
	}
	std::unique_ptr<node> pop_head()
	{
		std::unique_ptr<node> old_head = std::move(head);
		head = std::move(old_head->next);
		return old_head;
	}

	std::unique_lock<std::mutex> wait_for_data()
	{
		std::unique_lock<std::mutex> head_lock(head_mutex);
		data_cond.wait(head_lock, [&] {return head.get() != get_tail(); });
		return std::move(head_lock);
	}

	std::unique_ptr<node> wait_pop_head()
	{
		std::unique_lock<std::mutex> head_lock(wait_for_data());
		return pop_head();
	}

	std::unique_ptr<node> wait_pop_head(T& value)
	{
		std::unique_lock<std::mutex> head_lock(wait_for_data());
		value = std::move(*head->data);
		return pop_head();
	}

	std::unique_ptr<node> try_pop_head()
	{
		std::lock_guard<std::mutex> head_lock(head_mutex);
		if (head.get() == get_tail())
		{
			return std::unique_ptr<node>();
		}
		return pop_head();
	}

	std::unique_ptr<node> try_pop_head(T& value)
	{
		std::lock_guard<std::mutex> head_lock(head_mutex);
		if (head.get() == get_tail())
		{
			return std::unique_ptr<node>();
		}
		value = std::move(*head->data);
		return pop_head();
	}

public:
	threadsafe_queue_ht() :  // ⇽-- - 1
		head(new node), tail(head.get()){}

	threadsafe_queue_ht(const threadsafe_queue_ht& other) = delete;
	threadsafe_queue_ht& operator=(const threadsafe_queue_ht& other) = delete;

	std::shared_ptr<T> wait_and_pop() //  <------3
	{
		std::unique_ptr<node> const old_head = wait_pop_head();
		return old_head->data;
	}

	void wait_and_pop(T& value)  //  <------4
	{
		std::unique_ptr<node> const old_head = wait_pop_head(value);
	}

	std::shared_ptr<T> try_pop()
	{
		std::unique_ptr<node> old_head = try_pop_head();
		return old_head ? old_head->data : std::shared_ptr<T>();
	}

	bool try_pop(T& value)
	{
		std::unique_ptr<node> const old_head = try_pop_head(value);
		if (old_head) {
			return true;
		}
		return false;
	}
	bool empty()
	{
		std::lock_guard<std::mutex> head_lock(head_mutex);
		return (head.get() == get_tail());
	}

	void push(T new_value) //<------2
	{
		std::shared_ptr<T> new_data(
			std::make_shared<T>(std::move(new_value)));

		std::unique_ptr<node> p(new node);
		{
			std::lock_guard<std::mutex> tail_lock(tail_mutex);
			tail->data = new_data;
			node* const new_tail = p.get();
			tail->next = std::move(p);
			tail = new_tail;
		}

		data_cond.notify_one();
	}
};

线程安全栈ThreadSafeStack.h : 

#pragma once

#include <exception>
#include <mutex>
#include <stack>
#include <condition_variable>

struct empty_stack : std::exception
{
    const char* what() const throw();
};

template<typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack() {}

    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }

    threadsafe_stack& operator=(const threadsafe_stack&) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));    // ⇽-- - ①
    }

    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();  //  ⇽-- - ②
        std::shared_ptr<T> const res(
            std::make_shared<T>(std::move(data.top())));   // ⇽-- - ③
        data.pop();   // ⇽-- - ④
        return res;
    }

    void pop(T& value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        value = std::move(data.top());   // ⇽-- - ⑤
        data.pop();   // ⇽-- - ⑥
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};


template<typename  T>
class threadsafe_stack_waitable
{
private:
    std::stack<T> data;
    mutable std::mutex m;
    std::condition_variable cv;
public:
    threadsafe_stack_waitable() {}

    threadsafe_stack_waitable(const threadsafe_stack_waitable& other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }

    threadsafe_stack_waitable& operator=(const threadsafe_stack_waitable&) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));    // ⇽-- - ①
        cv.notify_one();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this]()   //  ⇽-- - ②
            {
                if (data.empty())
                {
                    return false;
                }
                return true;
            });


        std::shared_ptr<T> const res(
            std::make_shared<T>(std::move(data.top())));   // ⇽-- - ③
        data.pop();   // ⇽-- - ④
        return res;
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this]()
            {
                if (data.empty())
                {
                    return false;
                }
                return true;
            });

        value = std::move(data.top());   // ⇽-- - ⑤
        data.pop();   // ⇽-- - ⑥
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
        {
            return false;
        }

        value = std::move(data.top());
        data.pop();
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
        {
            return std::shared_ptr<T>();
        }

        std::shared_ptr<T> res(std::make_shared<T>(std::move(data.top())));
        data.pop();
        return res;
    }
};

实现一个测试类TestClass.h :

#pragma once

#include<iostream>

class TestClass
{
public:
	TestClass(int data) :_data(data) {}
	TestClass(const TestClass& mc) :_data(mc._data) {}
	TestClass(TestClass&& mc) :_data(mc._data){}

	friend std::ostream& operator << (std::ostream& os, const TestClass& mc)
	{
		os << mc._data;
		return os;
	}

private:
	int _data;
};

主函数负责测试代码:

#include <iostream>
#include <thread>
#include <mutex>
#include "ThreadSafeQueue.h"
#include "ThreadSafeStack.h"
#include "TestClass.h"

std::mutex mtx_cout;
void PrintTestClass(std::string consumer, std::shared_ptr<TestClass> data)
{
	std::lock_guard<std::mutex> lock(mtx_cout);
	std::cout << consumer << " pop data success , data is " << (*data) << std::endl;
}

void TestThreadSafeStack()
{
	threadsafe_stack_waitable<TestClass> stack;

	std::thread consumer1(
		[&]()
		{
			for (;;)
			{
				std::shared_ptr<TestClass> data = stack.wait_and_pop();
				PrintTestClass("consumer1", data);
			}
		}
	);

	std::thread consumer2([&]()
		{
			for (;;)
			{
				std::shared_ptr<TestClass> data = stack.wait_and_pop();
				PrintTestClass("consumer2", data);
			}
		});

	std::thread producer([&]()
		{
			for (int i = 0; i < 100; i++)
			{
				TestClass mc(i);
				stack.push(std::move(mc));
			}
		});

	consumer1.join();
	consumer2.join();
	producer.join();
}

void TestThreadSafeQue()
{
	threadsafe_queue_ptr<TestClass> safe_que;
	std::thread consumer1(
		[&]()
		{
			for (;;)
			{
				std::shared_ptr<TestClass> data = safe_que.wait_and_pop();
				PrintTestClass("consumer1", data);
			}
		}
	);

	std::thread consumer2([&]()
		{
			for (;;)
			{
				std::shared_ptr<TestClass> data = safe_que.wait_and_pop();
				PrintTestClass("consumer2", data);
			}
		});

	std::thread producer([&]()
		{
			for (int i = 0; i < 100; i++)
			{
				TestClass mc(i);
				safe_que.push(std::move(mc));
			}
		});

	consumer1.join();
	consumer2.join();
	producer.join();
}


void TestThreadSafeQueHt()
{
	threadsafe_queue_ht<TestClass> safe_que;
	std::thread consumer1(
		[&]()
		{
			for (;;)
			{
				std::shared_ptr<TestClass> data = safe_que.wait_and_pop();
				PrintTestClass("consumer1", data);
			}
		}
	);

	std::thread consumer2([&]()
		{
			for (;;)
			{
				std::shared_ptr<TestClass> data = safe_que.wait_and_pop();
				PrintTestClass("consumer2", data);
			}
		});

	std::thread producer([&]()
		{
			for (int i = 0; i < 100; i++)
			{
				TestClass mc(i);
				safe_que.push(std::move(mc));
			}
		});

	consumer1.join();
	consumer2.join();
	producer.join();
}

int main()
{
	/* 1. 测试线程安全栈 */
	TestThreadSafeStack();

	/* 2. 测试线程安全队列1 */
	TestThreadSafeQue();

	/* 3. 测试线程安全队列2 */
	TestThreadSafeQueHt();

    std::cout << "Finished! \n";
}

生产中直接应用即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1472878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解Python中的JSON模块:基础大总结与实战代码解析【第102篇—JSON模块】

深入理解Python中的JSON模块&#xff1a;基础大总结与实战代码解析 在Python中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;模块是处理JSON数据的重要工具之一。JSON是一种轻量级的数据交换格式&#xff0c;广泛应用于Web开发、API通信等领域。本文将…

linux操作系统期末练习题

背景&#xff1a; 一、远程登录 1&#xff0e;利用远程登录软件&#xff0c;以用户userManager(密码123456)&#xff0c;远程登录教师计算机&#xff08;考试现场给出IP地址&#xff09;&#xff0c;只有操作&#xff0c;没有命令。 2&#xff0e;以stu班级学生个人学号后3位…

goland配置新增文件头

参考&#xff1a; goland函数注释生成插件 goland函数注释生成插件_goland自动加函数说明-CSDN博客 GoLand 快速添加方法注释 GoLand 快速添加方法注释_goland批量注释-CSDN博客 goland 如何设置头注释&#xff0c;自定义author和data goland 如何设置头注释&#xff0c;自定…

spring boot 集成科大讯飞星火认知大模型

一、安装依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/…

springboot003图书个性化推荐系统的设计与实现(源码+调试+LW)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于SpringBoot的图书个…

SOLIDWORKS 查找并修复装配体配合错误

我们在SOLIDWORKS 正版软件进行装配体装配时&#xff0c;时常会出现一些报错&#xff0c;例如在配合、装配体特征或被装配体参考引用的零部件和子装配体中。一些常见的错误&#xff0c;如一个零部件的过定义会引发更多其他错误信息&#xff0c;并导致装配体停止解析配合关系。下…

RestTemplate启动问题解决

⭐ 作者简介&#xff1a;码上言 ⭐ 代表教程&#xff1a;Spring Boot vue-element 开发个人博客项目实战教程 ⭐专栏内容&#xff1a;个人博客系统 ⭐我的文档网站&#xff1a;http://xyhwh-nav.cn/ RestTemplate启动问题解决 问题&#xff1a;在SpringCloud架构项目中配…

汽车大灯尾灯划痕裂缝破洞破损掉角崩角等如何修复?根本没必要换车灯换总成,使用无痕修UV树脂胶液即可轻松搞定。

TADHE车灯无痕修复专用UV胶是一种经过处理的UV树脂胶&#xff0c;主要成份是改性丙烯酸UV树脂。应用在车灯的专业无痕修复领域。 车灯修复UV树脂有以下优点&#xff1a; 1. 快速修复&#xff1a;此UV树脂是一种用UV光照射在10秒内固化的材料。 2. 高强度&#xff1a;UV树脂固…

【npm下载包报错:CERT_HAS_EXPIRED,问题解决】

npm下载包报错&#xff1a;CERT_HAS_EXPIRED npm安装依赖的时候出现报错 根据第三行报错的提示得知报错原因是证书已过期 上网一查&#xff0c;原来常用的淘宝镜像早就换新域名了&#xff0c; 之前的镜像域名在2024年1月22日https证书到期了 替换为最新的地址就可以了 npm …

蛋白结构预测模型评价指标

欢迎浏览我的CSND博客&#xff01; Blockbuater_drug …点击进入 文章目录 前言一、蛋白结构预测模型评价指标TM-scorelDDT 二、Alphafold中的评价指标pLDDTpTMPAE 三、AlphaFold-multimer 蛋白结构的评价指标DockQipTM 总结参考资料 前言 本文汇总了AlphaFold和AlphaFold-mul…

线性表——单链表的增删查改(下)

本节继续上节未完成的链表增删查改接口的实现。这是上节的地址:线性表——单链表的增删查改&#xff08;上&#xff09;-CSDN博客 上节实现的接口如下&#xff1a; //申请链表节点函数接口 SLNode* BuySListNode(SLTDataType x); //单链表的打印函数接口 void SListPrint(SLNod…

探索比特币现货 ETF 对加密货币价格的潜在影响

撰文&#xff1a;Sean&#xff0c;Techub News 文章来源Techub News&#xff0c;搜Tehub News下载查看更多Web3资讯。 自美国比特币现货交易所交易基金&#xff08;ETF&#xff09;上市以来&#xff0c;比特币现货 ETF 的相关信息无疑成为了影响比特币价格及加密货币市场走向…

《Docker 简易速速上手小册》第10章 朝着 Docker Swarm 和 Kubernetes 迈进(2024 最新版)

文章目录 10.1 Docker Swarm 基础10.1.1 重点基础知识10.1.2 重点案例&#xff1a;Python Web 应用的 Docker Swarm 部署10.1.3 拓展案例 1&#xff1a;微服务架构的 Docker Swarm 部署10.1.4 拓展案例 2&#xff1a;使用 Docker Swarm 进行持续部署 10.2 Kubernetes 与 Docker…

nginx 从$http_x_forwarded_for 中获取第一个参数

在 Nginx 中&#xff0c;$http_x_forwarded_for 变量通常包含了客户端的原始 IP 地址以及可能经过的代理服务器的 IP 地址列表&#xff0c;这些地址由逗号分隔。如果你想从 $http_x_forwarded_for 中截取第一个参数&#xff08;即最左边的 IP 地址&#xff09;&#xff0c;你可…

C语言中的套娃——函数递归

目录 一、什么是递归 1.1.递归的思想 1.2.递归的限制条件 二、举例体会 2.1.求n的阶乘 2.2.顺序打印整数的每一位 2.3.斐波那契数列 三、递归与迭代 一、什么是递归 在学习C语言的过程中&#xff0c;我们经常会跟递归打交道&#xff0c;什么是递归呢&#xff1f;它其实…

用于自监督视觉预训练的屏蔽特征预测

Masked Feature Prediction for Self-Supervised Visual Pre-Training 一、摘要 提出了用于视频模型自监督预训练的掩模特征预测&#xff08;MaskFeat&#xff09;。首先随机屏蔽输入序列的一部分&#xff0c;然后预测屏蔽区域的特征。研究了五种不同类型的特征&#xff0c;发…

vue3 + TS + vite 搭建中后台管理系统(开箱即用)

[TOC](vue3 TS vite 搭建中后台管理系统) 开箱即用 前言 要成功&#xff0c;先发疯&#xff0c;头脑简单往前冲&#xff01; 三金四银&#xff0c;金九银十&#xff0c;多学知识&#xff0c;也不能埋头苦干&#xff0c;要成功&#xff0c;先发疯&#xff0c;头脑简单往前冲…

Java计划线程池ScheduledThreadPoolExecutor运行流程和源码分析

1. 计划线程池ScheduledThreadPoolExecutor简介 ScheduledThreadPoolExecutor继承自线程池ThreadPoolExecutor&#xff0c;并在其基础上增加了按时间调度执行任务的功能&#xff0c;如果对ThreadPoolExecutor还不是很熟悉&#xff0c;可以阅读一下这篇文章&#xff1a; Java线…

成都东部新区文化旅游体育局莅临国际数字影像产业园参观入驻企业,共促政产交流“零距离”

2月23日&#xff0c;成都东部新区文化旅游体育局投服处处长田东一行莅临国际数字影像产业园考察交流&#xff0c;树莓科技&#xff08;成都&#xff09;集团有限公司副总裁吴晓平、行政运营经理郭宇风、国际数字影像产业园项目负责人万里全程接待。 吴晓平副总带领田东处长一行…

开发知识点-.netC#图形用户界面开发之WPF

C#图形用户界面开发 框架简介WinForms(Windows Forms):WPF(Windows Presentation Foundation):UWP(Universal Windows Platform):MAUI(Multi-platform App UI):选择控件参考文章随笔分类 - WPF入门基础教程系列基于C#语言的GUI开发,主要介绍WPF框架