问题背景:
为了分析udp数据通信中端到端的延迟,我们需要对整个通信链路的每个阶段进行监控,找出延迟最长的阶段.
udp接收端有2个主要路径
1.数据包到达本机后,由软中断处理程序将数据包接收并放入udp socket的接收缓冲区
数据接收流程
2. 应用程序调用recvmsg等api将数据从socket缓冲区读出
应用程序读取数据流程
2和1之间可能由于调度等造成延迟,我们写一个bcc程序对指定接收端口和延迟大于某个值的情况进行监控
bcc程序原理
我们在流程1放入udp缓冲区时(udp_unicast_rcv_skb),记录此skb的时间
然后在流程2读取udp缓冲区时(__skb_recv_udp),取出流程1中记录的该skb的时间,并与当前时间做差得到延迟.
#!/usr/bin/python3
# @lint-avoid-python-3-compatibility-imports
#
# udplatency Trace long udp recvmsg delays.
# For Linux, uses BCC, eBPF.
#
# This script traces long delays between an skb being placed on a UDP
# socket's receive queue and the application reading it out via recvmsg.
#
# USAGE: udplatency -d dport [-l lat]
import argparse
import ctypes as ct
from time import strftime
from bcc import BPF
# eBPF program (C source compiled at runtime by BCC).
#
# Two placeholder tokens in the text are substituted with numeric values by
# the Python code before compilation: the destination UDP port to filter on
# and the minimum latency (in nanoseconds) worth reporting.
#
# Flow:
#   1. kprobe on udp_unicast_rcv_skb: remember the time at which an skb for
#      the watched port is handed to the socket, keyed by the skb pointer.
#   2. kretprobe on __skb_recv_udp: when that skb is returned to the reader,
#      compute the elapsed time and emit an event if it meets the threshold.
bpf_text = '''
#include <linux/ip.h>
#include <linux/netfilter.h>
#include <net/ip.h>
#include <uapi/linux/bpf.h>

/* Event record sent to user space; layout must match EventData in Python. */
struct data_t {
    u64 ts;   /* bpf_ktime_get_ns() value recorded when the skb was queued */
    u64 lat;  /* nanoseconds between queueing and the receive call returning */
};

BPF_PERF_OUTPUT(events);
/* skb pointer -> enqueue timestamp (BPF_HASH value type defaults to u64). */
BPF_HASH(recv_lat, struct sk_buff*);

int kprobe__udp_unicast_rcv_skb(struct pt_regs *ctx, struct sock *sk, struct sk_buff* skb)
{
    struct udphdr *udp_hdr = (struct udphdr *)(skb->head + skb->transport_header);
    u16 dst_port = bpf_ntohs(udp_hdr->dest);

    if (dst_port == DST_PORT) {
        u64 ts = bpf_ktime_get_ns();
        recv_lat.update(&skb, &ts);
    }
    return 0;
}

int kretprobe____skb_recv_udp(struct pt_regs *ctx)
{
    struct sk_buff* skb = (struct sk_buff*)PT_REGS_RC(ctx);

    /* A NULL return value carries no skb to inspect. */
    if (skb == 0)
        return 0;

    struct udphdr *udp_hdr = (struct udphdr *)(skb->head + skb->transport_header);
    u16 dst_port = bpf_ntohs(udp_hdr->dest);

    if (dst_port == DST_PORT) {
        struct data_t data = {};
        u64 *tsp = recv_lat.lookup(&skb);

        /* No enqueue timestamp was recorded for this skb: nothing to report. */
        if (tsp == 0)
            return 0;

        /* Copy the timestamp out before the map entry is deleted. */
        data.ts = *tsp;
        data.lat = bpf_ktime_get_ns() - data.ts;
        recv_lat.delete(&skb);

        if (data.lat >= LAT_NS) {
            events.perf_submit(ctx, &data, sizeof(data));
        }
    }
    return 0;
}
'''
class EventData(ct.Structure):
    """User-space mirror of the ``data_t`` record emitted by the BPF program.

    Field order and widths must stay in step with the C struct so that the
    raw perf-buffer bytes can be cast directly to this type.
    """

    _fields_ = [
        # Timestamp recorded by the BPF program for the packet.
        ("ts", ct.c_ulonglong),
        # Measured receive latency reported by the BPF program.
        ("lat", ct.c_ulonglong),
    ]
def print_event(cpu, data, size):
    """Perf-buffer callback: print one latency event.

    Args:
        cpu: CPU index supplied by the perf-buffer callback (unused).
        data: Pointer to the raw event bytes; interpreted as an EventData.
        size: Size of the raw event in bytes (unused).
    """
    event = ct.cast(data, ct.POINTER(EventData)).contents
    # event.lat is in nanoseconds; 1 ms == 1,000,000 ns. The previous divisor
    # (3000000) was the default threshold, not a unit conversion factor.
    lat_ms = event.lat // 1000000
    print("%-8s ts:%d lat: %dms\n" % (strftime("%H:%M:%S"), event.ts, lat_ms))
# Command-line interface: the destination port is mandatory, the latency
# threshold (nanoseconds) is optional and defaults to 3 ms.
parser = argparse.ArgumentParser(
    description="Track udp recv latency")
parser.add_argument("-d", "--dport", type=int, required=True,
                    help="udp dst port")
parser.add_argument("-l", "--lat", type=int,
                    help="report latency > ns, default is 3000000")
args = parser.parse_args()

lat_ns = 3000000
# Compare against None rather than truthiness so that an explicit "-l 0"
# (report every packet) is honoured instead of silently becoming the default.
if args.lat is not None:
    lat_ns = args.lat

# Substitute the placeholder tokens in the BPF source with concrete values.
bpf_text = bpf_text.replace('DST_PORT', str(args.dport))
bpf_text = bpf_text.replace('LAT_NS', str(lat_ns))
# Compile and load the BPF program, then route records from the "events"
# perf buffer to print_event.
b = BPF(text=bpf_text)
b["events"].open_perf_buffer(print_event)

# Poll for events until the user interrupts with Ctrl-C.
try:
    while True:
        b.perf_buffer_poll()
except KeyboardInterrupt:
    exit()