流式处理区别于按包处理,指的是对处理者而言,面对的是逻辑上无头无尾的数据流。因此,在提取数据流中的包时,就需要遵循其内在的格式,进行头部捕获、提取、校验。然而,如果不考虑TCP等流式数据的异常情况,而简单的编程,会在性能、稳定性上踩坑。本人作为资深现场工程师,协助开发团队处理过各种千奇百怪的流式处理问题,本文把其中较为常见的一并总结一下,希望对避坑有所帮助。
1. 流式传输的常见异常
1.1 普通断线
受到物理层以及各级底层协议栈的影响,流式处理可能发生中断。复现:拔除网线、串口等电缆,或者关闭电源。此时,主对方的吞吐都会中断,只能再次发起连接。
相对于下面几个异常,普通的断线是最容易检测处理的。只需要重新发起连接即可。
1.2 黏包断线
一般的流式处理,都会有策略区别两次完整、独立的流。比如两次TCP连接,他们的五元组是不同的,源端口一般会发生变化。此外,各类流式协议内层的ID等参数在重连后也会变化,所以即使断线,再次连接后,也是崭新的流。以前的流中的片段不会混进来。然而,流传输的环境是错综复杂的,比如存在一些特殊的代理以及中继。考虑下面的情况:
代理方在端1发起连接时,会主动连接端2,并协助周转主数据。
如果代理方在检测到端1断开后,依旧保持与端2的连接,则设计不佳的代理、端2可能不会区分前后两个连接的流。
这种情况,导致端2收到的数据异常:
- 截断。端1中断前的流并没有传输完毕,停止在一个中间状态。
- 错位。端2在上层协议的包长度、包头解析时,发生错位。
- 校验失败。端2的上层协议把上一次端1的末包头和本次会话的新数据混合,导致看似正确的包结构,但校验失败。
- 系统崩溃。如果上层协议不检测完整性,而是默认数据是正确的,则可能发生崩溃。
1.3 错包混入
曾有极少实现欠佳的设备个例,出现错包。这种错包理论上是不可能发生的,因为流协议的校验会保证单次会话的完整性。但是,由于设备本身对协议栈实现的缺陷,导致错包。原因是采用软件设备来实现NAT等功能,直接在底层开展包处理,对地址进行替换。当底层协议并不保证数据的正确性时,替换者又不去检查上层协议的校验,转而直接进行校验生成,导致原本校验不通过的数据竟然校验正确了!
基于这种处理的单片机软件设备曾经造成了重大的安全事故。作为设计通用流式处理工具的设计者,有必要考虑到这种极端边界条件的存在。
2. 确保稳定性的原则
确保稳定性的方法就是采用零信任的拆包组包策略。
- 流式处理的上层单位需要具备可识别的头部。
- 需要有额外校验方式
- 无论何时都按照最坏的情况来检测数据的一致性。
- 假设下一个包永远不在正确的位置上。
- 假设长度字段会错误,成为负数或者超大正整数。
- 假设承载的内容会发生错误。
- 假设断线会发生在任何一个字节甚至是比特上。
如果按照上述假设编程,基本不会存在稳定性问题。
3.确保性能的原则
既然是流式处理,必然存在缓存的吞吐。在无限长的流中,有限的内存只能以窗口的形式处理数据。
一般来说,窗口大小应该大于头部的大小。用何种数据结构盛放窗口,何种策略滑动窗口,对性能的影响非常大。
确保性能的原则如下:
- 尽可能降低动态分配内存、重新调整内存大小的频率。
- 使用游标而不是实际的0下标来标定头部。这样可以避免频繁的内存拷贝、搬移。
- 较大的缓存窗口有利于提高性能。
4.举例
4.1反面范例-taskBus 旧版管道吞吐
我们首先看一个反面教材。这个教材是由初学者搭建的,一直没有优化过。因为流速很慢,所以也能正常工作。
class taskNode : public QObject
{
Q_OBJECT
public:
//进程缓存
QByteArray m_array_stdout;
};
void taskNode::slot_readyReadStandardOutput()
{
QByteArray arred = m_process->readAllStandardOutput();
m_array_stdout.append(arred);
while (m_array_stdout.size()>=sizeof(TASKBUS::subject_package_header))
{
auto * header =
reinterpret_cast<const TASKBUS::subject_package_header *>
(m_array_stdout.constData());
//合法性 Legitimacy
if (header->prefix[0]!=0x3C || header->prefix[1]!=0x5A ||
header->prefix[2]!=0x7E || header->prefix[3]!=0x69)
m_array_stdout.clear();
else
{
if (m_array_stdout.size()>= sizeof(TASKBUS::subject_package_header)+header->data_length)
{
//不必要的内存拷贝
QByteArray arr(m_array_stdout.constData(),
sizeof(TASKBUS::subject_package_header)
+header->data_length);
//频繁的内存搬移(删除头部包)
m_array_stdout.remove(0,sizeof(TASKBUS::subject_package_header)
+header->data_length);
emit_package(arr);
}
if (m_array_stdout.size()>=sizeof(TASKBUS::subject_package_header))
{
header = reinterpret_cast<const TASKBUS::subject_package_header *>(m_array_stdout.constData());
if (!(m_array_stdout.size()>=sizeof(TASKBUS::subject_package_header)+header->data_length))
break;
}
}
}
}
/*!
这种策略下,m_array_stdout 不断发生内存分配、缩短、搬移,CPU占用率非常高。
代码参考 https://gitcode.net/coloreaglestdio/taskbus
4.2 范例1: taskBus 准静态自增缓存管道吞吐
当我们遇到流式处理带宽变大,必须优化taskBus吞吐管道的时候,为了不显著改变程序结构,采用的是自增长的准静态内存。包来到的时候,被尽快处理。
class taskNode : public QObject
{
Q_OBJECT
public:
//进程缓存
QByteArray m_array_stdout;
qsizetype m_readBufMax = 0;
};
void taskNode::slot_readyReadStandardOutput()
{
int total_sz = m_process->size();
while (total_sz)
{
//确保缓存充足
const qsizetype tailsize = m_array_stdout.size()-m_readBufMax;
if (tailsize < total_sz)
m_array_stdout.resize(m_readBufMax + total_sz);
total_sz = m_process->read(m_array_stdout.data()+m_readBufMax,total_sz);
m_readBufMax += total_sz;
//开始包处理,使用一个游标 startByte
qsizetype startByte = 0;
char * pBufStart = m_array_stdout.data();
while (sizeof(TASKBUS::subject_package_header) + startByte < m_readBufMax)
{
auto * header =
reinterpret_cast<const TASKBUS::subject_package_header *>
(pBufStart + startByte);
//检测头部,如果没有检测到,继续下一个游标位置
if (header->prefix[0]!=0x3C || header->prefix[1]!=0x5A ||
header->prefix[2]!=0x7E || header->prefix[3]!=0x69)
++startByte;
else
{
if (m_readBufMax >=
startByte + sizeof(TASKBUS::subject_package_header)+header->data_length)
{
char * pack_header = pBufStart + startByte;
qsizetype pack_size = sizeof(TASKBUS::subject_package_header)+header->data_length;
emit_package(pack_header,pack_size);
//推进游标
startByte += pack_size;
}
else //本次数据不够,退出等待更多尾部数据
break;
}
}
//把尾部的剩余字节搬移到首部(只在黏包时会发生)
const int taiBytes = m_readBufMax - startByte;
if (taiBytes)
memmove(pBufStart,pBufStart+startByte,taiBytes);
m_readBufMax = taiBytes;
total_sz = m_process->size();
}
}
代码参考 https://gitcode.net/coloreaglestdio/taskbus
4.3:范例2 万兆以太网环状逐包缓存
范例2,采用了环状单包缓存器与求余游标,共同组成了高速处理器结构。由于PCAP抓包的包是有分界的,所以直接采用VECTOR构造逐包存储的环状缓存。
//全局环状缓存
#define PCAPIO_BUFCNT 65536
#define PCAPIO_MAXPACK 10000
extern QVector<tag_packages> global_buffer;
extern QAtomicInteger<quint64> pcap_recv_pos;
void init_buffer()
{
global_buffer.clear();
for (int i = 0;i < PCAPIO_BUFCNT;++i)
{
tag_packages pack;
pack.from_id = 0;
pack.to_id = -1;
pack.len = 0;
pack.data.resize(PCAPIO_MAXPACK);
global_buffer.push_back(pack);
}
}
void Loop()
{
//IN LOOP
const u_char *packet;
struct pcap_pkthdr header;
while (!pcap_stop)
{
packet = pcap_next(handle, &header);
if(packet)
{
if (enqueuePack((const char *)packet,header.len,id,itstr))
{
if (++total_caps % 1000==0)
msg(QObject::tr("%1 recv %2").arg(itstr).arg(total_caps));
}
}
}
}
bool enqueuePack(const char * packet, const int len, const int npid, const QString & portname )
{
if (len>PCAPIO_MAXPACK || len <14)
return false;
bool enqueued = false;
using namespace PCAPIO;
if (Valid(packet))
{
quint64 pos = pcap_recv_pos++;
global_buffer[pos % PCAPIO_BUFCNT].from_id = npid;
global_buffer[pos % PCAPIO_BUFCNT].to_id = dst_id;
global_buffer[pos % PCAPIO_BUFCNT].len = len;
memcpy_s(
global_buffer[pos % PCAPIO_BUFCNT].data.data(),PCAPIO_MAXPACK,
packet,len
);
enqueued = true;
}
return enqueued;
}
完整工程参考 https://gitcode.net/coloreaglestdio/pcaphub
4.4:范例3 USRP 实时环状流缓存
USRP SDR 的环状缓存,是一种无包的结构。采用环状缓存,可以只在收尾进行处理。
//...
class uhd_device
{
protected:
uhd_io_thread * m_initThread;
//初始化队列
short (*m_data_iq_rx)[2] = nullptr;
size_t m_rx_bufsz = 1024*1024*16;
};
uhd_device::uhd_device()
{
m_data_iq_rx = new short [m_rx_bufsz+1024*1024][2]{{0,0}};
m_data_iq_tx = new short [m_tx_bufsz+1024*1024][2]{{0,0}};
}
//Thread
{
try {
while (!stop_signal_called_rx)
{
const size_t off = m_rx_points % m_rx_bufsz;
size_t real_size = 0;
void * addr = (void *)&m_data_iq_rx[off][0];
UHD_DO(uhd_rx_streamer_recv(rx_streamer,(void **) &addr, m_rxblock_size, &rx_meta, 10, true, &real_size));
m_rx_points += real_size;
const size_t new_begin = m_rx_points % m_rx_bufsz;
//把尾部冒出去的部分搬移到首部。只在缓存回环一圈时发生,很稀疏
if (new_begin < m_rxblock_size)
memcpy(m_data_iq_rx, &m_data_iq_rx[m_rx_bufsz], sizeof(short)*2*new_begin);
}
} catch (...) {
stop_signal_called_rx = true;
}
}
完整代码参考 https://gitcode.net/coloreaglestdio/uhd_spectrum
4.5:范例4 4倍采样率接收机鲁棒缓存
对于底层的波形流,不但会出错,接收机因为时钟差,最佳采样时刻还会不断偏移。我们可以同时维护4个缓存进行处理。对物理层波形的逐个处理,都是使用的静态缓存。
long long g_starts[4] = {0,0,0,0};
QVector<unsigned short> cache_bits[4];
static std::shared_ptr<unsigned char> deinterbits[4] {
std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21]),
std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21]),
std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21]),
std::shared_ptr<unsigned char>(new unsigned char [65536*3*8+21])
};
static std::shared_ptr<unsigned char> bufframe (new unsigned char [65536*8+7]);
static std::shared_ptr<int> bufdata (new int [65536*8+7]);
static std::shared_ptr<int> bufcode (new int [65536*8+256+VIT_WINLEN]);
long long nextPackStart = 0;
std::vector<std::vector<unsigned char> > alg_decap(const std::vector<unsigned char> & packages,void * codec)
{
std::vector<std::vector<unsigned char> > res;
//0. Append to cache
const unsigned short * pSyms = (const unsigned short *)packages.data();
const int sym_total = packages.size()/sizeof(unsigned short)/4;
for (int i=0;i<sym_total;++i)
{
for (int j=0;j<4;++j)
{
cache_bits[j].push_back(pSyms[i*4+j]);
}
}
//4路处理
for (int baisIdx = 0; baisIdx < 4; ++baisIdx) {
const int bais = (lastGoodBais + baisIdx) % 4;
//1. Batch deal
const int cacheSize = cache_bits[bais].size();
const unsigned short *pCache = cache_bits[bais].data();
int nStart = 0;
if (nStart + g_starts[bais] < nextPackStart)
nStart = (nextPackStart - g_starts[bais]);
//header find
while (nStart < cacheSize)
{
//1.寻找头部
if (nStart + 32*2 <cacheSize)
{
//...
errb = headerXor(nStart,pCache );
if (errb)
{
++nStart;
continue;
}
}
else
break;
//推进子偏移
H += 64;
// 2.提取长度
if (nStart + H + (16+16) * 8 * 2 >= cacheSize)
break;
unsigned long long len = extract_len(nStart ,pCache ) , lencheck =extract_lencheck(nStart,pCache );
//推进子偏移
H += 16 * 8 * 2;
//长度可能出错
if (len >=65536 || lencheck != checksum(len) )
{
++nStart;
continue;
}
//提取内容
const int bitlen = len * 3 * 8 + 7 * 3;
if (nStart + bitlen*2 + H >= cacheSize)
break;
//纠错
unsigned char * deinterb = deinterbits[bais].get();
int * code = bufcode.get();
deinterleave(nStart ,pCache ,deinterb ,code );
int * data = bufdata.get();
if ( decode(code ,data ))
{
int poped = datalen(data );
//4. form data
if (poped == len * 8) {
//Output
unsigned char * frame = bufframe.get();
bits2bytes(data ,frame );
//Checksum
if (checkcrc32(frame )) {
std::vector<unsigned char> g;
std::copy(frame,frame+len,std::back_inserter(g));
res.push_back(std::move(g));
nResBs.insert(g_starts[bais] + nStart);
packok = true;
}
}
}
//推进子偏移
H += bitlen*2;
nStart += H;
if (packok) {
nextPackStart = g_starts[bais] + nStart;
lastGoodBais = bais;
}
}//end while cache size >0
//移动尾部
if (nStart)
{
cache_bits[bais].remove(0,nStart);
g_starts[bais] += nStart;
}
}//end for (int bais = 0; bais < 4;++bais)
return res;
}
详细代码参考 https://gitcode.net/coloreaglestdio/taskbus_course
5 总结
在更为复杂的底层流处理中,由于存在多消费者跟随等问题,使得流处理的代码更为复杂。但只要遵循基本的几条建议,性能都差不了。现场工程师调试分析待测试代码时,一定要注意编码者的水平和能力,以立即发现瓶颈问题。