现场工程师宝典-流式处理的异常现象以及提高吞吐能力的常见优化策略

news2025/1/17 6:00:00

流式处理区别于按包处理,指的是对处理者而言,面对的是逻辑上无头无尾的数据流。因此,在提取数据流中的包时,就需要遵循其内在的格式,进行头部捕获、提取、校验。然而,如果不考虑TCP等流式数据的异常情况,而简单的编程,会在性能、稳定性上踩坑。本人作为资深现场工程师,协助开发团队处理过各种千奇百怪的流式处理问题,本文把其中较为常见的一并总结一下,希望对避坑有所帮助。

1. 流式传输的常见异常

1.1 普通断线

受到物理层以及各级底层协议栈的影响,流式处理可能发生中断。复现:拔除网线、串口等电缆,或者关闭电源。此时,主对方的吞吐都会中断,只能再次发起连接。

端1
端2

相对于下面几个异常,普通的断线是最容易检测处理的。只需要重新发起连接即可。

1.2 黏包断线

一般的流式处理,都会有策略区别两次完整、独立的流。比如两次TCP连接,他们的五元组是不同的,源端口一般会发生变化。此外,各类流式协议内层的ID等参数在重连后也会变化,所以即使断线,再次连接后,也是崭新的流。以前的流中的片段不会混进来。然而,流传输的环境是错综复杂的,比如存在一些特殊的代理以及中继。考虑下面的情况:

正常
端1
代理
端2

代理方在端1发起连接时,会主动连接端2,并协助周转主数据。
如果代理方在检测到端1断开后,依旧保持与端2的连接,则设计不佳的代理、端2可能不会区分前后两个连接的流。

这种情况,导致端2收到的数据异常:

  1. 截断。端1中断前的流并没有传输完毕,停止在一个中间状态。
  2. 错位。端2在上层协议的包长度、包头解析时,发生错位。
  3. 校验失败。端2的上层协议把上一次端1的末包头和本次会话的新数据混合,导致看似正确的包结构,但校验失败。
  4. 系统崩溃。如果上层协议不检测完整性,而是默认数据是正确的,则可能发生崩溃。

1.3 错包混入

曾有极少实现欠佳的设备个例,出现错包。这种错包理论上是不可能发生的,因为流协议的校验会保证单次会话的完整性。但是,由于设备本身对协议栈实现的缺陷,导致错包。原因是采用软件设备来实现NAT等功能,直接在底层开展包处理,对地址进行替换。当底层协议并不保证数据的正确性时,替换者又不去检查上层协议的校验,转而直接进行校验生成,导致原本校验不通过的数据竟然校验正确了!

错包
校验正确的错包
端1
NAT代理
端2

基于这种处理的单片机软件设备曾经造成了重大的安全事故。作为设计通用流式处理工具的设计者,有必要考虑到这种极端边界条件的存在。

2. 确保稳定性的原则

确保稳定性的方法就是采用零信任的拆包组包策略。

  1. 流式处理的上层单位需要具备可识别的头部。
  2. 需要有额外校验方式
  3. 无论何时都按照最坏的情况来检测数据的一致性。
  4. 假设下一个包永远不在正确的位置上。
  5. 假设长度字段会错误,成为负数或者超大正整数。
  6. 假设承载的内容会发生错误。
  7. 假设断线会发生在任何一个字节甚至是比特上。

如果按照上述假设编程,基本不会存在稳定性问题。

3.确保性能的原则

既然是流式处理,必然存在缓存的吞吐。在无限长的流中,有限的内存只能以窗口的形式处理数据。

窗口处理

一般来说,窗口大小应该大于头部的大小。用何种数据结构盛放窗口,何种策略滑动窗口,对性能的影响非常大。

确保性能的原则如下:

  1. 尽可能降低动态分配内存、重新调整内存大小的频率。
  2. 使用游标而不是实际的0下标来标定头部。这样可以避免频繁的内存拷贝、搬移。
  3. 较大的缓存窗口有利于提高性能。

4.举例

4.1反面范例-taskBus 旧版管道吞吐

