本机环境为 Ubuntu20.04 ,dpdk-stable-20.11.10
使用scapy和wireshark发包抓包分析结果
完整代码见:github
Pipeline模型
DPDK Pipeline模型是基于Data Plane Development Kit(DPDK)的高性能数据包处理框架。它通过将数据流分为多个处理阶段,支持高效的数据包转发和处理。其架构包括多个模块,如数据接收、处理、转发和发送,每个模块可以独立优化,以提高性能和灵活性。
对DPDK-DNS收发包阶段进行时间上的阶段划分,使得每个阶段的处理时间尽可能均衡。每个处理阶段绑定一个逻辑核(lcore)进行处理。
DNS 协议
DNS资源记录
在DNS(域名系统)协议中,DNS资源记录(DNS Resource Record,RR)是DNS系统中存储数据的基本单位,每条记录存储与域名相关的信息。这些记录主要由主机名、IP地址、服务信息等构成,能够帮助解析器把域名转换为IP地址或其他信息。
每个DNS资源记录包含以下字段:
- 名称(Name):表示与该记录关联的域名,通常是需要查询的域名。
- 类型(Type):指定该记录的类型,表示该记录提供的信息类别,例如A记录、MX记录等。
- 类(Class):指定该记录所属的协议族。常见的是IN,代表互联网。
- 生存时间(TTL, Time to Live):记录在DNS服务器中的缓存时间,以秒为单位。TTL决定了缓存中这条记录在多长时间后失效。
- 数据长度(RDLENGTH):指记录中数据的长度,以字节为单位。
- 资源数据(RDATA):存储实际的数据,内容取决于记录的类型,比如A记录中的IP地址、MX记录中的邮件服务器信息等。
数据结构定义如下:
struct ResourceRecord {
char *name; // 资源记录的名称(域名)。
uint16_t type; // 资源记录的类型(例如 A、NS、MX、AAAA 等)。
uint16_t rr_class; // 资源记录的类,通常为 IN 类(1 表示互联网类)。
uint16_t ttl; // TTL(生存时间),表示该记录在缓存中保留的时间(以秒为单位)。
uint16_t rd_length; // 资源数据的长度,以字节为单位。
union ResourceData rd_data; // 资源记录的数据部分,使用上面定义的 ResourceData 联合体表示。
struct ResourceRecord* next; // 指向下一个资源记录的指针,用于将多个资源记录组织成链表。
};
整体及Header部分
DNS请求与响应的格式是一致的,其整体分为Header、Question、Answer、Authority、Additional5部分,如下图所示:
Header部分是一定有的,长度固定为12个字节;其余4部分可能有也可能没有,并且长度也不一定,这个在Header部分中有指明。Header的结构如下:
其中Header部分处理的数据结构如下:
struct Message {
uint16_t id; // 标识符,用于匹配请求和响应。请求和响应的 ID 应相同。
/* 标志位 */
uint16_t qr; // 查询/响应标志,0 表示查询,1 表示响应。
uint16_t opcode; // 操作码,表示查询类型,0 表示标准查询,1 表示反向查询。
uint16_t aa; // 授权回答标志,1 表示响应是来自授权域名服务器。
uint16_t tc; // 截断标志,1 表示响应被截断(超出 UDP 数据包大小)。
uint16_t rd; // 期望递归标志,1 表示客户端希望服务器执行递归查询。
uint16_t ra; // 可用递归标志,1 表示服务器支持递归查询。
uint16_t rcode; // 响应码,表示查询的状态,如 0 表示无错误,3 表示域名不存在。
uint16_t qdCount; // 问题记录数,表示查询中的问题数量。
uint16_t anCount; // 回答记录数,表示响应中的回答记录数量。
uint16_t nsCount; // 授权记录数,表示授权记录数量。
uint16_t arCount; // 附加记录数,表示附加记录数量。
struct Question* questions; // 指向问题记录的指针,可能有多个问题记录(链表)。
struct ResourceRecord* answers; // 指向回答记录的指针,可能有多个回答记录(链表)。
struct ResourceRecord* authorities; // 指向授权记录的指针,可能有多个授权记录(链表)。
struct ResourceRecord* additionals; // 指向附加记录的指针,可能有多个附加记录(链表)。
};
Question部分
Question部分的每一个实体的格式如下图所示:
Q主机名被"."号分割成了多段标签。在QNAME中,每段标签前面加一个数字,表示接下来标签的长度。比如:api.sina.com.cn表示成QNAME时,会在"api"前面加上一个字节0x03,"sina"前面加上一个字节0x04,"com"前面加上一个字节0x03,而"cn"前面加上一个字节0x02;
struct Question {
char *qName; // 问题的域名,例如 "www.example.com"。以字符指针形式存储,DNS 协议要求该名字使用特殊格式。
uint16_t qType; // 问题的类型。例如,1 代表 A 记录(IPv4 地址),28 代表 AAAA 记录(IPv6 地址)。
uint16_t qClass; // 问题的类。通常为 1,表示互联网类(IN)。
struct Question* next; // 指向下一个问题的指针,用于将多个问题组织成链表。因为 DNS 查询可以包含多个问题。
};
部分代码实现
代码较多,本文省略DNS具体实现步骤,着重看pipeline模型的实现。
主函数
初始化网卡、内存池、环形队列以及启动其他工作核心,该核心同时处理转发数据包的任务。在启动时为其他核心分配任务,设置接收(RX)和发送(TX)核心。
int main(int argc, char *argv[])
{
uint8_t ip[4] = {192, 168, 1, 1};
add_A_record("foo.bar.com",ip);
unsigned lcore_id;
uint16_t portid = 0, nb_ports = 1;
int ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
argc -= ret;
argv += ret;
force_quit = false;
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
if (port_init(portid, mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n", portid);
struct rte_ring *rx_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
if (rx_ring == NULL)
rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
struct rte_ring *tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
if (tx_ring == NULL)
rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
struct lcore_params p;
p.rx_ring = rx_ring;
p.tx_ring = tx_ring;
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
if(lcore_id == 1)
rte_eal_remote_launch((lcore_function_t*)lcore_tx, (void*)tx_ring, lcore_id);
else
rte_eal_remote_launch((lcore_function_t*)lcore_worker, (void*)&p, lcore_id);
}
lcore_rx(rx_ring);
rte_eal_mp_wait_lcore();
return 0;
}
接收数据包
0号lcore运行rx线程,其负责从网络接口接收DNS查询请求的UDP数据包。将接收到的数据包放入环形队列(rx_ring)中,以便后续处理模块消费。
static int lcore_rx(struct rte_ring *rx_ring){
uint16_t port;
uint16_t nb_rx, nb_tx;
uint16_t total=0;
struct rte_mbuf *bufs[BURST_SIZE];
// 检查端口和轮询线程是否位于相同的NUMA节点
if (rte_eth_dev_socket_id(port) > 0 &&
rte_eth_dev_socket_id(port) !=
(int)rte_socket_id())
printf("WARNING, port %u is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n", port);
printf("\nCore %u doing packet RX.\n", rte_lcore_id());
port=0;
uint32_t rx_queue_drop_packets = 0;
while(!force_quit){
nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
total+=nb_rx;
nb_tx = rte_ring_enqueue_burst(rx_ring, (void *)bufs, nb_rx, NULL);
if (unlikely(nb_tx < nb_rx)){
rx_queue_drop_packets+=nb_rx-nb_tx; // 丢包
while (nb_tx < nb_rx) {
rte_pktmbuf_free(bufs[nb_tx++]);
}
}
}
printf("rx queue enqeue packet number: %d\n",total);
printf("rx queue drop packet number: %d\n", rx_queue_drop_packets);
}
DNS请求处理
使用核心2和3,从 rx_ring 中取包,解析并处理DNS查询,生成响应后将其放入 tx_ring。
static int lcore_worker(struct lcore_params *p)
{
uint16_t nb_rx, nb_tx;
struct rte_mbuf *query_buf[PROCESS_SIZE], *reply_buf[PROCESS_SIZE];
struct rte_ring *in_ring = p->rx_ring; // 输入环形队列
struct rte_ring *out_ring = p->tx_ring; // 输出环形队列
uint8_t *buffer; // 指向数据部分的指针
struct Message msg; // 用于存储 DNS 消息的结构体
memset(&msg, 0, sizeof(struct Message)); // 初始化消息结构体为0
printf("\nCore %u doing packet processing.\n", rte_lcore_id());
uint16_t tx_queue_drop_packets = 0; // 用于统计传输队列中丢包的数量
uint16_t total_dns_packet=0;
while (!force_quit) {
for(uint16_t i = 0; i < PROCESS_SIZE; i++){
do{
reply_buf[i] = rte_pktmbuf_alloc(mbuf_pool); // 分配 mbuf 内存,如果失败则重试
}while(reply_buf[i] == NULL);
}
// 从输入环形队列中取出批量查询包
nb_rx = rte_ring_dequeue_burst(in_ring,(void *)query_buf, PROCESS_SIZE, NULL);
// 如果没有接收到包,释放刚刚分配的回复包内存,继续下一次循环
if (unlikely(nb_rx == 0)){
for(uint16_t i = 0; i < PROCESS_SIZE; i++)
rte_pktmbuf_free(reply_buf[i]);
continue;
}
uint16_t nb_tx_prepare = 0; // 用于统计准备好发送的回复包数量
for(uint16_t i = 0; i < nb_rx; i++){
free_questions(msg.questions);
free_resource_records(msg.answers);
free_resource_records(msg.authorities);
free_resource_records(msg.additionals);
memset(&msg, 0, sizeof(struct Message));
struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(query_buf[i], struct rte_ether_hdr *);
if(*rte_pktmbuf_mtod_offset(query_buf[i], uint16_t*, 36) != rte_cpu_to_be_16(9000)){
continue;
}
buffer = rte_pktmbuf_mtod_offset(query_buf[i], uint8_t*, 42);
if (decode_msg(&msg, buffer, query_buf[i]->data_len - 42) != 0) {
continue;
}
resolver_process(&msg);
rte_pktmbuf_append(reply_buf[nb_tx_prepare], sizeof(struct rte_ether_hdr));
rte_pktmbuf_append(reply_buf[nb_tx_prepare], sizeof(struct rte_ipv4_hdr));
rte_pktmbuf_append(reply_buf[nb_tx_prepare], sizeof(struct rte_udp_hdr));
uint8_t *p = buffer;
if (encode_msg(&msg, &p) != 0) {
continue;
}
uint32_t buflen = p - buffer;
char * payload = (char*)rte_pktmbuf_append(reply_buf[nb_tx_prepare], buflen);
rte_memcpy(payload, buffer, buflen);
build_packet(rte_pktmbuf_mtod_offset(query_buf[i], char*, 0), rte_pktmbuf_mtod_offset(reply_buf[nb_tx_prepare], char*, 0), buflen);
nb_tx_prepare++;
}
nb_tx = rte_ring_enqueue_burst(out_ring, (void *)reply_buf, nb_tx_prepare, NULL);
total_dns_packet+=nb_tx;
for(uint16_t i = 0; i < nb_rx; i++)
rte_pktmbuf_free(query_buf[i]);
for(uint16_t i = nb_tx; i < nb_tx_prepare; i++){
tx_queue_drop_packets += 1; // 统计未成功发送的回复包数量
rte_pktmbuf_free(reply_buf[i]); // 释放未成功发送的回复包
}
}
printf("core %d: tx queue drop packet number: %d\n", rte_lcore_id(), tx_queue_drop_packets);
printf("total sent dns packet is %d\n",total_dns_packet);
return 0;
}
转发数据包
负责从发送队列(tx_ring)中获取已经生成的DNS响应包,并通过网卡将其发送回客户端。
static int lcore_tx(struct rte_ring *tx_ring)
{
uint16_t port = 0;
uint16_t nb_rx, nb_tx;
struct rte_mbuf *bufs[BURST_SIZE];
printf("\nCore %u doing packet TX.\n", rte_lcore_id());
uint16_t dpdk_send_ring_drop_packets = 0;
uint16_t total_sent = 0;
while (!force_quit) {
nb_rx = rte_ring_dequeue_burst(tx_ring, (void *)bufs, BURST_SIZE, NULL);
nb_tx = rte_eth_tx_burst(port, 0, bufs, nb_rx);
total_sent += nb_tx;
if(unlikely(nb_tx < nb_rx)){
dpdk_send_ring_drop_packets += nb_rx - nb_tx;
while(nb_tx < nb_rx){
rte_pktmbuf_free(bufs[nb_tx++]);
}
}
}
printf("dpdk send ring drop packet numbers: %d, total sent number: %d\n", dpdk_send_ring_drop_packets, total_sent);
return 0;
}
测试结果
自定义一个域名和IP作为测试程序
uint8_t ip[4] = {192, 168, 1, 1};
add_A_record("foo.bar.com",ip);
使用以下代码完成DNS发包
from scapy.all import Ether, IP, UDP, DNS, DNSQR, sendp, RandShort, get_if_list
# 设置目标IP和端口
target_ip = "192.168.131.153"
target_port = 9000
# 源IP和MAC地址(修改为VMnet8的地址)
source_ip = "192.168.131.1" # VMnet8 网卡的 IP 地址
source_mac = "00:50:56:C0:00:08" # VMnet8 网卡的 MAC 地址
# 构造DNS查询包
dns_query = DNS(rd=1, qd=DNSQR(qname="foo.bar.com", qtype="A"))
ip_packet = IP(src=source_ip, dst=target_ip)
udp_packet = UDP(sport=RandShort(), dport=target_port)
packet = ip_packet / udp_packet / dns_query
# 发送包,目的MAC地址设置为指定的以太网地址
sendp(Ether(src=source_mac, dst="00:0c:29:00:04:4d") / packet,iface="VMware Network Adapter VMnet8")
wireshark 抓包分析如下: