基于准静态自适应环型缓存器(QSARC)的taskBus万兆吞吐实现

news2025/1/2 2:48:18

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
      • 1. 数据结构
      • 2. 自适应计算队列大小
      • 3. 生产者拼接缓存
      • 4. 高效地通知消费者
    • 小结
      • 1. 性能表现情况
      • 2. 主要改进
      • 3. 源码和发行版

概要

准静态自适应环形缓存器(Quasi-Static Adaptive Ring Cache)是taskBus用于数据吞吐的软件结构。

  • 准静态:缓存器的大小并不是静态分配,而是随着吞吐需求的提高,缓慢增长,并最终适应最高峰时的内存消耗。当缓存器已经达峰后,不再有堆内存分配。
  • 自适应:根据包大小的不同,缓存器在恒定的最大峰值内存容积下,根据统计获得包的大小数据,决定环状缓存队列元素的个数、每个元素的大小。
  • 环形: 队列收尾相接形成环状,生产者、消费者使用两个时钟前后相随,时钟本身采用atomic保护,无需额外的锁。

使用该缓存器,基于增强管道数据流转技术(EPDR)的业余软线无线电平台taskBus可在Linux 系统 i7 6700K 主频 4GHz下达到3GBps(24Gbps)的总交换能力。该交换能力被各个通道均分,共同支撑taskBus平台按照工程的连接关系,把各个生产者产出的数据包及时、完整、有序地输送给消费者。尽管采用了本技术,但与采用内核层面的函数进行管道直接交换的性能极限还差了20倍。

整体架构流程

taskBus的整体架构是一种多进程的合作机制,平台管理N个子进程,各个进程可以充当消费者和生产者的角色。平台收集各个子进程的stdout输出,并按照用户给定的消费关系,送入消费者的stdout.在这样的整体架构下,数据流转流程如下:

stdout
stdin
stdin
stdin
stdout
生产者1
平台 QSARC
消费者1
消费者2
消费者2
生产者2

在202409版本之前,taskBus使用的是消费者队列。由1个生产者产出的数据,会复制N份进入各个消费者的队列。这种情况下,平台既要为生产者部位保留一个用于包拼接的缓存,又要做N次memcpy。在消费者显著多于生产者时,吞吐能力下降的很厉害。

202409版本之后,设计成每份包只有唯一的1个副本,存储在生产者队列。消费者根据索引去拉取:

  • 平台为每个生产者维护1个环状缓存队列。
  • 平台按照消费关系,把每个包的队列位置索引播发给各个消费者。
  • 播发时,会维护一个待消费计数器,将值加1
  • 消费者消费包后,会将计数器的值减1
  • 如果生产者队列绕了一圈,发现下一个位置的计数器不为0,说明消费的速度赶不上生产速度,此时平台不再接受生产者的数据,等待。

整个流程如下图所示(动图):

Queue
注意的是,上图中队列的大小是4个包,即使包消费完毕,可用的容积也不会释放,这样,随着程序的运行,渐渐地动态内存分配会越来越少,速度会越来越快。当然,如果包长是固定的,则不存在此问题,可以提前全部分配。

值得注意的是,在时钟11时,因为下一个位置依旧有消费者在驻留,因此生产被暂停等待。这种架构的缺点是整体的吞吐能力存在“木桶效应”,即由消费最慢的消费者决定1秒内能够流转多少包。

技术名词解释

  • 包:用于一次功能操作的数据单位,可以理解为1段连续内存的数据。包由包头、数据段组成。包头含有长度指示,正常的数据包之间是紧密衔接的。
  • 时钟:用于控制生产、消费的整形变量,从0开始无限增长。每处理1个包,时钟会加1。
  • 当前位置:生产者、消费者操作队列的当前位置,数值= 时钟 % 队列大小。

技术细节

1. 数据结构

生产者为taskNode类型,拥有如下成员变量维护自己的生产队列:

