Preface
In the previous article we took our first steps at probing kernel functions with kprobe. This article builds on that example with a small change: we attach a kprobe to the kernel function tcp_sendmsg to count the traffic a TCP server sends. Without further ado, let's get to it.
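For orientation, the prototype of tcp_sendmsg in the 5.19 kernel sources (net/ipv4/tcp.c) looks like this; the first argument is the socket and the return value is the number of bytes handed to TCP, which is exactly what the kprobe/kretprobe pair below relies on. Double-check against your own kernel if the version differs:
// net/ipv4/tcp.c (5.19), shown for reference only
int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size);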
Environment
The TCP server runs on Ubuntu 22 with kernel 5.19.0-26-generic and listens on port 6230; the eBPF program runs on the same Ubuntu 22 machine.
The TCP client runs on CentOS 7 with kernel 3.10.0-1160.el7.x86_64.
Code
As before, the eBPF code is split into two files. The kernel-side program probes the kernel function and writes the relevant data into a map; in this article that data is the number of bytes sent by the TCP server. The user-side program periodically reads the map and prints the traffic in bytes.
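One detail worth keeping in mind: the key of both maps is the raw 64-bit value returned by bpf_get_current_pid_tgid(), which packs the TGID (process id) into the upper 32 bits and the thread id into the lower 32 bits. A minimal sketch of how to split it, in case you want the user-space output to show a plain pid instead of the packed value:
// sketch: decoding the u64 key used by both maps
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 tgid = pid_tgid >> 32;   // process id (what ps shows)
u32 tid  = (u32)pid_tgid;    // thread id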
for_flow_kern.c is as follows:
// for_flow_kern.c
#include <linux/skbuff.h>
#include <linux/netdevice.h>
#include <linux/version.h>
#include <uapi/linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#include <net/sock.h>
#include "trace_common.h"
#define MAX_ENTRIES 64
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, u64);
__type(value, u32);
__uint(max_entries, MAX_ENTRIES);
} pid_2_port_map SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, u64);
__type(value, u64);
__uint(max_entries, MAX_ENTRIES);
} pid_2_flow_map SEC(".maps");
SEC("kprobe/tcp_sendmsg")
int trace_sys_send(struct pt_regs *ctx)
{
int ret = 0;
u16 family = 0;
struct sock *sock = (struct sock *)PT_REGS_PARM1_CORE(ctx);
if ((ret = bpf_probe_read_kernel(&family, sizeof(family), &sock->sk_family)))
{
return 0;
}
if (family != AF_INET)
{
return 0;
}
u16 port_tmp = 0;
if ((ret = bpf_probe_read_kernel(&port_tmp, sizeof(port_tmp), &sock->sk_num)))
{
return 0;
}
if (port_tmp == 6230)
{
u64 pid = bpf_get_current_pid_tgid();
u32 port = port_tmp;
bpf_map_update_elem(&pid_2_port_map, (const void *)&pid, &port, BPF_ANY);
}
return 0;
}
SEC("kretprobe/tcp_sendmsg")
int trace_sys_send_ret(struct pt_regs *ctx)
{
int ret = 0;
u64 pid = bpf_get_current_pid_tgid();
// look up the port recorded for this pid by the entry probe
u32 *value_ptr = bpf_map_lookup_elem(&pid_2_port_map, &pid);
if (!value_ptr)
{
return 0;
}
// get the return value of tcp_sendmsg (bytes sent)
int size = PT_REGS_RC(ctx);
if (size > 0)
{
// look up the accumulated flow
u64 *flow_ptr = bpf_map_lookup_elem(&pid_2_flow_map, &pid);
u64 sum = flow_ptr == NULL ? (0 + size) : (*flow_ptr + size);
bpf_map_update_elem(&pid_2_flow_map, &pid, &sum, BPF_ANY);
}
return 0;
}
char _license[] SEC("license") = "GPL";
u32 _version SEC("version") = LINUX_VERSION_CODE;
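As an aside, bpf_tracing.h (already included above) also provides the BPF_KPROBE convenience macro, which hides the pt_regs plumbing and hands you typed arguments directly. The entry probe could equally be written in the style below; this is just a sketch of the alternative form with the same logic, and if you use it, remember that the name passed to bpf_object__find_program_by_name in the user program must change accordingly:
SEC("kprobe/tcp_sendmsg")
int BPF_KPROBE(trace_tcp_sendmsg, struct sock *sk)
{
    u16 family = 0, port = 0;
    // same filtering as trace_sys_send: IPv4 and local port 6230
    if (bpf_probe_read_kernel(&family, sizeof(family), &sk->sk_family) || family != AF_INET)
        return 0;
    if (bpf_probe_read_kernel(&port, sizeof(port), &sk->sk_num) || port != 6230)
        return 0;
    u64 pid = bpf_get_current_pid_tgid();
    u32 port32 = port;
    bpf_map_update_elem(&pid_2_port_map, &pid, &port32, BPF_ANY);
    return 0;
}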
for_flow_user.c is as follows:
// for_flow_user.c
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <sys/types.h>
#include <asm/unistd.h>
#include <unistd.h>
#include <assert.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <arpa/inet.h>
#include <errno.h>
#include <bpf/bpf.h>
#include <bpf/libbpf.h>
int main(int argc, char **argv)
{
struct bpf_object *obj;
int i = 0;
int mfd1 = 0;
struct bpf_link *link1 = NULL;
struct bpf_program *prog1;
int mfd2 = 0;
struct bpf_link *link2 = NULL;
struct bpf_program *prog2;
char filename[256];
snprintf(filename, sizeof(filename), "%s_kern.o", argv[0]);
// obj
obj = bpf_object__open_file(filename, NULL);
if (libbpf_get_error(obj))
{
fprintf(stderr, "ERROR: opening BPF object file failed\n");
return -1;
}
if (bpf_object__load(obj))
{
fprintf(stderr, "ERROR: loading BPF object file failed\n");
goto END;
}
// ------------- //
prog1 = bpf_object__find_program_by_name(obj, "trace_sys_send");
if (!prog1)
{
printf("finding a prog1 in obj file failed\n");
goto END;
}
prog2 = bpf_object__find_program_by_name(obj, "trace_sys_send_ret");
if (!prog2)
{
printf("finding a prog2 in obj file failed\n");
goto END;
}
// ------------- //
mfd1 = bpf_object__find_map_fd_by_name(obj, "pid_2_port_map");
if (mfd1 < 0)
{
fprintf(stderr, "ERROR: finding a map mfd1 in obj file failed\n");
goto END;
}
mfd2 = bpf_object__find_map_fd_by_name(obj, "pid_2_flow_map");
if (mfd2 < 0)
{
fprintf(stderr, "ERROR: finding a map mfd2 in obj file failed\n");
goto END;
}
// ------------- //
link1 = bpf_program__attach(prog1);
if (libbpf_get_error(link1))
{
fprintf(stderr, "ERROR: bpf_program__attach link1 failed\n");
link1 = NULL;
goto END;
}
link2 = bpf_program__attach(prog2);
if (libbpf_get_error(link2))
{
fprintf(stderr, "ERROR: bpf_program__attach link2 failed\n");
link2 = NULL;
goto END;
}
for (i = 0; i < 1000; i++)
{
unsigned long long key = 0;
unsigned long long next_key = 0;
int j = 0;
while (bpf_map_get_next_key(mfd1, &key, &next_key) == 0)
{
unsigned int value = 0;
bpf_map_lookup_elem(mfd1, &next_key, &value);
fprintf(stdout, "pid%d: %llu, port: %u\n", ++j, next_key, value);
key = next_key;
}
key = 0;
next_key = 0;
j = 0;
while (bpf_map_get_next_key(mfd2, &key, &next_key) == 0)
{
unsigned long long value = 0;
bpf_map_lookup_elem(mfd2, &next_key, &value);
fprintf(stdout, "pid%d: %llu, flow: %llu\n", ++j, next_key, value);
key = next_key;
}
printf("-----------------------\n");
sleep(2);
}
END:
bpf_link__destroy(link1);
bpf_link__destroy(link2);
bpf_object__close(obj);
return 0;
}
Add for_flow_kern.c and for_flow_user.c to the Makefile and build.
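If you are building inside the kernel tree's samples/bpf directory, as in the previous article, the entries look roughly like this (a sketch following the samples/bpf conventions; adjust to however your build is actually set up):
# samples/bpf/Makefile (sketch)
tprogs-y += for_flow
for_flow-objs := for_flow_user.o
always-y += for_flow_kern.o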
Below is a TCP client/server test program I wrote myself. It may have bugs, but it is good enough for this test. The client and server talk to each other: the client writes 1024 bytes and reads 512 bytes per round, while the server reads 1024 bytes and writes 512 bytes per round. The code is as follows:
// tcp_test.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <signal.h>
#include <sys/ioctl.h>
#include <linux/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#define EPOLL_SIZE 1024
#define PARALLEL_MAX 16
#define READ_SIZE 1024
#define WRITE_SIZE 512
#define SET_NONBLOCK(fd) ({fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);})
#define EPOLL_ET_CTL(node, op, event) \
({\
struct epoll_event ev = {0x00}; \
ev.events = (event) | EPOLLET; \
ev.data.ptr = (node); \
epoll_ctl((node)->ep_fd, (op), (node)->fd, &ev); \
})
#define EPOLL_ET_DEL(node) \
({\
epoll_ctl((node)->ep_fd, EPOLL_CTL_DEL, (node)->fd, NULL); \
})
/*
* command-line arguments
*/
typedef struct config
{
int mode;
char *addr;
unsigned short int port;
int parallel;
struct sockaddr_in addr_in;
socklen_t addr_in_len;
}config_t;
/*
* node on the epoll tree
*/
typedef struct ep_node
{
struct sockaddr_in addr_in;
int ep_fd;
int sk_fd;
int fd;
long long r_sum;
long long w_sum;
}ep_node_t;
typedef struct ep_instance
{
config_t *conf;
int position; // 0 means the trunk (main) thread
int ep_fd; // epoll fd of this instance
union
{
int sk_fd; // listening socket handled by the trunk epoll
int r_pipe; // read end of the pipe handled by a worker epoll, paired with w_pipes
};
int w_pipes[PARALLEL_MAX]; // write ends of each worker thread's pipe, held by the trunk thread
long long count;
}ep_instance_t;
typedef struct fd_and_addr
{
int fd;
struct sockaddr_in addr_in;
}fd_and_addr_t;
static int tcp_socket(const config_t *conf)
{
int sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
{
printf("socket failed, err msg: %s\n", strerror(errno));
return -1;
}
int val1 = 1;
/* SO_REUSEADDR and SO_REUSEPORT are option names, not bit flags, so they cannot be OR'ed together; set them one at a time */
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val1, sizeof(val1)) == -1 ||
(conf->mode == 0 && setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val1, sizeof(val1)) == -1))
{
printf("setsockopt failed, err msg: %s\n", strerror(errno));
goto FAILED;
}
if (conf->mode == 0)
{
if (bind(sfd, (struct sockaddr*)&conf->addr_in, conf->addr_in_len) == -1)
{
printf("bind failed, err msg: %s\n", strerror(errno));
goto FAILED;
}
if (listen(sfd, 1024))
{
printf("bind failed, err msg: %s\n", strerror(errno));
goto FAILED;
}
}
else
{
if (connect(sfd, (struct sockaddr*)&conf->addr_in, conf->addr_in_len))
{
printf("connect failed, err msg: %s\n", strerror(errno));
goto FAILED;
}
}
int val2 = 1;
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&val2, sizeof(val2)) == -1)
{
printf("setsockopt failed, err msg: %s\n", strerror(errno));
goto FAILED;
}
SET_NONBLOCK(sfd);
return sfd;
FAILED:
close(sfd);
return -1;
}
ep_node_t *accept_cb(ep_instance_t *ins, ep_node_t *node)
{
ep_node_t *new_node = (ep_node_t *)malloc(sizeof(ep_node_t));
if (!new_node)
{
return NULL;
}
fd_and_addr_t addr;
if (ins->position == 0)
{
socklen_t remote_len = sizeof(addr.addr_in);
addr.fd = accept(ins->sk_fd, (struct sockaddr *)&addr.addr_in, &remote_len);
if (addr.fd == -1)
{
goto FREE;
}
int index = ins->count++ % ins->conf->parallel;
if (index != 0)
{
write(((int *)(ins->w_pipes))[index], (void *)&addr, sizeof(addr));
return NULL;
}
}
else
{
int ret = read(ins->r_pipe, &addr, sizeof(addr));
if (ret != sizeof(addr))
{
goto CLOSE;
}
}
SET_NONBLOCK(addr.fd);
new_node->addr_in = addr.addr_in;
new_node->ep_fd = ins->ep_fd;
new_node->sk_fd = ins->position == 0 ? ins->sk_fd : ins->r_pipe;
new_node->fd = addr.fd;
return new_node;
CLOSE:
close(addr.fd);
FREE:
free(new_node);
new_node = NULL;
return new_node;
}
static int server_read_cb(ep_node_t *node)
{
unsigned char read_data[READ_SIZE];
memset(read_data, 0x00, READ_SIZE);
int ret = read(node->fd, read_data, READ_SIZE);
if (ret > 0)
{
node->r_sum += ret;
EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);
}
else if (ret <= 0)
{
close(node->fd);
EPOLL_ET_DEL(node);
free(node);
node = NULL;
}
return ret;
}
static int server_write_cb(ep_node_t *node)
{
unsigned char write_data[WRITE_SIZE];
memset(write_data, 0x30, WRITE_SIZE);
int ret = write(node->fd, write_data, WRITE_SIZE);
if (ret >= 0)
{
node->w_sum += ret;
EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
}
else
{
printf("write finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
close(node->fd);
EPOLL_ET_DEL(node);
free(node);
node = NULL;
}
return 0;
}
void *tcp_server_process(void *arg)
{
ep_instance_t *ins = (ep_instance_t *)arg;
if (!ins)
{
return NULL;
}
ins->ep_fd = epoll_create(EPOLL_SIZE);
if (ins->ep_fd == -1)
{
return NULL;
}
int sk_fd = ins->position == 0 ? ins->sk_fd : ins->r_pipe;
ep_node_t sk_fd_node = {
.ep_fd = ins->ep_fd,
.sk_fd = sk_fd,
.fd = sk_fd,
};
if (EPOLL_ET_CTL(&sk_fd_node, EPOLL_CTL_ADD, EPOLLIN))
{
return NULL;
}
struct epoll_event active_events[EPOLL_SIZE + 1];
memset(&active_events, 0x00, sizeof(active_events));
int i = 0;
for(;;)
{
int active = epoll_wait(ins->ep_fd, active_events, EPOLL_SIZE + 1, -1);
for (i = 0; i < active; i++)
{
ep_node_t *node = (ep_node_t *)active_events[i].data.ptr;
// new connection
if (node->fd == node->sk_fd)
{
ep_node_t *new_node = accept_cb(ins, node);
if (new_node)
{
if (EPOLL_ET_CTL(new_node, EPOLL_CTL_ADD, EPOLLIN))
{
close(new_node->fd);
free(new_node);
new_node = NULL;
}
else
{
printf("pos: %d, fd: %d, remote ip: %s, remote port: %d\n", ins->position, new_node->fd,
inet_ntoa(new_node->addr_in.sin_addr), ntohs(new_node->addr_in.sin_port));
}
}
EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
}
// read event
else if (active_events[i].events & EPOLLIN)
{
unsigned char read_data[READ_SIZE];
memset(read_data, 0x00, READ_SIZE);
int ret = read(node->fd, read_data, READ_SIZE);
if (ret <= 0)
{
printf("peer closed or read failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
close(node->fd);
EPOLL_ET_DEL(node);
free(node);
node = NULL;
continue;
}
node->r_sum += ret;
EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);
}
// write event
else if (active_events[i].events & EPOLLOUT)
{
unsigned char write_data[WRITE_SIZE];
memset(write_data, 0x39, WRITE_SIZE);
int ret = write(node->fd, write_data, WRITE_SIZE);
if (ret < 0)
{
printf("write failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
close(node->fd);
EPOLL_ET_DEL(node);
free(node);
node = NULL;
continue;
}
node->w_sum += ret;
EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
}
}
}
return NULL;
}
int tcp_server(config_t *conf)
{
int i = 0, tmp_sk_fd = 0;
if ((tmp_sk_fd = tcp_socket(conf)) < 0)
{
return -1;
}
int tmp_pipes_fd[PARALLEL_MAX][2];
memset(tmp_pipes_fd, 0x00, sizeof(tmp_pipes_fd));
ep_instance_t tmp_ins_arr[PARALLEL_MAX];
memset(tmp_ins_arr, 0x00, sizeof(tmp_ins_arr));
int tmp_w_pipes[PARALLEL_MAX];
memset(tmp_w_pipes, 0x00, sizeof(tmp_w_pipes));
pthread_t pids[PARALLEL_MAX];
memset(pids, 0x00, sizeof(pids));
for (i = 1; i < conf->parallel; i++)
{
pipe(tmp_pipes_fd[i]);
SET_NONBLOCK(tmp_pipes_fd[i][0]);
SET_NONBLOCK(tmp_pipes_fd[i][1]);
tmp_ins_arr[i].conf = conf;
tmp_ins_arr[i].position = i;
tmp_ins_arr[i].r_pipe = tmp_pipes_fd[i][0];
tmp_w_pipes[i] = tmp_pipes_fd[i][1];
pthread_create(&pids[i], NULL, tcp_server_process, (void *)&tmp_ins_arr[i]);
}
tmp_ins_arr[0].conf = conf;
tmp_ins_arr[0].position = 0;
tmp_ins_arr[0].sk_fd = tmp_sk_fd;
memcpy(tmp_ins_arr[0].w_pipes, tmp_w_pipes, sizeof(tmp_w_pipes));
tcp_server_process((void *)&tmp_ins_arr[0]);
for (i = 1; i < conf->parallel; i++)
{
pthread_join(pids[i], NULL);
}
return 0;
}
void* tcp_client(void *arg)
{
config_t *conf = (config_t *)arg;
ep_node_t fd_node;
memset(&fd_node, 0x00, sizeof(fd_node));
int ep_fd = epoll_create(EPOLL_SIZE);
if (ep_fd == -1)
{
return NULL;
}
int fd = tcp_socket(conf);
if (fd < 0)
{
return NULL;
}
fd_node.ep_fd = ep_fd;
fd_node.fd = fd;
if (EPOLL_ET_CTL(&fd_node, EPOLL_CTL_ADD, EPOLLOUT))
{
return NULL;
}
struct epoll_event active_events[EPOLL_SIZE + 1];
memset(&active_events, 0x00, sizeof(active_events));
int i = 0;
for(;;)
{
int active = epoll_wait(ep_fd, active_events, EPOLL_SIZE + 1, -1);
for (i = 0; i < active; i++)
{
ep_node_t *node = (ep_node_t *)active_events[i].data.ptr;
if (active_events[i].events & EPOLLIN)
{
unsigned char read_data[WRITE_SIZE];
memset(read_data, 0x00, WRITE_SIZE);
int ret = read(node->fd, read_data, WRITE_SIZE);
if (ret <= 0)
{
printf("peer closed or read failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
close(node->fd);
EPOLL_ET_DEL(node);
continue;
}
node->r_sum += ret;
if (node->r_sum >= 1024 * 1024 * 512 && node->w_sum >= 1024 * 1024 * 1024)
{
printf("rw finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
close(node->fd);
EPOLL_ET_DEL(node);
return NULL;
}
EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);
}
else if (active_events[i].events & EPOLLOUT)
{
unsigned char write_data[READ_SIZE];
memset(write_data, 0x39, READ_SIZE);
int ret = write(node->fd, write_data, READ_SIZE);
if (ret < 0)
{
printf("write failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
close(node->fd);
EPOLL_ET_DEL(node);
continue;
}
node->w_sum += ret;
if (node->r_sum >= 1024 * 1024 * 512 && node->w_sum >= 1024 * 1024 * 1024)
{
printf("rw finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
close(node->fd);
EPOLL_ET_DEL(node);
return NULL;
}
EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
}
}
}
return 0;
}
void sig_cb(int sig)
{
printf("capture sig: %d\n", sig);
}
int main(int argc, char *argv[])
{
if (argc != 5)
{
printf("Argc failed\n");
return -1;
}
signal(SIGPIPE, sig_cb);
config_t conf = {
.mode = atoi(argv[1]),
.addr = argv[2],
.port = atoi(argv[3]),
.parallel = atoi(argv[4]) > PARALLEL_MAX ? PARALLEL_MAX : atoi(argv[4]),
.addr_in.sin_family = AF_INET,
.addr_in.sin_addr.s_addr = inet_addr(argv[2]),
.addr_in.sin_port = htons(atoi(argv[3])),
.addr_in_len = sizeof(struct sockaddr_in),
};
int i = 0;
if (conf.mode == 0)
{
tcp_server(&conf);
}
else
{
pthread_t pids[PARALLEL_MAX];
for (i = 1; i < conf.parallel; i++)
{
pthread_create(&pids[i], NULL, tcp_client, &conf);
}
tcp_client(&conf);
for (i = 1; i < conf.parallel; i++)
{
pthread_join(pids[i], NULL);
}
printf("after pthread_join\n");
}
return 0;
}
Build command: gcc tcp_test.c -o tcp_test -lpthread
Test
Start the eBPF program on Ubuntu 22: ./for_flow
Start the TCP server on Ubuntu 22: ./tcp_test 0 192.168.20.11 6230 2 // 0 means server mode, 2 is the number of threads
Start the TCP client on CentOS 7: ./tcp_test 1 192.168.20.11 6230 2 // 1 means client mode, 2 is the number of threads, one TCP connection per thread
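While the test is running you can also cross-check the counters straight from the loaded maps with bpftool, assuming it is installed; the formatting differs from the user program's output, but the byte counts should match:
sudo bpftool map dump name pid_2_flow_map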
Result
The result is shown in the figure below:
As the result shows, the client read 536870912 bytes (512 MB) and the server wrote 536870912 bytes (512 MB), matching the 1024 * 1024 * 512 = 536870912-byte threshold hard-coded in the test program, and the eBPF program also reports 536870912 bytes (512 MB). The numbers agree, so counting the TCP server's transmit traffic with eBPF works.
With a few changes this code can also count the bytes the TCP server reads: switch the probed function to tcp_recvmsg and adjust the surrounding logic. There are a couple of pitfalls along the way; I have already made that variant work, so I won't post it here. If you are interested, give it a try, and feel free to reach out if you run into problems.
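For the curious, one possible shape of the receive-side probes is sketched below. This is only a minimal sketch of the direction described above, not the author's working version: tcp_recvmsg also takes the struct sock as its first argument, and its return value is the number of bytes copied to user space. In a real adaptation you would likely give the receive side its own flow map so send and receive counters don't mix; the sketch reuses the existing map names purely for brevity:
SEC("kprobe/tcp_recvmsg")
int trace_tcp_recv(struct pt_regs *ctx)
{
    u16 family = 0, port = 0;
    struct sock *sk = (struct sock *)PT_REGS_PARM1_CORE(ctx);
    // same filtering as the send side: IPv4 and local port 6230
    if (bpf_probe_read_kernel(&family, sizeof(family), &sk->sk_family) || family != AF_INET)
        return 0;
    if (bpf_probe_read_kernel(&port, sizeof(port), &sk->sk_num) || port != 6230)
        return 0;
    u64 pid = bpf_get_current_pid_tgid();
    u32 port32 = port;
    bpf_map_update_elem(&pid_2_port_map, &pid, &port32, BPF_ANY);
    return 0;
}
SEC("kretprobe/tcp_recvmsg")
int trace_tcp_recv_ret(struct pt_regs *ctx)
{
    u64 pid = bpf_get_current_pid_tgid();
    if (!bpf_map_lookup_elem(&pid_2_port_map, &pid))
        return 0;
    // return value of tcp_recvmsg: bytes copied to user space, negative on error
    int size = PT_REGS_RC(ctx);
    if (size <= 0)
        return 0;
    u64 *flow = bpf_map_lookup_elem(&pid_2_flow_map, &pid);
    u64 sum = flow ? *flow + size : size;
    bpf_map_update_elem(&pid_2_flow_map, &pid, &sum, BPF_ANY);
    return 0;
}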