C++11的半同步半异步线程池

news2024/11/25 2:27:37

C++11的半同步半异步线程池

  • 简介
  • 同步队列
    • Take函数
    • Add函数
    • Stop函数
    • SyncQueue完整代码
  • 线程池
  • 主函数测试

简介

半同步半异步线程池用的比较多,实现也比较简单。

其中同步层包括同步服务层和排队层,指的是将接收的任务排队,将所有的任务排队到一个队列中,等待处理;

异步层指多个线程处理任务,异步处理层从同步层取出任务,并发处理任务。

在这里插入图片描述

同步队列

同步队列属于同步层的内容,主要作用是保证队列中共享数据线程安全,同时也提供新增任务的接口,以及提供取任务的接口。

这里使用C++11的锁、条件变量、右值引用、std::move和std::forward来实现。

同步队列主要包括三个函数,Take、Add和Stop。

Take函数

这里实现重载了两个Take函数,可支持一次获取多个任务,或者一次获取一个任务。

//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		list = std::move(m_queue);
		m_notFull.notify_one();
	}
	//获取单个任务
	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}

先创建一个unique *lock 获取 mutex,然后再通过条件变量 m_*notEmpty 来等待判断式。判断式由两个条件组成,一个是停止的标志,另一个是不为空的条件,当不满足任何一个条件时,条件变量会释放 mutex 并将线程置于 waiting 状态,等待其他线程调用 notify_one/notify all 将其唤醒;当满足任何一个条件时,则继续往下执行后面的逻辑,即将队列中的任务取出,并唤醒一个正处于等待状态的添加任务的线程去添加任务。当处于 waiting 状态的线程被 notify_one 或notify all 唤醒时,条件变量会先重新获取 mutex,然后再检查条件是否满足,如果满足,则往下执行,如果不满足,则释放 mutex 继续等待。

Add函数

Add 的过程和 Take 的过程是类似的,也是先获取 mutex,然后检查条件是否满足,不满足条件时,释放 mutex 继续等待,如果满足条件,则将新的任务插入到队列中,并唤醒取任务的线程去取数据。

template<typename F>
	void Add(F &&x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;
		m_queue.emplace_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}

Stop函数

Stop 函数先获取 mutex,然后将停止标志置为 true。注意,为了保证线程安全,这里需要先获取 mutex,在将其标志置为 true 之后,再唤醒所有等待的线,因为等待的条件是m_needStop,并且满足条件,所以线程会继续往下执行。由于线程在 m_needStop 为 true 时会退出,所以所有的等待线程会相继退出。

另外一个值得注意的地方是,我们把 m notFull.notify_all0放到lock_guard 保护范围之外了,这里也可以将 m_notFull.notify all0)放到ockguard保护范围之内,放到外面是为了做一点优化。因为 notify_one 或 notify_all 会唤醒一个在等待的线程,线程被唤醒后会先获取 mutex 再检查条件是否满足,如果这时被 lock guard保护,被唤醒的线程则需要 lock guard 析构释放 mutex 才能获取(即stop函数执行完了才释放)。如果在 lock_guard 之外notify_one 或notify_all,被唤醒的线程获取锁的时候不需要等待 lock_guard 释放锁,性能会好一点,所以在执行 notify_one或notify_all 时不需要加锁保护。

void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}

SyncQueue完整代码

”SyncQueue.h”

同步队列整体代码:

#pragma once
#include <iostream>
#include <list>
#include <mutex>

using namespace std;

template<typename T>
class SyncQueue
{
public:
	SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
	{
	}
	void Put(const T &x)
	{
		Add(x);
	}

	void Put(T &&x)
	{
		Add(std::forward<T>(x));
	}
	//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		list = std::move(m_queue);
		m_notFull.notify_one();
	}
	//获取单个任务
	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
		if (m_needStop)
		{
			return;
		}
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}
	void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}
	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	bool Full()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() == m_maxSize;
	}
	//可以获取任务数量
	int Count()
	{
		return m_queue.size();
	}