QVector<int> m_status_stdin;
QVector<QByteArray> m_array_stdin;
QVector<qsizetype> m_size_stdin;
QVector<QAtomicInteger<qsizetype> > m_cnt_stdin;

变量1 控制生产的状态。管道到来的数据,是一个无限有序无损流,类似TCP。但是,每次流可能会被切断,导致包头、数据块可能被切割到多次调用里。这个变量用于在各次调用中记忆上次的生产状态。

变量2 就是缓存的内存本身。这个列表的QByteArray元素个数是缓存的包个数,每个QByteArray会保持在历史最大包的高位。这样,只有到来的包大于当前体积时,才会重新分配内存。

变量3 存储缓存内当前的写入位置。如果包已经写完了,则等于包长。如果是空闲,则为0。

变量4 就是待消费计数器,每次生产完毕1个包,会根据消费者等待队列广播包的索引(时钟),并把计数设置为消费者个数。消费者消费完,会减一。

2. 自适应计算队列大小

这种队列存在1个重大的问题,就是很难控制内存的用量。由于各个QByteArray都只增加不释放,因此期望的用量是:

C = M ∗ N C = M * N C=MN

M是最大包大小,N是队列包个数。

由于taskBus设计时,建议包的大小小于64KB,在假设包的种类小于K时,可以预先统计前K个包的最大长度M,从而确定按照最大内存门限,如C=128MB,要设置的N。

N = C / M N = C/M N=C/M

当然,这种算法是极为简陋的。这是建立在taskBus的具体应用场景上的。对包变化幅度很大、无法简单统计的情况,要考虑一定的shink策略,如在每次消费指针归0时,裁剪队列的大小,并释放尾部的内存。

//Adjust buf size
			if (m_pos_stdin==8)
			{
				qsizetype sza = 0;
				for (int ia = 0;ia<8;++ia)
					if (sza < m_size_stdin[ia])
						sza = m_size_stdin[ia];
				if (sza < 32)
					sza = 32;
				m_bufsize_adjust = taskCell::default_ringcache * 1024 * 1024 / sza;
				if (m_bufsize_adjust > m_bufsize_stdin)
					m_bufsize_adjust = m_bufsize_stdin;
				if (m_bufsize_adjust < 8)
					m_bufsize_adjust = 8;
				emit_message(QString("Adjusted buffer ring size : %1 MB / %2 Bytes = %3 frames.").arg(taskCell::default_ringcache).arg(sza).arg(m_bufsize_adjust).toUtf8());
			}

3. 生产者拼接缓存

在当前生产位置上,平台为生产者拼接一个完整的包。这里用到了状态机。状态机拥有如下状态:

状态名取值意义
头部捕获中0正在捕获头部
数据缓存中1正在根据头部指示的状态,缓存数据
数据缓存完毕2包接收完整,触发消费。
数据缓存完毕3触发消费完毕,待回收。

在每个状态上,都会进行进程管道读操作,不断的从stdout获取数据。直到状态2,会触发消费,并在全部消费通知播发后,进入状态3。当下一次生产者访问状态3的队列成员时,如果没有消费者还在消费这个包,则会把包位置归零,状态归零。否则,会阻塞生产者,直到消费者消费完毕为止。