我们首先看一个反面教材。这个教材是由初学者搭建的,一直没有优化过。因为流速很慢,所以也能正常工作。

class taskNode : public QObject
{
	Q_OBJECT
	public:
		//进程缓存
		QByteArray m_array_stdout;
};
void taskNode::slot_readyReadStandardOutput()
{
	QByteArray arred = m_process->readAllStandardOutput();
	m_array_stdout.append(arred);
	while (m_array_stdout.size()>=sizeof(TASKBUS::subject_package_header))
	{
		auto * header =
				reinterpret_cast<const TASKBUS::subject_package_header *>
				(m_array_stdout.constData());
		//合法性 Legitimacy
		if (header->prefix[0]!=0x3C || header->prefix[1]!=0x5A ||
				header->prefix[2]!=0x7E || header->prefix[3]!=0x69)
			m_array_stdout.clear();
		else
		{
			if (m_array_stdout.size()>= sizeof(TASKBUS::subject_package_header)+header->data_length)
			{
				//不必要的内存拷贝
				QByteArray arr(m_array_stdout.constData(),
							   sizeof(TASKBUS::subject_package_header)
							   +header->data_length);
				//频繁的内存搬移(删除头部包)
				m_array_stdout.remove(0,sizeof(TASKBUS::subject_package_header)
									  +header->data_length);
				emit_package(arr);
			}
			if (m_array_stdout.size()>=sizeof(TASKBUS::subject_package_header))
			{
				header = reinterpret_cast<const TASKBUS::subject_package_header *>(m_array_stdout.constData());
				if (!(m_array_stdout.size()>=sizeof(TASKBUS::subject_package_header)+header->data_length))
					break;
			}
		}
	}
}