private:
	bool NotFull() const
	{
		bool full = m_queue.size() >= m_maxSize;
		if (full)
		{
			cout << "缓冲区满了,需要等待。。。" << endl;
		}
		return !full;
	}
	bool NotEmpty() const
	{
		bool empty = m_queue.empty();
		if (empty)
		{
			cout << "缓冲区空了,需要等待。。。,异步层的线程ID:" << this_thread::get_id() << endl;
		}
		return !empty;
	}
	template<typename F>
	void Add(F &&x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;
		m_queue.emplace_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}
private:
	std::list<T> m_queue; //缓冲区
	std::mutex m_mutex; //互斥量
	std::condition_variable m_notEmpty; //不为空的条件变量
	std::condition_variable m_notFull; //没有满的条件变量
	int m_maxSize; //同步队列最大的size
	bool m_needStop; //停止的标志
};

线程池

“ThreadPool.h”

线程池ThreadPool有3个成员变量,一个是线程组,这个线程组中的线程是预先创建的,应该创建多少个线程由外面传人,一般建议创建 CPU 核数的线程以达到最优的效率,线程组循环从同步队列中取出任务并执行,如果线程池为空,线程组将处于等待状态,等待任务的到来。

另一个成员变量是同步队列,它不仅用来做线程同步,还用来限制同步队列的上限,这个上限也是由使用者设置的。

第三个成员变量是用来停止线程池的,为了保证线程安全,我们用到了原子变量 atomic bool。下一节中将展示使用这个半同步半异步的线程池的实例。

#include<list>
#include<thread>
#include<functional>
#include<memory>
#include<atomic>
#include "SyncQueue.h"

const int MaxTaskCount = 100;
class ThreadPool
{
public:
	using Task = std::function<void()>;
	ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
	{
		Start(numThreads);
	}
	~ThreadPool(void)
	{
		Stop();
	}
	void Stop()
	{
		//保证多线程情况下只调用一次 StopThreadGroup
		std::call_once(m_flag, [this] {StopThreadGroup(); });
	}
	//可输入右值,例如lambda表达式
	void AddTask(Task&& task)
	{
		m_queue.Put(std::forward<Task>(task));
	}
	void AddTask(const Task& task)
	{
		m_queue.Put(task);
	}
	void Start(int numThreads)
	{
		m_running = true;
		//创建线程组
		for (int i = 0; i < numThreads; ++i)
		{
			m_threadgroup.emplace_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
		}
	}
private:
	
	void RunInThread()
	{
		while (m_running)
		{
			//取任务分别执行
			std::list<Task> list;
			m_queue.Take(list);

			for (auto& task : list)
			{
				if (!m_running)
					return;
				task();
			}
		}
	}
	void StopThreadGroup()
	{
		m_queue.Stop(); //让同步队列中的线程停止
		m_running = false; //置为false,让内部线程跳出循环并退出

		for (auto thread : m_threadgroup)
		{
			if (thread)
				thread->join();
		}
		m_threadgroup.clear();

	}
	std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
	SyncQueue<Task> m_queue; //同步队列
	atomic_bool m_running; //是否停止的标志
	std::once_flag m_flag;
};

主函数测试

#include <iostream>
#include "ThreadPool.h"
using namespace std;