void taskNode::slot_readyReadStandardOutput()
{
	LOG_PROFILE("IO","Start Recieving packs.");
	qsizetype total_sz = m_process->size();
	int badHeader = 0;
	while (total_sz)
	{
		const qsizetype pos = m_pos_stdin % m_bufsize_adjust;
		QByteArray & curr_array = m_array_stdin[pos];
		qsizetype & readBufMax = m_size_stdin[pos];
		QAtomicInteger<qsizetype> & cnt = m_cnt_stdin[pos];
		int & stat = m_status_stdin[pos];
		//生产者被阻塞了,因为下一个缓存位置依旧有消费者在消费数据。
		if (cnt>0)
			break;
		//Old data
		if (stat==3)
		{
			readBufMax = 0;
			stat = 0;
		}
		auto * header =	reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());
		//Header
		if (stat==0)
		{
			if (total_sz<sizeof(TASKBUS::subject_package_header))
				break;
			auto needRead = sizeof(TASKBUS::subject_package_header) - readBufMax ;
			auto red = m_process->read(curr_array.data()+readBufMax,needRead);
			readBufMax += red;
			if (readBufMax == sizeof(TASKBUS::subject_package_header))
			{
				if (header->prefix[0]==0x3C && header->prefix[1]==0x5A &&	header->prefix[2]==0x7E && header->prefix[3]==0x69)
				{
					stat = 1;
				}
				else
				{
					++badHeader;
					readBufMax = 0;
				}
			}
			Q_ASSERT(readBufMax <= sizeof(TASKBUS::subject_package_header));
		}
		//data
		if (stat==1)
		{
			const qsizetype packAllSize = sizeof(TASKBUS::subject_package_header)+header->data_length;
			if (curr_array.size()<packAllSize)
			{
				curr_array.resize(packAllSize);
				header = reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());
			}
			auto needRead = packAllSize - readBufMax ;
			auto red = m_process->read(curr_array.data()+readBufMax,needRead);
			readBufMax += red;
			if (readBufMax==packAllSize)
			{
				stat = 2;
			}
			Q_ASSERT(readBufMax <= packAllSize);
		}

		//Send
		if (stat==2)
		{
			const qsizetype pack_size = sizeof(TASKBUS::subject_package_header)+header->data_length;
			extern QAtomicInteger<quint64>  g_totalrev;
			g_totalrev += readBufMax;
			++m_spackage_sent;
			m_sbytes_sent += sizeof(TASKBUS::subject_package_header)+header->data_length;
			if (header->subject_id == TB_SUBJECT_CMD)
			{
				//Command must endwith \0
				const char * pCmd = (const char *)header+sizeof(TASKBUS::subject_package_header);
				QString cmd = QString::fromUtf8(pCmd,header->data_length);
				QMap<QString, QVariant> map_z = taskCell::string_to_map(cmd);
				//remember uuid
				if (map_z.contains("source"))
				{
					if(m_uuid.size()==0 )
						m_uuid = map_z["source"].toString();
					if (map_z.contains("destin"))
						emit sig_new_command(map_z);
				}
			}
			else if (m_currPrj)
				m_currPrj->routing_new_package(this,pos);
			if (m_bDebug)
				log_package(true,(char *)header,pack_size);
			stat = 3;
			++m_pos_stdin;
			//Adjust buf size
			if (m_pos_stdin==8)
			{
				qsizetype sza = 0;
				for (int ia = 0;ia<8;++ia)
					if (sza < m_size_stdin[ia])
						sza = m_size_stdin[ia];
				if (sza < 32)
					sza = 32;
				m_bufsize_adjust = taskCell::default_ringcache * 1024 * 1024 / sza;
				if (m_bufsize_adjust > m_bufsize_stdin)
					m_bufsize_adjust = m_bufsize_stdin;
				if (m_bufsize_adjust < 8)
					m_bufsize_adjust = 8;
				emit_message(QString("Adjusted buffer ring size : %1 MB / %2 Bytes = %3 frames.").arg(taskCell::default_ringcache).arg(sza).arg(m_bufsize_adjust).toUtf8());
			}
		}
		total_sz = m_process->size();
	}

	if (badHeader)
		emit_message(QByteArray("Error header recieved. "
								"Header must be 0x3C, 0x5A, 0x7E,"
								" 0x69. Aborting."));
	LOG_PROFILE("IO","End Recieving packs.");
}

4. 高效地通知消费者

