完整工程可从以下地址签出:
https://gitcode.net/coloreaglestdio/pcaphub.git
1.需求场景
在调试嵌入式物联设备时,尤其是在多个以太网物联设备交错通信的情况下,很难通过在捉襟见肘的嵌入式系统上进行数据记录与调试。如果设备连接的是一般的消费交换机,以及工业的架上交换机(一般位于车间的集控箱内),把调试笔记本插到空闲端口,是无法看到其他端口上设备的大部分数据的。这是因为交换机为了带宽和效率,会“记住”哪些MAC地址出现在哪些端口上,从而自动按照目的地址实现了点对点交换。也就是说,插在空闲口上的PC只能看到一些广播消息,大部分的UDP和TCP包是看不到的。
在古老的年代,有一种设备叫做集线器(Hub)。Hub是一种广播式的线路组合设备,可以达到这个要求。但由于所有端口共享一个带宽,导致通信效率很差,现在已经买不到了。当代通常在交换机上进行包测试,用的是镜像端口。如果恰好没有这种工业交换机,或者需要很多的镜像端口,又该如何调试呢?针对这种调试需求,可以使用PCAP,把一台插有多个以太网端口的PC变成Hub,观察所有接口上的数据流。
2.功能与原理
本软件主要功能如下:
- 支持以太网类型(10,100,1000M)接口。
- 灵活选取多个接口参与构建集线器。
- 可以为每个接口指定Filter条件。
- 双通道性能:>=100Mbps。
通过上述功能,就能够组成一个交换网络。
(1) 基本原理
本软件运行在调试工作站上,进程为“pcapHub”。调试的常见场景如下图所示:
当进行调试时,把待调试的工业设备从工业交换机断开,并临时接入在调试工作站的网卡上。同时,也可以接驳一些便携机(如果工作站没有显示器)。为了保持与原有工业网络的联通,还需要引一路线缆,把调试工作站和工业交换机连接起来。
经过上述连接,调试工作站和便携机上就能看到待调试工业设备的所有通信。调试工作站PC上的两个关键数据结构是实时队列和窗口知识。这两个结构保证了低延迟(1毫秒)、不重复(防止反复抓取冗余数据造成流量风暴)。
(2) 实时队列
本软件的实时队列是一个环形的预分配队列,一行一个pcap包,超过最大长度后绕回。队列每行一个tag_packages数据结构:
struct tag_packages{
int from_id;
int len;
QByteArray data;
};
QVector<tag_packages> global_buffer;
QAtomicInteger<quint64> pcap_recv_pos;
data 虽然为 QByteArray,但实际已经被预分配空间。因此,global_buffer 是一个静态的内存资源。主要的工作特点:
- 参与软集线器的网口,被分配了ID,以便维护和区分。from_id就是网口的ID,表明这个包的来源MAC设备连接在编号为from_id的网口上。
- 全局只有1个队列,存储着实时的网络数据。队列长度为 PCAPIO_BUFCNT(65536),每个队列中最大的数据长度为PCAPIO_MAXPACK(10000)字节。
- 全局只有1个入队(写)游标(pcap_recv_pos),所有的网口获取的数据,都会写入同一个队列。
- 各个网口在抓取时,会判断来源MAC是不是关联在本网口。关联在本网口上的来源MAC才会入队。
- 每个网口有1个读游标,不停地追赶写游标。在读取队列时,只有别的网口的来源MAC才会进行pcap_sendpack。
- 各个读、写都工作在独立的线程。
上述步骤,可以确保高效的进行数据流转。
(3)窗口知识
窗口知识记录着每个MAC地址连着哪个网口,是一个动态字典。其最外层数据结构是一个QMap,变量名称pcap_ports。Key是MAC地址(64位整形),Value是tag_portAssign结构。
struct tag_portAssign{
int curr_id;
quint64 mac;
QString portName;
QDateTime dtmLastAck;
};
static QMap<quint64,tag_portAssign> pcap_ports;
static QMutex mtx_ports;
这个结构有个显著的特点,就是标记了来源MAC最后一次活跃的时刻。如果此时刻很旧(超过5秒),那么来源MAC对应的设备就可能已经移除了。下一次这个来源MAC出现在别的网口时,该网口的入队逻辑就能够更新来源MAC的绑定关系到本网口。
具体说,在抓取包时,一个网卡抓获一个包,并得到一个来源MAC,遵循下面的伪代码规则来对待这个包,以避免重复抓取到刚刚写入的内容。
网口N 捕获新包,得到来源MAC
若:字典pcap_ports内查不到来源MAC
(说明来源MAC就连接在本网口上,它第一次出现)
在字典中添加新的MAC知识;
入队,更新pcap_recv_pos;
否则:
若:pcap_ports[来源MAC].curr_id==本网口ID
入队,更新pcap_recv_pos;
pcap_ports[来源MAC].dtmLastAck=NOW();
否则:
若 pcap_ports[来源MAC].dtmLastAck 比现在时间早5秒以上
(说明当前字典知识是老的,可能网线拔了,换了网口)
pcap_ports[来源MAC].curr_id=本网口ID;
pcap_ports[来源MAC].portName=本网口名字;
pcap_ports[来源MAC].dtmLastAck=NOW();
判断结束;
判断结束;
判断结束
在处理队列准备向本网卡推送包时,仅推送 tag_packages.from_id != 本网口ID 的包。
(4)并行读写
- 各个网口对应了自己的1个抓取线程、1个写入线程。
- 总的业务线程个数为参与网口x2
- 对字典的访问是有互斥的。
- 队列追赶使用的是atomic整形。
在启动交换时,分别为各个端口创建线程:
for (int i=0;i<ethers;++i)
{
cap_thread * recv = new cap_thread(this);
recv->setRunner(std::bind(
recv_loop,strName,id));
cap_thread * send = new cap_thread(this);
send->setRunner(std::bind(
send_loop,strName,id));
++id;
}
在线程内部,进行全局的队列读写:
//2. Run Cap Thread on interface.
void recv_loop(QString itstr, int id)
{
while (!pcap_stop)
{
pcap_t *handle = NULL;
char errbuf[PCAP_ERRBUF_SIZE];
handle = pcap_open_live(itstr, 65535, 1, 10, errbuf);
const u_char *packet;
struct pcap_pkthdr header;
while (!pcap_stop)
{
packet = pcap_next(handle, &header);
if(packet)
{
//Src MAC
quint64 mac_src = 0;
memcpy_s(&mac_src,8,packet+6,6);
bool MyPack = false;
QDateTime dtm = QDateTime::currentDateTime();
mtx_ports.lock();
const bool newClientMac = pcap_ports.contains(mac_src);
if (!newClientMac)
{
tag_portAssign & newmac = pcap_ports[mac_src];
newmac.mac = mac_src;
newmac.curr_id = id;
newmac.dtmLastAck = dtm;
MyPack = true;
}
else
{
tag_portAssign & curport = pcap_ports[mac_src];
if (curport.curr_id==id)
{
curport.dtmLastAck = dtm;
MyPack = true;
}
else if (pcap_ports[mac_src].dtmLastAck.msecsTo(dtm) >5000 )
{
curport.curr_id = id;
curport.dtmLastAck = dtm;
MyPack = true;
}
}
mtx_ports.unlock();
//Only Enqueue packs for etheraddrs connected to this port id.
if (MyPack)
{
quint64 pos = pcap_recv_pos++;
global_buffer[pos % PCAPIO_BUFCNT].from_id = id;
global_buffer[pos % PCAPIO_BUFCNT].len = header.len;
memcpy_s(
global_buffer[pos % PCAPIO_BUFCNT].data.data(),PCAPIO_MAXPACK,
packet,header.len
);
}
}
}
pcap_close(handle);
}
}
//3. Run Send Thread on interface
void send_loop(QString itstr,int id)
{
quint64 send_pos = pcap_recv_pos;
pcap_t *handle = NULL;
char errbuf[PCAP_ERRBUF_SIZE];
handle = pcap_open_live(strDev.c_str(), 65535, 1, 10, errbuf);
while (!pcap_stop)
{
if (send_pos > pcap_recv_pos)
send_pos = pcap_recv_pos;
if (send_pos == pcap_recv_pos)
{
QThread::usleep(500);
continue;
}
int pos = send_pos % PCAPIO_BUFCNT;
++send_pos;
if (global_buffer[pos].from_id!=id)
{
/* Send down the packet */
pcap_sendpacket(
handle, // Adapter
(const unsigned char *)global_buffer[pos].data.constData(),
global_buffer[pos].len // size
);
}
}
pcap_close(handle);
}
注意,上述代码经过大量简化,完整版本直接参考Git仓库。