void TestThdPool()
{
	ThreadPool pool(2);//创建一个2个线程的线程池

	//创建一个线程来添加10个任务1
	std::thread thd1([&pool] {
		for (int i = 0; i < 10; i++)
		{
			auto thdId = this_thread::get_id();
			pool.AddTask([thdId] {//添加任务可以使用lambda表达式,代码中实现了右值作为参数输入
				cout << "同步线程1的线程ID:" << thdId << endl;
			});
		}
	});
	//创建一个线程来添加20个任务2
	std::thread thd2([&pool] {
		for (int i = 0; i < 20; i++)
		{
			auto thdId = this_thread::get_id();
			pool.AddTask([thdId] {
				cout << "同步线程2的线程ID:" << thdId << endl;
			});
		}
	});

	this_thread::sleep_for(std::chrono::seconds(2));
	getchar();
	pool.Stop();
	thd1.join();
	thd2.join();
}
int main()
{
	TestThdPool();
	return 0;
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1022677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据分享】我国七普的乡镇(街道)人口数据(免费获取)

人口数据是我们在各项研究中都经常使用的数据&#xff01;人口数据的主要来源是人口普查&#xff0c;全国性的人口普查每十年进行一次。最近一次的人口普查是第七次全国人口普查&#xff0c;简称七普。七普统计的是2020年的人口数据。 之前我们分享过省市县三个层级的七普的人…

在业务和IT的视角中,MES管理系统有哪些重要性

在当今制造业中&#xff0c;MES生产管理系统已成为生产管理层面不可或缺的一部分。MES作为一款集成的软件解决方案&#xff0c;旨在连接企业的各个生产环节&#xff0c;实现生产计划、调度、执行和跟踪。本文将从业务和IT视角出发&#xff0c;探讨MES管理系统的重要性和实施细节…

PMP对项目管理工作有什么用?

首先&#xff0c;项目管理岗位基本是不限行业的&#xff0c;所以&#xff0c;只要是项目管理相关的岗位&#xff0c;pmp证书都是能起到效果的&#xff0c;不用担心局限性太大&#xff0c;而且&#xff0c;pmp证书是国际证书&#xff0c;无论国企还是外企&#xff0c;都是认可这…

go net/http 源码解读

回顾 1. HTTP Server 在 go 中启动一个 http server 只需短短几行代码 func PingHandler(w http.ResponseWriter, r *http.Request) {io.WriteString(w, "pong!") }func main() {http.HandleFunc("/ping", PingHandler)log.Fatal(http.ListenAndServe(&…

RocketMQ 源码分析——Producer

文章目录 消息发送代码实现消息发送者启动流程检查配置获得MQ客户端实例启动实例定时任务 Producer 消息发送流程选择队列默认选择队列策略故障延迟机制策略*两种策略的选择 技术亮点:ThreadLocal 消息发送代码实现 下面是一个生产者发送消息的demo&#xff08;同步发送&#…

Linux环境中数据误删除后恢复指导

一、背景 在很多Linux系统运维工作中&#xff0c;很多人会遇到敲错命令&#xff0c;或复制命令出错&#xff0c;或直接执行了rm -rf命令&#xff0c;事后才恍然大悟&#xff0c;闯下大祸&#xff0c;抛开问题&#xff0c;如果真的遇到这种情况&#xff0c;我们该如何应对呢&…

如何使用IP归属地查询API来追踪网络活动

引言 在当今数字化世界中&#xff0c;了解网络活动的源头和位置对于网络安全、市场研究和用户体验至关重要。IP归属地查询API是一种强大的工具&#xff0c;可以帮助您追踪网络活动并获取有关IP地址的重要信息。本文将探讨如何使用IP归属地查询API来追踪网络活动&#xff0c;以…

[C++ 网络协议] 多播与广播

目录 1. 多播 1.1 多播的使用情形 1.2 多播的原理 1.3 如何实现多播 1.4 多播的代码实现 2. 广播 2.1 广播与多播的区别 2.2 广播的分类 2.3 实现广播 1. 多播 1.1 多播的使用情形 考虑一种情形&#xff0c;你要向10000名用户发送数据&#xff0c;此时如果用TCP提供服…

MDPI模板报错的问题---提示缺少sty文件

MDPI模板报错的问题—提示缺少sty文件 平时大多数提交IEEE trans模板时大多使用CTEX编译&#xff0c;然而&#xff0c;MDPI模板需要用texlive&#xff0c;二者之间如果先安装CTEX后安装texlive将会导致库文件的冲突。结果将会报缺少sty的文件错。网上提供了很多解决方案&#…

Nmap安装和使用详解

Nmap安装和使用详解 Nmap概述功能概述运行方式 Nmap安装官方文档参考&#xff1a;Nmap参数详解目标说明主机发现端口扫描Nmap将目标主机端口分成6种状态&#xff1a;Nmap产生结果是基于机器的响应报文&#xff0c;而这些主机可能是不可信任的&#xff0c;会产生一些迷惑或者误导…

如何将 ONLYOFFICE 协作空间与单页面应用集成

2023 年春季&#xff0c;我们推出了 ONLYOFFICE 协作空间&#xff0c;这是一个先进的联合办公平台&#xff0c;旨在加强与客户、合作伙伴和第三方的文档协作。使用可自定义的房间和高级安全功能可以彻底改变您的文档协作方式。在本博文中&#xff0c;我们将以 GitHub Pages 为例…

Linux之操作文件命令

目录 一、阅览文件 1、cat 2、head 3、tail 4、more 5、less 二、过滤命令——grep 1、格式 2、选项 3、匹配模式 三、cut切割命令 1、格式 2、选项 四、sort排序命令 1、格式 2、选项 五、uniq去重命令 六、替换文件中的字符显示tr 1、格式 2、选项 七、…

第8章 MySQL的数据目录

8.1 数据库和文件系统的关系 像 InnoDB 、 MyISAM 这样的存储引擎都是把表存储在磁盘上的&#xff0c;而操作系统用来管理磁盘的又被称为 文件系统 &#xff0c;所以用专业一点的话来表述就是&#xff1a;像 InnoDB 、 MyISAM 这样的存储引擎都是把表存储在文件系统上的。当我…

API接口采集电商平台阿里巴巴中国站获得1688商品评论数据货品评分、评价内容接口调用指南

淘宝API商品评论接口&#xff0c;主要用于获取某个商品的评价信息。通过该接口&#xff0c;我们可以获取到商品的所有评价内容、评价时间、评价等级等相关信息&#xff0c;帮助我们更好地了解用户对商品的反馈&#xff0c;进而进行数据分析和业务优化。 1688.item_review-获得…

投票制作创建流量主小程序开发

很多企业、团队、门店商家有创建投票活动的需求&#xff0c;单独开发一套成本过高&#xff0c;所以会找一些市面上可以创建投票活动的工具。基于此开发了一款可以创建制作投票活动的小程序。 小程序主要是投票活动的制作、创建&#xff0c;可以接入流量主广告和会员功能&#…

成集云 | 金蝶K3集成聚水潭ERP(金蝶K3主管库存)| 解决方案

源系统成集云目标系统 方案介绍 金蝶K3是一款ERP软件&#xff0c;它集成了供应链管理、财务管理、人力资源管理、客户关系管理、办公自动化、商业分析、移动商务、集成接口及行业插件等业务管理组件。以成本管理为目标&#xff0c;计划与流程控制为主线&#xff0c;通过对成本…

温度传感器的精度受什么影响

温度传感器&#xff08;temperature transducer&#xff09;是指能感受温度并转换成可用输出信号的传感器。是早开发&#xff0c;应用极其广的一类传感器&#xff0c;是温度测量仪表的核心部分。现代的温度传感器外形非常得小&#xff0c;这样更加让它广泛应用在生产实践的各个…

PowerDesigner 与 mysql 同步数据

PowerDesigner 连接上数据库 创建数据库表 table_5 选择&#xff1a; 点击确认后弹出 点击run执行 刷新数据库表&#xff0c;已创建成功 修改测试表1&#xff0c;新增一个字段 取消全选 选择数据库&#xff0c;勾选修改的表&#xff0c;如果全部勾选的话&#xff0c;就…

3D动画制作和渲染需要什么样的硬件规格?

动画是艺术与技术的令人兴奋的融合&#xff0c;为无限的创造力提供了广阔的画布。为了将创意愿景变为现实&#xff0c;动画师需要适合其工艺的强大计算资源。每个动画项目都有不同的硬件需求&#xff0c;无论是制作简单的 2D 动画还是构建复杂的 3D 世界。因此&#xff0c;有抱…

字符串 (3)--- KMP 算法的扩展

对于个长度为n的字符串s。定义函数z[i]表示s和s[i,n-1]&#xff08;即以 s[i] 开头的后缀&#xff09;的最长公共前缀&#xff08;LCP&#xff09;的长度。 z被称为s的Z函数。特别地&#xff0c;z[0] 0。 如同大多数字符串主题所介绍的算法&#xff0c;其关键在于&#xff0c…