如果每个包很小,则QEvent触发的密度会很大,开销很大。我们设置一个消费者的索引队列,存储待消费的生产者队列、索引:

	QMutex m_mtx_queue;
	QList<taskNode *> m_write_queue;
	QList<qsizetype> m_write_pos;

而后,只在队列为0时,触发Event。

bool taskNode::enqueue_write(taskNode * node, qsizetype pos)
{
	m_mtx_queue.lock();
	int z = m_write_queue.size() + m_write_cmd.size();
	m_write_queue.push_back( node);
	m_write_pos.push_back(pos);
	m_mtx_queue.unlock();
	if (!z)
	{
		QCoreApplication::postEvent(this,new QEvent(m_nPackEvent));
	}
	return  true;
}

同时,在消费时,一次性获取队列,并清空。这样减少锁的碰撞。

void taskNode::flush_write()
{
	m_mtx_queue.lock();
	QList<taskNode *> write_queue = m_write_queue;
	QList<qsizetype> write_pos = m_write_pos;
	QByteArrayList write_cmd = m_write_cmd;
	m_write_queue.clear();
	m_write_pos.clear();
	m_write_cmd.clear();
	//qDebug()<<write_queue.size();
	m_mtx_queue.unlock();

	while (write_queue.size())
	{
		taskNode * node = write_queue.first();
		qsizetype pos = write_pos.first();

		write_queue.pop_front();
		write_pos.pop_front();

		QByteArray & arr = node->get_stdin_array(pos);
		m_process->write(arr.constData(),sz);
		--cnt;
	}

小结

尽管采用了环形队列,由于在QProcess上还是存在mem-alloc,这使得峰值状态下CPU占用还是很大的。整体速率距离PCI总线和DDR4的能力还相去甚远,即使和bash直接管道连接相比,也有不小的差距。不过,为了构造灵活的管道吞吐能力,允许数据被多对多流转和反馈回环,损失一些性能也差强人意。

1. 性能表现情况

通过上述操作,taskBus的吞吐能力得到了保证,在只使用用户态的内存操作情况下,缓存64MB时,可以获得10Gbps以上的性能。在Linux下,可达 24Gbps,即3GBps1

平台系统峰值吞吐单路流量平均来回延迟
i7-10700U1Linux x643354MBps1340MBps1ms
i7-6700KLinux x642844MBps1050MBps2.2ms
i7-10700U2win10 home x641561MBps400MBps22ms
i7-6700Kwin10 home x641345MBps340MBps40ms
RaspberryPi 4(8GB)Rasbain 64263MBps102MBps6ms

上表是运行双进程点对点双向PING的状态下达到的。多进程下,总速率会稍高。可以发现,同样的硬件配置下:

  • windows下taskBus的吞吐能力要比Linux少1倍
  • windows下taskBus的带宽利用率要低于Linux,总速率/单路速率Linux更优。
  • windows下的延迟更大。

这是非常奇怪的现象,讲起来windows应该更快才对。对于里面的细节原因,只有后面慢慢研究了。

2. 主要改进

与2024年8月版本相比,少了一层生产者–>消费者的memcpy,转而只是传递int类型的索引,在生产者:消费者个数=1:N的情况下,吞吐能力会得到较大提高。这种memcpy次数的降低,对于老旧CPU影响更大,即使在2进程互PING(1:1)的测试中,也能达到 20-30%的提速。

3. 源码和发行版

源码和发行版参考

GitCode.net

或者

GitCode.com


  1. i7-10700U是一个笔记本上的低功耗cpu,在最大睿频4GHz下的Manjaro Linux系统上,3GBps持续了5秒。由于温度上升,温度墙导致频率下降到1.8GHz,速度降低1倍。 ↩︎ ↩︎

  2. i7-10700U是一个笔记本上的低功耗cpu,通过在windows-10下去除温度墙,可以在97摄氏度的高温状态下,保持在4GHz,维持1.5GBps的流量。 ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2116354.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python报错已解决】 No Python at ‘C:\Users...\Python Python39\python.exe’

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 前言一、问题描述1.1 报错示例1.2 报错分析1.3 解决思路 二、解决方法2.1 方法一&#xff1a;检查Python安装路径2.2 …

苍穹外卖随记(一)

黑马苍穹外卖逻辑和细节的问题和解决 1.后端查询到员工的日期信息&#xff0c;将信息进行json化传给前端时发生&#xff1a;前端收到的是不标准的日期json串。 解决&#xff1a;1.注解进行json格式化&#xff08;JsonFormat&#xff09;2. 在spring MVC中&#xff0c;通过消息…

如何验证VMWare WorkStation的安装?

如何验证VMWare WorkStation的安装&#xff1f; 右击"网络"&#xff0c;点击 打开"网络和Internet设置"&#xff0c;点击更改适配器选项&#xff0c;如果出现VMNet1和VMNet8&#xff0c;则说明安装成功。

内网穿透(cpolar实现)

目录 一、介绍 二、内网穿透工具cpolar实现 1.下载与安装 2.cpolar指定authtoken 3.获取临时域名 4.验证临时域名有效性 一、介绍 内网穿透&#xff0c;即 NAT 穿透&#xff08;Network Address Translation Traversal&#xff09;&#xff0c;是一种网络技术&#xff0…

Python爬虫使用实例-wallpaper

1/ 排雷避坑 &#x1f95d; 中文乱码问题 print(requests.get(urlurl,headersheaders).text)出现中文乱码 原因分析&#xff1a; <meta charset"gbk" />解决方法&#xff1a; 法一&#xff1a; response requests.get(urlurl,headersheaders) response.en…

java基础-IO(4)管道流 PipedOutputStream、PipedInputStream、PipedReader、PipedWriter

管道 提到管道&#xff0c;第一印象就是水管或者地下管道&#xff0c;一节接着一节&#xff0c;形如下图。 管道流 "流"从上一个管道 ——-> 下一个管道。又细分为字节流和字符流。 字节流&#xff08;PipedOutputStream、PipedInputStream&#xff09; 如果…

SSM框架介绍

SSM通常指的是三个开源框架的组合&#xff0c;即Spring、SpringMVC&#xff08;Spring Web MVC&#xff09;和MyBatis&#xff0c;这三个框架经常一起使用来开发Java企业级应用&#xff0c;特别是在Web应用开发中非常流行。 SSM框架介绍 Spring 简介&#xff1a;Spring是一个…

谷粒商城のNginx

文章目录 前言一、Nginx1、安装Nginx2、相关配置2.1、配置host2.2、配置Nginx2.3、配置网关 前言 本篇重点介绍项目中的Nginx配置。 一、Nginx 1、安装Nginx 首先需要在本地虚拟机执行&#xff1a; mkdir -p /mydata/nginx/html /mydata/nginx/logs /mydata/nginx/conf在项目…

ModuleNotFoundError: No module named ‘mmcv.transforms‘

不得已的解决方法&#xff1a; mmcv升级到2.0.0即可解决 升级后自然又面临一系列不兼容问题&#xff01; 官方文档查漏补缺

【STM32】呼吸灯实现

对应pwm概念可以去看我的博客51实现的呼吸灯 根据对应图我们可知预分频系数为999&#xff0c;重装载值为2000&#xff0c;因为设置内部时钟晶振频率为100MHZ &#xff0c;1s跳 100 000000次 &#xff0c;跳一次需要1/100 000000s 20ms0.02s 对应跳的次数为 我们使用通用定时器…

九,自定义转换器详细操作(附+详细源码解析)

九&#xff0c;自定义转换器详细操作&#xff08;附详细源码解析&#xff09; 文章目录 九&#xff0c;自定义转换器详细操作&#xff08;附详细源码解析&#xff09;1. 基本介绍2. 准备工作3. 自定义转换器操作4. 自定义转换器的注意事项和细节5. 总结&#xff1a;6. 最后&…

【前端学习】AntV G6-07 深入图形与图形分组、自定义节点、节点动画(上、中)

课程链接 AntV G6&#xff1a;深入图形与图形分组、自定义节点、节点动画&#xff08;上&#xff09;_哔哩哔哩_bilibili AntV G6&#xff1a;深入图形与图形分组、自定义节点、节点动画&#xff08;中&#xff09;_哔哩哔哩_bilibili 图形分组 Group | G6 (antgroup.com) 自…

ARM32开发——DMA内存到内存

&#x1f3ac; 秋野酱&#xff1a;《个人主页》 &#x1f525; 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 需求数据交互流程开发流程依赖引入DMA初始DMA传输请求完整代码 关心的内容DMA初始化DMA初始化DMA数据传输请求完整代码 DMA中断开启…

封装智能指针 qt实现登录界面

1.封装独占智能指针——unique_ptr #include <iostream> #include <utility> // For std::move// 命名空间 namespace custom_memory { template <typename T> class myPtr { public:// 使用初始化列表进行初始化explicit myPtr(T* p nullptr) noexcept : …

卡西莫多的诗文集2022-2024.9月6-校庆国庆专版定版

通过网盘分享的文件&#xff1a;卡西莫多的诗文集2022-2024.9月6-A5-校庆国庆专版-定版.pdf 链接: https://pan.baidu.com/s/1cpFK5k1baGXbSGxY30GL_A?pwdjgnt 提取码: jgnt 卡西莫多的诗文集2022-2024.9月6-校庆国庆专版&#xff0c;又稍作修改并勘误了一些错字&#xff0c;…

AWQ量化(Activation-aware Weight Quantization)

论文&#xff1a; AWQ: Activation-aware Weight Quantization for On-Device LLM Compression and Acceleration 中文解读&#xff1a; 深入理解AWQ量化技术 - 知乎 (zhihu.com) 动机&#xff1a;端侧设备用LLM&#xff0c;为了减少显存占用量&#xff0c;所以要用INT4量化&am…

【Jupyter Notebook】汉化

1.打开:Anaconda Prompt 2.输入:"activate Zhui01"(注意&#xff1a;Zhui01是刚创建的环境名字) activate Zhui01 3.输入:"pip install jupyterlab-language-pack-zh-CN" pip install jupyterlab-language-pack-zh-CN 4.打开:Jupyter Notebook 5.点击&q…

Mysql高级篇(中)——七种常见的 join 查询图

注意&#xff1a; MySQL是不支持 FULL OUTER JOIN 这种语法的&#xff0c;因此要实现图中 6、7的查询结果&#xff0c;可以使用 UNION 关键字结合 LEFT JOIN、RIGHT JOIN 实现&#xff0c;UNION可以实现去重的效果&#xff1b; 参考如下代码&#xff1a; -- MySQL中 图标6 的实…

用Unity2D制作一个人物,实现移动、跳起、人物静止和动起来时的动画:中(人物移动、跳起、静止动作)

上回我们学到创建一个地形和一个人物&#xff0c;今天我们实现一下人物实现移动和跳起&#xff0c;依次点击&#xff0c;我们准备创建一个C#文件 创建好我们点击进去&#xff0c;就会跳转到我们的Vision Studio&#xff0c;然后输入这些代码 using UnityEngine;public class M…

MarginNote 4 激活码 - 苹果电脑(Mac)全能型的阅读工具

对广大求学者来说&#xff0c;如何科学有效地掌握知识&#xff0c;一定是大家不懈追求的方向。 众多求学者使用的&#xff0c;深度阅读学习工具 MarginNote&#xff0c;终于为 Mac 推出了官网 4.0 版。带来了众多创新的学习辅助工具。 MarginNote 4 功能简介 1、快速理清各版…