在前文中,我们介绍了EPDR技术的起源,以及使用该技术驱动的业余软件无线电平台专栏。已有玩家通过踩坑证明,进程管道交换数据时间延迟大(10ms),构造时间敏感系统难。除非采用传统的紧耦合设计及更大的颗粒度,否则很难在期望的时刻执行正确的动作。典型的系统包括:
- 带有 TDMA 收发同步的通信系统
- 超过20MHz采样率的CPU算法程序
- 驱动X310等超宽带采样
为了不给读者造成困扰和不必要的尝试,通过此文安排一个数据吞吐的Banchmark,明确这个技术以及相应平台的局限,并给出可能满足上述需求的解决构想。经过测试,最佳成绩在 i7-10700U 笔记本上达到,整体吞吐3GBps,单路1.2GBps,平均延迟2ms。
1 EPDR设计理念
EPDR based taskBus的设计初衷是为了用跨进程的方式黏合不同开发工具开发的程序,以解决一些灵活性和学习曲线不可兼得的问题。在开发之初,它甚至不是一个用于业余SDR的平台,而是用于矿脉勘探传感器数据处理等弱实时性的领域。
设计理念参考这个文档。需要强调的是,尽管taskBus看起来类似一些图标拖拽的产品,实际上它既不是GNU-Radio、Pothos Flow,也不是Simulink、Labview——taskBus的理想颗粒度更大。举例子,对于一个仿真通信系统,在模块里实现到什么颗粒度,是有讲究的。有的工具,可能一个滤波器、1个加法器就弄了1个模块,而taskBus希望颗粒度略大一些,这样便于优化模块内性能,并降低管道IO。
以simulink的QPSK理想范例为例,如果使用taskBus实现功能,典型的划分是下图的7个模块,而不是30个Blockset。
上图底图来自网络搜索。
- 模块1 完成调制仿真,包含simulink中的13个Blockset。
- 模块2 完成解调仿真, 包含simulink中的12个Blockset。
- 模块3\4\5\6完成显示、数据生成等工作,与Simulink粒度一致。
2 管道的实时性和峰值流量瓶颈
taskBus设计时,希望通过管道跨进程共享的工具主要包括文件、影音、GUI绘图、串口、键鼠、数据库等通用非实时模块。有了这些模块,强实时性的独立进程就只用负责算法,无需了解更多的知识。在设计场景中,1个实验小组由3-4个人组成,独立的用自己喜欢的程序构造紧耦合模块,所有的实时性都在模块内解决。换句话说:taskBus 不支持强实时算法的复用,TDMA的发射、接收同步需要紧耦合写在一起。
对弱实时性的应用,比如听广播、对讲机,典型的颗粒度类似基于taskBus的通信课程里的范例。在这些范例里,对一个教学QPSK的波形处理、信道处理是两个不同的模块,允许不同波形和纠错之间进行组合。即使是弱实时应用,一个模块内还是包含很多步骤。比如DDC、下抽、插值等等。这些步骤是用C语言紧耦合在一起的,并没有分开。
对强实时性的应用,比如TDMA在毫秒级别对准时间,使用管道是不切实际的。一个事件循环以及跨进程的流转缓存所经历的延迟,基本在10毫秒级别,且受到操作系统当前状态的影响,并不稳定。经过现场测试,在后台弹出某杀毒软件升级信息时,整体延迟高达500ms以上。这是因为虽然模块可以采用fflush(stdout)确保进程出口的延迟小,但管道数据进入操作系统后,就不受控制。如果采用Linux的实时内核,情况会有改观,但在大吞吐下,依旧存在问题。
同时,OS的管道总流量是有限的。一般来说串接的模块越多,流量被分流的越厉害。这个和TCP点对点连接类似,eCal等使用UDP组播的通信中间件则没有这个问题。
2.1 测试程序
测试模块只进行数据吞吐,通过时间标记,来计算峰值吞吐流量下的时延。
#include <cstdio>
#include <string>
#include <cstring>
#include <cstdlib>
#include <thread>
#include <atomic>
#include <ctime>
#include <mutex>
#ifdef WIN32
#include <io.h>
#include <fcntl.h>
#endif
int instance = 0;
int sub_input = 0, sub_output = 0;
std::atomic<int> way_count = 0;
using namespace std;
const int n_len = 65536-16;
std::mutex mtx;
int main(int argc, char * argv[])
{
//In windows, stdio must be set to BINARY mode, to
//prevent linebreak \\n\\r replace.
#ifdef WIN32
setmode(fileno(stdout),O_BINARY);
setmode(fileno(stdin),O_BINARY);
#endif
bool bInfo = false, finished = false;
string function;
//1. parse cmdline
for (int i=1;i<argc;++i)
{
string arg_key = argv[i], arg_value = argv[i];
int idx = arg_key.find('=');
if (idx>=0 && idx<arg_key.size())
{
arg_key = arg_key.substr(0,idx);
arg_value = arg_value.substr(idx+1);
}
if (arg_key=="--function")
function = arg_value;
else if (arg_key=="--information")
bInfo = true;
else if (arg_key=="--instance")
instance = atoi(arg_value.c_str());
else if (arg_key=="--data_in")
sub_input = atoi(arg_value.c_str());
else if (arg_key=="--data_out")
sub_output = atoi(arg_value.c_str());
fprintf(stderr,"%s:%s\n",arg_key.c_str(),arg_value.c_str());
fflush(stderr);
}
//2. function case
if (bInfo)
{
//In this example, json file will be published with exe file.
//We will return directly. Or, you can output json here to stdout,
//If you do not want to publish your json file.
return 0;
}
else if (instance<=0 || function.length()==0)
return -1;
else
{
std::thread th_send([&]()->void {
if (sub_output > 0)
{
int thd = 10;
while (way_count >= 0)
{
if (way_count < thd )
{
static char buf_header[4] = { 0x3c,0x5a,0x7e,0x69 };
static char data[n_len]{ 0 };
static clock_t* clk = (clock_t*)&data[0];
static long long* cnt = (long long*)&data[8];
*clk = clock();
mtx.lock();
fwrite(buf_header, 1, 4, stdout);
fwrite(&sub_output, sizeof(int), 1, stdout);
fwrite(&instance, sizeof(int), 1, stdout);
fwrite(&n_len, sizeof(int), 1, stdout);
fwrite(data, sizeof(char), n_len, stdout);
++way_count;
++(*cnt);
fflush(stdout);
mtx.unlock();
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (way_count ==0 && thd <1000 )
++thd;
}
}
}
});
long long recvcnt = 0;
long long delay = 0;
clock_t first_clk = 0;
while(false==finished)
{
static char header[4], data[n_len]{ 0 };
static clock_t* clk = (clock_t*)&data[0];
static long long* cnt = (long long*)&data[8];
int n_sub = 0, n_path = 0, len = 0;
fread(header,1,4,stdin); //2.1 read header
if (header[0]!=0x3C || header[1]!=0x5A || header[2]!=0x7E || header[3]!=0x69)
{
fprintf(stderr,"BAD HEADER\n");
fflush(stderr);
continue;
}
fread(&n_sub,sizeof(int),1,stdin);
fread(&n_path,sizeof(int),1,stdin);
fread(&len,sizeof(int),1,stdin);
if (len < 0 || len != n_len || n_sub <= 0)
{
int rflen = len;
while (rflen > 0)
{
int rdlen = n_len;
if (rdlen > rflen)
rdlen = rflen;
fread(data, sizeof(char), rdlen, stdin);
rflen -= rdlen;
}
if (strstr(data, "function=quit;") != nullptr)
{
finished = true;
continue;
}
}
else
{
fread(data, sizeof(char), n_len, stdin);
if (n_sub != sub_input)
{
fprintf(stderr,"BAD SUBJECT\n");
fflush(stderr);
continue;
}
if (n_path != instance)
{
mtx.lock();
fwrite(header, 1, 4, stdout);
fwrite(&sub_output, sizeof(int), 1, stdout);
fwrite(&n_path, sizeof(int), 1, stdout);
fwrite(&n_len, sizeof(int), 1, stdout);
fwrite(data, sizeof(char), n_len, stdout);
fflush(stdout);
mtx.unlock();
}
else
{
--way_count;
if (recvcnt == 0)
first_clk = *clk;
++recvcnt;
delay += clock() - *clk;
if (recvcnt >= 10000)
{
recvcnt = 0;
delay /= 10000;
long long total_bytes = 65536 * 10000;
double tmCost = (*clk - first_clk) * 1.0 / CLOCKS_PER_SEC+1e-10;
double speed = total_bytes*1.0/1024/1024/tmCost;
fprintf(stderr, "Cnt = %d, Average delay = %d clocks, total Speed = %.2lf MB/s.\n", *cnt,(int)delay,speed);
fflush(stderr);
}
}
}
}
way_count = -2;
th_send.join();
}
//3.exit
return 0;
}
由于模块完全没有动态内存,且不做任何操作,所有的资源均用于数据传输。
2.2 测试结果
使用2个banchmark模块构成吞吐环,互相以最大速率向对方输出数据。收到对方数据后,即刻吐回,而后计算时差。
测试结果:
平台 | 系统 | 峰值吞吐 | 单路流量 | 平均来回延迟 |
---|---|---|---|---|
i7-10700U | Linux x64 | 3354MBps | 1340MBps | 1ms |
i7-6700K | Linux x64 | 2844MBps | 1050MBps | 2.2ms |
i7-6700K | win10 home x64 | 1345MBps | 340MBps | 40ms |
RaspberryPi 4(8GB) | Rasbain 64 | 223MBps | 102MBps | 6ms |
2.4 结论
- 同样的硬件, Linux下的管道吞吐性能远远优于Windows,且对带宽的利用率不同。应该是Linux下某一个内核行为比windows少了复制步骤。
- 最大峰值速率 1428.19MBps,在 i7 10代笔记本上测得。如果是i9,应该更好。这个峰值速率是指单路的速率,例子里是双路,整体速率达到 3.3GBps。
- 时延方面,Linux完胜windows。Linux即使是树莓派,时延也是10毫秒以内。windows为40ms。
3 替代方案
上述速率是在双路理想情况下测试的。路数越多,单路速率越小。同时,模块不可能不对数据进行缓存和处理,所以实际速率大概至少除以4. 时延依旧无法控制在毫秒以下,所以对TDMA的处理很可能会超时。要进行ms级别精确控制的用户,或者要求多路大吞吐的用户,则需要考虑下述替代方案:
- 采用传统的单进程大颗粒度设计,以协议栈的粒度实现重用。如1个进程完成整体的GSM协议栈,只是话筒、声卡、波形质量指示等信息通过管道输出。至于协议栈内的各级处理算法,还是传统的紧耦合设计。
- 采用基于UDP等即时协议的通信协议,或者中间件。典型的包括UDP套接字、The enhanced Communication Abstraction Layer (eCAL)等。但在通用OS下,还是会遇到丢包的问题。
- 采用FPGA而非通用CPU进行开发。FPGA的上位机(PC端)程序可以兼容taskBus,作为模块嵌入。代价是学习和培训成本高。
4. 非计算机团队黏合最佳策略
非计算机专业教研的黏合是一个非常具有挑战性的问题。这些困难包括:
- 团队中的核心不具备完整的计算机知识,可能只熟悉1个开发工具的一小部分,并无法独立完成调试。
- 团队成员掌握的知识链条差异很大,开发能力交集小,交流中无法Get到彼此的关键点。一些新进的硕士在学校内只学过C语言且只会写Hello world。一些博士几乎不具备基础的命令行能力。
- 团队成员价值主要靠论文、毕业答辩来体现,学习开发技术的动力不足,也不愿意涉足新的非相关知识领域。
- 团队的成员更替频繁,可能1个博士没干满3年就毕业了,硕士更短。
目前看来,对于这样的情况,代码集成+消息队列是最佳选择。这些年探索DLL、COM、Qt插件、EPDR(taskBus)、eCAL、Kafka等特定技术链条建立规范的努力并没有白费,这些尝试显著开拓了视野、增长了见识。未来taskBus将继续作为业余SDR小众平台开发下去,并逐步加入一些好玩的功能。
5 代码链接
参考:
https://gitcode.net/coloreaglestdio/taskbus