/*!

这种策略下,m_array_stdout 不断发生内存分配、缩短、搬移,CPU占用率非常高。

代码参考 https://gitcode.net/coloreaglestdio/taskbus

4.2 范例1: taskBus 准静态自增缓存管道吞吐

当我们遇到流式处理带宽变大,必须优化taskBus吞吐管道的时候,为了不显著改变程序结构,采用的是自增长的准静态内存。包来到的时候,被尽快处理。

class taskNode : public QObject
{
	Q_OBJECT
	public:
		//进程缓存
		QByteArray m_array_stdout;
		qsizetype m_readBufMax = 0;

};

void taskNode::slot_readyReadStandardOutput()
{
	int total_sz = m_process->size();
	while (total_sz)
	{
		//确保缓存充足
		const qsizetype tailsize = m_array_stdout.size()-m_readBufMax;
		if (tailsize < total_sz)
			m_array_stdout.resize(m_readBufMax + total_sz);
		total_sz = m_process->read(m_array_stdout.data()+m_readBufMax,total_sz);
		m_readBufMax += total_sz;
		//开始包处理,使用一个游标 startByte 
		qsizetype startByte = 0;
		char * pBufStart = m_array_stdout.data();
		while (sizeof(TASKBUS::subject_package_header) + startByte < m_readBufMax)
		{
			auto * header =
				reinterpret_cast<const TASKBUS::subject_package_header *>
				(pBufStart + startByte);
			//检测头部,如果没有检测到,继续下一个游标位置
			if (header->prefix[0]!=0x3C || header->prefix[1]!=0x5A ||
				header->prefix[2]!=0x7E || header->prefix[3]!=0x69)
				++startByte;
			else
			{
				if (m_readBufMax >=
					startByte + sizeof(TASKBUS::subject_package_header)+header->data_length)
				{
					char * pack_header =  pBufStart + startByte;
					qsizetype pack_size = sizeof(TASKBUS::subject_package_header)+header->data_length;
					emit_package(pack_header,pack_size);
					//推进游标
					startByte += pack_size;
				}
				else //本次数据不够,退出等待更多尾部数据
					break;
			}
		}
		//把尾部的剩余字节搬移到首部(只在黏包时会发生)
		const int taiBytes = m_readBufMax - startByte;
		if (taiBytes)
			memmove(pBufStart,pBufStart+startByte,taiBytes);
		m_readBufMax = taiBytes;
		total_sz = m_process->size();
	}
}

代码参考 https://gitcode.net/coloreaglestdio/taskbus

4.3:范例2 万兆以太网环状逐包缓存

范例2,采用了环状单包缓存器与求余游标,共同组成了高速处理器结构。由于PCAP抓包的包是有分界的,所以直接采用VECTOR构造逐包存储的环状缓存。

	//全局环状缓存
	#define PCAPIO_BUFCNT 65536
	#define PCAPIO_MAXPACK 10000
	extern QVector<tag_packages> global_buffer;
	extern QAtomicInteger<quint64> pcap_recv_pos;
	
	void init_buffer()
	{
		global_buffer.clear();
		for (int i = 0;i < PCAPIO_BUFCNT;++i)
		{
			tag_packages pack;
			pack.from_id = 0;
			pack.to_id = -1;
			pack.len = 0;
			pack.data.resize(PCAPIO_MAXPACK);
			global_buffer.push_back(pack);
		}
	}
	void Loop()
	{
			//IN LOOP
			const u_char *packet;
			struct pcap_pkthdr header;
				while (!pcap_stop)
			{
				packet = pcap_next(handle, &header);
				if(packet)
				{
					if (enqueuePack((const char *)packet,header.len,id,itstr))
					{
						if (++total_caps % 1000==0)
							msg(QObject::tr("%1 recv %2").arg(itstr).arg(total_caps));
					}
				}
			}
	}

	bool enqueuePack(const  char * packet, const int len, const int npid, const QString & portname )
	{
		if (len>PCAPIO_MAXPACK || len <14)
			return false;
		bool enqueued = false;
		using namespace  PCAPIO;
		if (Valid(packet))
		{
			quint64 pos = pcap_recv_pos++;
			global_buffer[pos % PCAPIO_BUFCNT].from_id = npid;
			global_buffer[pos % PCAPIO_BUFCNT].to_id = dst_id;
			global_buffer[pos % PCAPIO_BUFCNT].len = len;
			memcpy_s(
						global_buffer[pos % PCAPIO_BUFCNT].data.data(),PCAPIO_MAXPACK,
					packet,len
					);
			enqueued = true;
		}
		return enqueued;
	}

完整工程参考 https://gitcode.net/coloreaglestdio/pcaphub

4.4:范例3 USRP 实时环状流缓存

USRP SDR 的环状缓存,是一种无包的结构。采用环状缓存,可以只在收尾进行处理。

//...
class uhd_device
{
protected:
	uhd_io_thread * m_initThread;
	//初始化队列
	short	 		(*m_data_iq_rx)[2] = nullptr;
	size_t			m_rx_bufsz = 1024*1024*16;
};
uhd_device::uhd_device()
{
	m_data_iq_rx = new short [m_rx_bufsz+1024*1024][2]{{0,0}};
	m_data_iq_tx = new short [m_tx_bufsz+1024*1024][2]{{0,0}};
}
//Thread
{
		try {
			while (!stop_signal_called_rx)
			{
				const size_t off = m_rx_points % m_rx_bufsz;
				size_t real_size = 0;
				void * addr = (void *)&m_data_iq_rx[off][0];
				UHD_DO(uhd_rx_streamer_recv(rx_streamer,(void **) &addr, m_rxblock_size, &rx_meta, 10, true, &real_size));
				m_rx_points += real_size;
				const size_t new_begin = m_rx_points % m_rx_bufsz;
				//把尾部冒出去的部分搬移到首部。只在缓存回环一圈时发生,很稀疏
				if (new_begin < m_rxblock_size)
					memcpy(m_data_iq_rx, &m_data_iq_rx[m_rx_bufsz], sizeof(short)*2*new_begin);
			}

		}  catch (...) {
			stop_signal_called_rx = true;
		}
}

完整代码参考 https://gitcode.net/coloreaglestdio/uhd_spectrum

4.5:范例4 4倍采样率接收机鲁棒缓存

对于底层的波形流,不但会出错,接收机因为时钟差,最佳采样时刻还会不断偏移。我们可以同时维护4个缓存进行处理。对物理层波形的逐个处理,都是使用的静态缓存。

long long g_starts[4] = {0,0,0,0};
QVector<unsigned short> cache_bits[4];
static std::shared_ptr<unsigned char> deinterbits[4]  {
		std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21]),
		std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21]),
		std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21]),
		std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21])
	};
static std::shared_ptr<unsigned char> bufframe (new unsigned char [65536*8+7]);
static std::shared_ptr<int> bufdata  (new int [65536*8+7]);
static std::shared_ptr<int> bufcode (new int [65536*8+256+VIT_WINLEN]);

long long nextPackStart = 0;

std::vector<std::vector<unsigned char> > alg_decap(const std::vector<unsigned char> & packages,void * codec)
{
	std::vector<std::vector<unsigned char> > res;
	//0. Append to cache
	const  unsigned short * pSyms = (const  unsigned short *)packages.data();
	const  int sym_total = packages.size()/sizeof(unsigned short)/4;
	for (int i=0;i<sym_total;++i)
	{
		for (int j=0;j<4;++j)
		{
			cache_bits[j].push_back(pSyms[i*4+j]);
		}
	}
	//4路处理
	for (int baisIdx = 0; baisIdx < 4; ++baisIdx) {
		const int bais = (lastGoodBais + baisIdx) % 4;
		//1. Batch deal
		const int cacheSize = cache_bits[bais].size();
		const unsigned short *pCache = cache_bits[bais].data();
		int nStart = 0;
		if (nStart + g_starts[bais] < nextPackStart)
			nStart = (nextPackStart - g_starts[bais]);
		//header    find
		while (nStart < cacheSize)
		{
			//1.寻找头部
			if (nStart + 32*2 <cacheSize)
			{
				//...
				errb = headerXor(nStart,pCache );
				if (errb)
				{
					++nStart;
					continue;
				}
			}
			else
				break;
			//推进子偏移
			H += 64;
			// 2.提取长度
			if (nStart + H + (16+16) * 8 * 2 >= cacheSize)
				break;
			unsigned long long len = extract_len(nStart ,pCache )	, lencheck =extract_lencheck(nStart,pCache  );
			//推进子偏移		
			H += 16 * 8 * 2;
			//长度可能出错
			if (len >=65536 || 	lencheck != checksum(len) )
			{
				++nStart;
				continue;
			}

			//提取内容
			const int bitlen = len * 3 * 8 + 7 * 3;

			if (nStart + bitlen*2 +  H >= cacheSize)
				break;		
			//纠错
			unsigned char * deinterb = deinterbits[bais].get();
			int * code = bufcode.get();
			deinterleave(nStart ,pCache ,deinterb ,code );
			int * data = bufdata.get();
			if ( decode(code ,data ))
			{
				int poped  = datalen(data );
				//4. form data
				if (poped == len * 8) {
					//Output
					unsigned char * frame = bufframe.get();
					bits2bytes(data ,frame );
					//Checksum					
					if (checkcrc32(frame )) {
						std::vector<unsigned char> g;
						std::copy(frame,frame+len,std::back_inserter(g));
						res.push_back(std::move(g));
						nResBs.insert(g_starts[bais] + nStart);
						packok = true;
					}
				}
			}
			//推进子偏移
			H += bitlen*2;
			nStart += H;
			if (packok) {
				nextPackStart = g_starts[bais] + nStart;
				lastGoodBais = bais;
			}
		}//end while cache size >0
		//移动尾部
		if (nStart)
		{
			cache_bits[bais].remove(0,nStart);
			g_starts[bais] += nStart;
		}
	}//end for (int bais = 0; bais < 4;++bais)
	return res;
}

详细代码参考 https://gitcode.net/coloreaglestdio/taskbus_course

5 总结

在更为复杂的底层流处理中,由于存在多消费者跟随等问题,使得流处理的代码更为复杂。但只要遵循基本的几条建议,性能都差不了。现场工程师调试分析待测试代码时,一定要注意编码者的水平和能力,以立即发现瓶颈问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/731855.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

视频怎么做成二维码?一招轻松制作二维码

怎么把视频做成二维码&#xff1f;现在用二维码来做载体存储视频&#xff0c;这种方法能够有效的减少内存占用&#xff0c;可以将视频储存在云端&#xff0c;他人只需要扫码就能够查看视频。下面来教大家一招关于视频二维码制作&#xff08;音视频二维码制作-一键免费生成音视频…

Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

一、案例说明 现有一电商网站数据文件&#xff0c;名为buyer_favorite1&#xff0c;记录了用户对商品的收藏数据&#xff0c;数据以“\t”键分割&#xff0c;数据内容及数据格式如下&#xff1a; 二、前置准备工作 项目环境说明 Linux Ubuntu 16.04jdk-7u75-linux-x64scal…

(LFPAK56)BUK7Y7R0-40HX 40V、N 通道BUK9Y6R5-40HX表面贴装汽车用MOSFET器件

汽车用MOSFET将低压超级结技术与先进的封装设计相结合&#xff0c;以实现高性能和耐用性。Trench 9 MOSFET系列产品全部符合AEC-Q101标准&#xff0c;且超越了这一国际汽车级标准的要求&#xff0c;在包括温度循环 (TC)、耐高温栅极偏置 (HTGB)、耐高温反向偏置 (HTRB) 和断续工…

DBETR-1X/180G24K4M反馈型比例压力阀放大器

DBETR-1X/30G24K4M&#xff0c;DBETR-1X/315G24K4M&#xff0c;DBETR-1X/80G24K4M&#xff0c;DBETR-1X/180G24K4M&#xff0c;DBETR-1X/230G24K4M&#xff0c;DBETR-1X/350G24K4M比例溢流阀是一种遥控阀。其设计结构为座阀式直动溢流阀&#xff0c;搭配外置式比例放大器。 这…

挑选适合自己的英文原版书

很多人在阅读英文原版小说时感觉十分吃力&#xff0c;有很多生词或长难句。如何寻找适合自己英文阅读水平的书籍呢&#xff1f;下面推荐一种按蓝思值挑选英文原版书的方法。 首先根据自己的受教育程度&#xff0c;选择对应蓝思级别的英文书。如博士可以选择蓝思值为1300L的英文…

图神经网络:(图像分割)三维网格图像分割

文章说明&#xff1a; 1)参考资料&#xff1a;PYG的文档。文档超链。斯坦福大学的机器学习课程。课程超链。(要挂梯子)。博客原文。原文超链。(要挂梯子)。原文理论参考文献。提取码8848。 2)我在百度网盘上传这篇文章的jupyter notebook以及预训练模型。提取码8848. 3)博主水平…

qt信号与槽

信号与槽的概念&#xff1a; 1>信号&#xff1a;信号就是信号函数&#xff0c;可以是组件自身提供&#xff0c;也可以是用户自己定义&#xff0c;自定义时&#xff0c;需要类体的signals权限下进行定义&#xff0c;该函数是一个不完整的函数&#xff0c;只有声明&#xff0…

输入一个链表,输出该链表的倒数第 k 的结点

一、思路 假设 K 是 2&#xff0c;根据下面的图片可以看出&#xff0c;倒数第 K 个结点就是 45。 需要注意的前提是&#xff0c;K 不能是负数也不能是 0 并且也不能超过链表的结点个数&#xff0c;因为要保证 K 是在链表的范围里&#xff0c;才能找到 K&#xff0c;然后返回这…

【网络】TCP三次握手和四次挥手(感性理解)

目录 三次握手 文字描述三次握手过程 为什么是三次握手&#xff1f; 什么是SYN洪水&#xff1f; 连接和半连接队列 一次、两次握手行不行&#xff0c;四/五/六次握手行不行&#xff1f; 三次握手一定会成功吗&#xff1f; 三次握手的过程中可不可以携带数据 TCP中的IS…

模块化规范

常用模块化有两种规范&#xff0c;commonJS和ES6 一&#xff1a;两者区别 二&#xff1a;如何转义&#xff1f; 我们常遇到的使用场景是&#xff0c;在commonJS的模块里需要引入ES6规范的模块。这时就需要把ES6模块转译为commonJS规范的模块&#xff0c;否则报错 转义工具有…

javassist 02 implement interface

创建 interface package com.wsd;public interface AccountDao {int delete(); }利用 javassist 生产一个 类A, Class A implements AccountDao package com.wsd;import javassist.ClassPool; import javassist.CtClass; import javassist.CtMethod; import javassist.Modifi…

mac桌面时钟 浮动 (python)

浮动时钟&#xff0c;多地时区 app store的都要钱&#xff0c;于是。。。。我们让chatgpt来实现一个吧&#xff1a; 数字&#xff1a; 代码&#xff1a; import sys import datetime import pytzfrom PyQt5.QtWidgets import QApplication, QMainWindow, QGraphicsView, QGr…

深度学习不同数据增广方法的选用分析

一般情况下&#xff0c;可以将数据扩增方法分为单数据变形、多数据混合、学习数据分布规律生成新数据和学习增广策略等4 类方法。以上顺序也在一定程度上反映了数据增广方法的发展历程。如果与Shorten和Khoshgoftaar的成果对照&#xff0c;就图像数据而言&#xff0c;基于数据变…

抖音矩阵源码搭建开发技术部署分析

目录 一、 什么是抖音矩阵&#xff1f;源码搭建开发注意事项&#xff1f; 1. 抖音矩阵概述 2. 源码搭建开发注意事项&#xff1a; 二、 使用步骤及开发代码展示 一、 什么是抖音矩阵&#xff1f;源码搭建开发注意事项&#xff1f; 1. 抖音矩阵概述 首先&#xff0c;抖音账…

21夜间车牌识别(matlab程序)

1.简述 简单说一下实现思路&#xff1a; 读取图片&#xff0c;转灰度&#xff0c;计算灰度直方图&#xff0c;估算阈值&#xff08;这里的阈值计算很重要&#xff0c;经过阈值算法&#xff0c;选取一个最恰当的阈值&#xff09;&#xff0c;之后二值化。显示图像即可。 实现目…

爬虫爬取公众号文章

前言 自从chatGPT出现后&#xff0c;对于文本处理的能力直接上升了一个维度。在这之前&#xff0c;我们爬取到网络上的文本内容之后&#xff0c;都需要写一个文本清理的程序&#xff0c;对文本进行清洗&#xff0c;而现在&#xff0c;有了chatGPT的加持&#xff0c;我们只需要…

解决程序占用较多内存的问题

今天发现自己开发的一个程序占用了大量内存而且不会自动释放 &#xff0c;我的程序在windows中运行的&#xff0c;解决办法如下&#xff1a; 第一步&#xff1a;打开任务管理器&#xff0c;打到正在运行程序 &#xff08;这里以sql server为例&#xff09;&#xff0c;然后右击…

设计合并排序算法实现对N个整数排序。

1.题目 设计合并排序算法实现对N个整数排序 2.设计思路 先将无序序列利用分治法划分为子序列,直至每个子序列只有一个元素,然后再对有序子序列逐步进行合并排序。合并方法是循环的将两个有序子序列当前的首元素进行比较,较小的元素取出,置入合并序列的左边空置位,直至其中…

特征选择算法 | Matlab 基于最大相关最小冗余特征选择算法(mRMR)的分类数据特征选择

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 特征选择算法 | Matlab 基于最大相关最小冗余特征选择算法(mRMR)的分类数据特征选择 部分源码 %--------------------

Redis实战案例12-添加秒杀券实现秒杀下单及相关问题解决

1. 添加优惠券 该项目没有后台管理的界面&#xff0c;所以采用postman发送请求 http://localhost:8081/voucher/seckill注意end时间要大于当前系统时间 {"shopId": 2,"title": "100元代金券","subTitle": "周一至周五均可使用&qu…