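I. Analysis of the development approach
There are two ways to monitor MySQL with eBPF:
1. kprobe -> hook kernel functions such as tcp_sendmsg and tcp_recvmsg and parse the network protocol
2. uprobe -> hook the API functions of mysqld and build the statistics on top of them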
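I chose uprobes on functions inside the MySQL server. Taking this route raises several questions that need to be thought through:
1. uprobes attach to the mysqld binary on the host. If several different versions of mysqld are installed, how do we discover and identify each binary?
2. How do we detect mysqld processes starting and stopping, so that the hooks on the binary can be registered and removed?
3. Which mysqld functions do we need to uprobe?
4. eBPF programs are written in C, while mysqld is written in C++. How do we read the members of a C++ class from C? If the order of the class members changes, their offsets change too. How do we limit that risk?
5. The eBPF stack is only 512 bytes. What do we do when a SQL statement is very long?
6. How do we map a mysqld network connection to the five-tuple of a Linux kernel socket? Which kernel functions must we hook so that an fd can be associated with the sock five-tuple, making request tracing possible?
7. What is the performance impact on MySQL once these functions are hooked? (to be tested)
8. How stable are the hooked mysqld functions? mysqld keeps evolving. An architecture team at a larger company has little to worry about, because everyone runs the same version. A monitoring vendor, however, has to support many versions and therefore needs to establish exactly which versions are compatible. (to be tested)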
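II. Working through the problems
1. Design and implement a watcher program that tracks processes starting and terminating
The watcher is aimed mainly at questions 1 and 2.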
strace -o test.data mysqld
The system calls mysqld makes at startup:
execve("/usr/sbin/mysqld", ["mysqld"], 0x7ffc619fbdf0 /* 58 vars */) = 0
brk(NULL) = 0x5556e960e000
arch_prctl(0x3001 /* ARCH_??? */, 0x7ffef5e4db20) = -1 EINVAL (Invalid argument)
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f56d32d2000
access("/etc/ld.so.preload", R_OK) = -1 ENOENT (No such file or directory)
openat(AT_FDCWD, "/etc/ld.so.cache", O_RDONLY|O_CLOEXEC) = 3
newfstatat(3, "", {st_mode=S_IFREG|0644, st_size=117637, ...}, AT_EMPTY_PATH) = 0
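So a Linux process starts with execve, as the first line of the trace shows.
When mysqld exits, the trace ends with exit_group: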
munmap(0x7f56c781b000, 2625536) = 0
munmap(0x7f56c759a000, 2625536) = 0
munmap(0x7f56c7319000, 2625536) = 0
exit_group(1) = ?
Confirm the hook points for these two system calls. System calls are hooked through tracepoints:
sudo bpftrace -l "tracepoint:*:sys_enter_execve"
sudo bpftrace -l "tracepoint:*:sys_enter_exit_group"
tracepoint:syscalls:sys_enter_exit_group
Attach eBPF programs to these two tracepoints:
SEC("tracepoint/syscalls/sys_enter_execve")
int tracepoint__syscalls__sys_enter_execve(struct trace_event_raw_sys_enter* ctx) {
return 0;
}
SEC("tracepoint/syscalls/sys_enter_exit_group")
int tracepoint__syscalls__sys_enter_exit_group(struct trace_event_raw_sys_enter* ctx) {
return 0;
}
To that end I designed mysqldWatcher, which is responsible for discovering mysqld binaries and for attaching and detaching the hooks on them.
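As a rough illustration of the discovery step, the sketch below shows one check a watcher could apply to the path reported by the execve tracepoint. The function names are hypothetical and matching on the basename alone is an assumption; the original watcher may well identify binaries differently, for example by also recording the full path to tell versions apart.

```c
#include <stddef.h>
#include <string.h>

/* Return the final path component ("/usr/sbin/mysqld" -> "mysqld"). */
static const char *path_basename(const char *path) {
    const char *slash = strrchr(path, '/');
    return slash ? slash + 1 : path;
}

/* Hypothetical watcher-side check: 1 if the exec'd path names a mysqld binary. */
static int is_mysqld_binary(const char *path) {
    if (path == NULL) {
        return 0;
    }
    return strcmp(path_basename(path), "mysqld") == 0;
}
```

Because uprobes are attached per binary path, keeping the full path of every match lets several installed versions be hooked independently.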
2. Which mysqld functions do we need to hook?
Walking through the MySQL source:
->mysqld_main
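-> my_init initializes the MySQL system library and pthreads
my_thread_global_init initializes the MySQL thread attributes
-->init_variable_default_paths initializes default file paths, such as the MySQL configuration file my.cnf
create the server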
mysqld_socket_acceptor->check_and_spawn_admin_connection_handler_thread();
mysqld_socket_acceptor->connection_event_loop();
---> connection_event_loop
process_new_connection handles a new connection
----> m_connection_handler->add_connection(channel_info)
Connection_handler *connection_handler = nullptr;
switch (Connection_handler_manager::thread_handling) {
case SCHEDULER_ONE_THREAD_PER_CONNECTION:
connection_handler = new (std::nothrow) Per_thread_connection_handler();
break;
case SCHEDULER_NO_THREADS:
connection_handler = new (std::nothrow) One_thread_connection_handler();
break;
default:
assert(false);
}
The default is SCHEDULER_ONE_THREAD_PER_CONNECTION,
which creates a new thread to handle each connection
------> Per_thread_connection_handler::add_connection
-------> do_command
--------> dispatch_command
---------> mysql_execute_command(thd); executes the query
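Criteria for choosing a hook point:
1. The hook point must cover the basic requirement: it must let us capture the SQL statement and measure how long it takes
2. The hook point should be as stable as possible, ideally with the same function name and parameters across many versions
3. The parameters at the hook point should be small, and the fields we need should sit as close to the front as possible, so that changes elsewhere in the structure do not shift the offsets we compute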
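For example:
class A {
int a;
int c;
};
To read member c through a pointer p to an A, we only need
(char *)p + sizeof(int)
If the class later changes to
class A {
int a;
int b;
int c;
};
the way we read c has to be adjusted, because its offset has changed:
(char *)p + sizeof(int) * 2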
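The offset shift can be checked directly in user space. The sketch below uses two stand-in structs (a_v1 and a_v2 are illustrative names for the two layouts of A) and a helper that reads a member by byte offset, which is essentially what a probe does when it cannot use the C++ type.

```c
#include <stddef.h>
#include <string.h>

struct a_v1 { int a; int c; };          /* original layout */
struct a_v2 { int a; int b; int c; };   /* layout after a member is inserted */

/* Read an int located `offset` bytes past `base`, the way a probe reads a member by offset. */
static int read_int_at(const void *base, size_t offset) {
    int value;
    memcpy(&value, (const char *)base + offset, sizeof(value));
    return value;
}
```

Reading a_v2 with the offset computed for a_v1 silently returns b instead of c. Nothing fails at run time, which is why a member near the front of a structure is the safer choice.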
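With these criteria in mind I read through more than ten versions of the MySQL source and settled on the functions below.
To locate a function in the binary, use bpftrace:
sudo bpftrace -l "uprobe:/usr/sbin/mysqld:*"|grep dispatch_command
Hook point: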
uprobe:/usr/sbin/mysqld:_Z16dispatch_commandP3THDPK8COM_DATA19enum_server_command
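The symbol is the C++ mangled name: after the _Z prefix comes the length of the function name (16), the name itself, and then the encoded parameter types. Since the parameter encoding can differ between versions, a loader may want to match on the plain function name. The helper below is a hypothetical sketch that handles only the simple _Z<len><name> form used by the symbols in this article, not nested names or templates.

```c
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/*
 * Copy the function name out of a symbol of the form _Z<len><name><params>.
 * Returns 0 on success and -1 if the symbol does not have that form
 * or the name does not fit in `out`.
 */
static int mangled_function_name(const char *symbol, char *out, size_t out_size) {
    if (symbol == NULL || out == NULL || strncmp(symbol, "_Z", 2) != 0) {
        return -1;
    }
    const char *p = symbol + 2;
    if (!isdigit((unsigned char)*p)) {
        return -1;
    }
    size_t len = 0;
    while (isdigit((unsigned char)*p)) {
        len = len * 10 + (size_t)(*p - '0');
        p++;
    }
    if (len == 0 || len >= out_size || strlen(p) < len) {
        return -1;
    }
    memcpy(out, p, len);
    out[len] = '\0';
    return 0;
}
```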
Why this function was chosen:
bool dispatch_command(THD *thd, const COM_DATA *com_data,
enum enum_server_command command) {
}
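com_data carries the prepared-statement data as well as the query. It is a union with a maximum size of 40 bytes, so a newly added field affects us very little as long as the union does not grow beyond 40 bytes.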
union COM_DATA {
COM_INIT_DB_DATA com_init_db;
COM_REFRESH_DATA com_refresh;
COM_KILL_DATA com_kill;
COM_SET_OPTION_DATA com_set_option;
COM_STMT_EXECUTE_DATA com_stmt_execute;
COM_STMT_FETCH_DATA com_stmt_fetch;
COM_STMT_SEND_LONG_DATA_DATA com_stmt_send_long_data;
COM_STMT_PREPARE_DATA com_stmt_prepare;
COM_STMT_CLOSE_DATA com_stmt_close;
COM_STMT_RESET_DATA com_stmt_reset;
COM_QUERY_DATA com_query;
COM_FIELD_LIST_DATA com_field_list;
};
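Two properties of a union make COM_DATA convenient to read from a probe: every member starts at offset 0, and the size of the union is set by its largest member. The sketch below demonstrates both with stand-in structs. They are illustrative only and are not the MySQL definitions, although demo_stmt_execute is laid out like the member list this article reads for COM_STMT_EXECUTE.

```c
#include <stddef.h>

/* Illustrative stand-ins, not the MySQL definitions. */
struct demo_query        { const char *query; unsigned int length; };
struct demo_stmt_fetch   { unsigned long stmt_id; unsigned long num_rows; };
struct demo_stmt_execute { unsigned long stmt_id; unsigned long open_cursor;
                           void *parameters; unsigned long parameter_count;
                           unsigned char has_new_types; };

union demo_com_data {
    struct demo_query        com_query;
    struct demo_stmt_fetch   com_stmt_fetch;
    struct demo_stmt_execute com_stmt_execute;
};

/* Size of the union; it equals the size of the largest member. */
static size_t demo_com_data_size(void) {
    return sizeof(union demo_com_data);
}
```

On a typical 64-bit Linux build the largest stand-in occupies 40 bytes (four 8-byte fields plus one byte, padded to 8-byte alignment), which matches the 40-byte figure quoted above.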
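The fd used by mysqld is stored in the NET structure.
NET lives inside THD, the first parameter. THD is very large, and relying on it makes it likely that our offsets break as versions change, so we need to pick a different function:
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
By hooking this function we can obtain the fd through net and thereby associate it with the sock in the Linux kernel.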
void my_net_set_read_timeout(NET *net, uint timeout) {
DBUG_TRACE;
DBUG_PRINT("enter", ("timeout: %d", timeout));
net->read_timeout = timeout;
if (net->vio) vio_timeout(net->vio, 0, timeout);
}
The first parameter of my_net_set_read_timeout is the NET structure:
typedef struct NET {
MYSQL_VIO vio;
unsigned char *buff, *buff_end, *write_pos, *read_pos;
my_socket fd; /* For Perl DBI/dbd */
/**
Set if we are doing several queries in one
command ( as in LOAD TABLE ... FROM MASTER ),
and do not want to confuse the client with OK at the wrong time
*/
unsigned long remain_in_buf, length, buf_length, where_b;
unsigned long max_packet, max_packet_size;
unsigned int pkt_nr, compress_pkt_nr;
unsigned int write_timeout, read_timeout, retry_count;
int fcntl;
unsigned int *return_status;
unsigned char reading_or_writing;
unsigned char save_char;
bool compress;
unsigned int last_errno;
unsigned char error;
/** Client library error message buffer. Actually belongs to struct MYSQL. */
char last_error[MYSQL_ERRMSG_SIZE];
/** Client library sqlstate buffer. Set along with the error message. */
char sqlstate[SQLSTATE_LENGTH + 1];
/**
Extension pointer, for the caller private use.
Any program linking with the networking library can use this pointer,
which is handy when private connection specific data needs to be
maintained.
The mysqld server process uses this pointer internally,
to maintain the server internal instrumentation for the connection.
*/
void *extension;
} NET;
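fd sits near the front of NET, which makes its offset fairly stable, and none of the members is wrapped in #ifdef.
That is why I chose this function. To make sure we capture it at the right point, we also need to hook do_command.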
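To make "near the front" concrete: only five pointer-sized members precede fd in the struct shown above. The sketch below mirrors just that prefix, using a stand-in struct with void * in place of MYSQL_VIO (an assumption that MYSQL_VIO is a pointer type), and computes the offset a probe would rely on.

```c
#include <stddef.h>

/* Stand-in for the leading members of NET; MYSQL_VIO is assumed to be a pointer. */
struct net_prefix {
    void *vio;
    unsigned char *buff, *buff_end, *write_pos, *read_pos;
    int fd;
};

/* Byte offset of fd from the start of the structure. */
static size_t net_fd_offset(void) {
    return offsetof(struct net_prefix, fd);
}
```

On a 64-bit build this evaluates to 40, and it stays at that value as long as nothing is inserted ahead of fd, however much the rest of NET changes.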
my_net_set_read_timeout is called from several places. How do we make sure we only capture the call made while do_command is running?
Use an eBPF map as a lock:
SEC("uprobe/_Z10do_commandP3THD")
int uprobe_mysqld_do_command(struct pt_regs* ctx) {
// Take the lock so that the uprobe_my_net_set_read_timeout call that follows records the net fd
u32 tid = bpf_get_current_pid_tgid();
char lock = 1;
bpf_map_update_elem(&mysqld_comm_lock, &tid, &lock, BPF_NOEXIST);
log_trace("uprobe/_Z10do_commandP3THD\n");
return 0;
}
SEC("uretprobe/_Z10do_commandP3THD")
int uretprobe_mysqld_do_command(struct pt_regs* ctx) {
// Release the lock
u32 tid = bpf_get_current_pid_tgid();
bpf_map_delete_elem(&mysqld_comm_lock, &tid);
return 0;
}
SEC("uprobe/_Z23my_net_set_read_timeoutP3NETj")
int uprobe_my_net_set_read_timeout(struct pt_regs* ctx) {
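// NET::fd (my_socket) is an int; reading sizeof(size_t) bytes would also copy the bytes that follow it
int socket_fd = 0;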
u32 tid = bpf_get_current_pid_tgid();
NET* net = (NET*)PT_REGS_PARM1(ctx);
if (!net) {
bpf_map_delete_elem(&mysqld_comm_lock, &tid);
return 0;
}
if (bpf_probe_read_user(&socket_fd, sizeof(socket_fd), &net->fd) != 0) {
log_trace("err sockfd:%d\n", socket_fd);
bpf_map_delete_elem(&mysqld_comm_lock, &tid);
return 0;
}
if (socket_fd <= 0) {
log_trace("err sockfd <= 0\n");
bpf_map_delete_elem(&mysqld_comm_lock, &tid);
return 0;
}
// Only save the fd if this thread is currently inside do_command
char* lock = bpf_map_lookup_elem(&mysqld_comm_lock, &tid);
if (!lock) {
log_trace("lock is null\n");
return 0;
}
bpf_map_update_elem(&mysqld_fd_map, &tid, &socket_fd, BPF_ANY);
bpf_map_delete_elem(&mysqld_comm_lock, &tid);
u32 pid = bpf_get_current_pid_tgid() >> 32;
log_trace("my_net_set_read_timeout socket_fd:%d; pid:%d\n", socket_fd, pid);
return 0;
}
The lock is taken in uprobe/_Z10do_commandP3THD
and released in uretprobe/_Z10do_commandP3THD.
The fd is read from the NET structure.
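The behaviour of the three probes can be modelled in user space to see what the lock achieves. The sketch below is a toy model, not eBPF code: two fixed arrays stand in for mysqld_comm_lock and mysqld_fd_map, and the function names are hypothetical.

```c
#include <stdint.h>

#define MAX_THREADS 64   /* toy capacity; a real BPF hash map is sized at load time */

/* Toy stand-ins for mysqld_comm_lock and mysqld_fd_map, indexed by tid % MAX_THREADS. */
static char lock_map[MAX_THREADS];
static int  fd_map[MAX_THREADS];

static void on_do_command_enter(uint32_t tid)  { lock_map[tid % MAX_THREADS] = 1; }
static void on_do_command_return(uint32_t tid) { lock_map[tid % MAX_THREADS] = 0; }

/* Record fd only when the calling thread is inside do_command; returns 1 if recorded. */
static int on_set_read_timeout(uint32_t tid, int fd) {
    uint32_t slot = tid % MAX_THREADS;
    if (fd <= 0) {
        lock_map[slot] = 0;   /* invalid fd: drop the lock, as the probe does */
        return 0;
    }
    if (!lock_map[slot]) {
        return 0;             /* called outside do_command: ignore */
    }
    fd_map[slot] = fd;
    lock_map[slot] = 0;       /* one capture per do_command call */
    return 1;
}
```

Calls to my_net_set_read_timeout from other code paths find no lock entry for their thread and are ignored, so the fd map only ever holds the fd of a connection that is executing a command.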
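3. What if a SQL statement is larger than 512 bytes?
The eBPF stack is only 512 bytes. How do we store, say, a 1 KB SQL statement?
First define a fixed-size buffer for it: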
#define MYSQLD_SQL_LENGTH 1024
typedef struct COMM_EVENT_STMT_PREPARE_DATA_ {
char query[MYSQLD_SQL_LENGTH];
unsigned int length;
}COM_EVENT_STMT_PREPARE_DATA;
eBPF programs cannot call malloc, and a structure this large does not fit on the stack. To hold large values we can instead use
BPF_PERCPU_ARRAY_MAP as scratch storage:
// Store temporary variables to be sent to the user program, because the sql statement of mysqld may be very large, so use map to store it temporarily
BPF_PERCPU_ARRAY_MAP(mysqld_tmp_var, __u32, mysqld_transaction_t, 1)
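Whatever the length of the statement, only a fixed-size prefix is kept. The user-space sketch below models that bounded copy: it never writes more than the buffer holds, always terminates the string, and returns the number of bytes written including the terminator, which is the convention bpf_probe_read_user_str follows on success. The function name and SQL_BUF_LEN are illustrative.

```c
#include <stddef.h>

#define SQL_BUF_LEN 1024   /* mirrors MYSQLD_SQL_LENGTH */

/*
 * Copy at most dst_size - 1 bytes of src into dst and always NUL-terminate.
 * Returns the number of bytes written including the terminator, or -1 on bad input.
 */
static long copy_sql_bounded(char *dst, size_t dst_size, const char *src) {
    if (dst == NULL || src == NULL || dst_size == 0) {
        return -1;
    }
    size_t i = 0;
    while (i + 1 < dst_size && src[i] != '\0') {
        dst[i] = src[i];
        i++;
    }
    dst[i] = '\0';
    return (long)(i + 1);
}
```

A statement longer than the buffer is truncated to 1023 characters, so the consumer should treat a stored string of that length as possibly incomplete and compare it with the separately recorded length field.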
Then the values read from mysqld are converted and stored in the map. The function that does this:
// Store mysql data into map
static void save_mysqld_data(struct pt_regs* ctx) {
u32 map_id = 0;
mysqld_transaction_t* transaction = bpf_map_lookup_elem(&mysqld_tmp_var, &map_id);
if (!transaction) {
log_trace("transaction is null\n");
return;
}
COM_DATA* com_data = (COM_DATA*)PT_REGS_PARM2(ctx);
// Read the command parameters of mysqld
enum_server_command uprobe_command = (enum_server_command)PT_REGS_PARM3(ctx);
log_trace("transaction enum_server_command %d\n", uprobe_command);
// Initialization data
transaction->server_command = uprobe_command;
if (from_com_data_to_event_data(uprobe_command ,com_data, &transaction->comm) == -1) {
return;
}
transaction->start_time = bpf_ktime_get_ns();
transaction->duration = 0;
// thread id
u32 tid = bpf_get_current_pid_tgid();
bpf_map_update_with_telemetry(mysqld_comm_data_storage, &tid, transaction, BPF_NOEXIST);
return;
}
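The probes key their maps by thread id, which they obtain by assigning the 64-bit result of bpf_get_current_pid_tgid to a u32. That helper packs the process id (tgid) into the upper 32 bits and the thread id into the lower 32 bits, so the assignment keeps the thread id. A small sketch of the split, with hypothetical helper names:

```c
#include <stdint.h>

/* Lower 32 bits: the kernel thread id, used as the map key by the probes. */
static uint32_t tid_of(uint64_t pid_tgid) {
    return (uint32_t)pid_tgid;
}

/* Upper 32 bits: the thread group id, i.e. the process id as seen from user space. */
static uint32_t pid_of(uint64_t pid_tgid) {
    return (uint32_t)(pid_tgid >> 32);
}
```

Keying by thread id fits the one-thread-per-connection model: each connection thread has its own entry, so concurrent commands do not overwrite each other.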
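from_com_data_to_event_data converts the arguments read from the MySQL server into the layout held in mysqld_tmp_var so that they can be written to the map. The conversion function: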
static int from_com_data_to_event_data(enum_server_command command, COM_DATA* from, COM_EVENT_DATA* to) {
if (!from) {
return -1;
}
if (!to) {
return -1;
}
switch (command) {
case COM_INIT_DB: {
/*
*typedef struct COM_EVENT_INIT_DB_DATA_ {
const char db_name[MYSQLD_DB_LENGTH];
unsigned long length;
}COM_EVENT_INIT_DB_DATA;
*/
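bpf_probe_read_user(&to->com_init_db.length, sizeof(unsigned long), &from->com_init_db.length);
// db_name in COM_DATA is a pointer: read the pointer first, then copy the string it points to
const char *db_name;
bpf_probe_read_user(&db_name, sizeof(db_name), &from->com_init_db.db_name);
bpf_probe_read_user_str((void*)&to->com_init_db.db_name, MYSQLD_DB_LENGTH, db_name);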
break;
}
case COM_STMT_EXECUTE: {
/*
typedef struct COMM_EVENT_STMT_EXECUTE_DATA_ {
unsigned long stmt_id;
unsigned long open_cursor;
unsigned long parameter_count;
unsigned char has_new_types;
}COM_EVENT_STMT_EXECUTE_DATA;
*/
bpf_probe_read_user(&to->com_stmt_execute.stmt_id, sizeof(unsigned long), &from->com_stmt_execute.stmt_id);
bpf_probe_read_user(&to->com_stmt_execute.open_cursor, sizeof(unsigned long), &from->com_stmt_execute.open_cursor);
bpf_probe_read_user(&to->com_stmt_execute.parameter_count, sizeof(unsigned long), &from->com_stmt_execute.parameter_count);
bpf_probe_read_user(&to->com_stmt_execute.has_new_types, sizeof(unsigned char), &from->com_stmt_execute.has_new_types);
break;
}
case COM_STMT_FETCH: {
/*
typedef struct COMM_EVENT_STMT_FETCH_DATA_ {
unsigned long stmt_id;
unsigned long num_rows;
}COM_EVENT_STMT_FETCH_DATA;
*/
bpf_probe_read_user(&to->com_stmt_fetch.stmt_id, sizeof(unsigned long), &from->com_stmt_fetch.stmt_id);
bpf_probe_read_user(&to->com_stmt_fetch.num_rows, sizeof(unsigned long), &from->com_stmt_fetch.num_rows);
break;
}
case COM_STMT_SEND_LONG_DATA: {
/**
typedef struct COMM_EVENT_STMT_SEND_LONG_DATA_DATA_ {
unsigned long stmt_id;
unsigned int param_number;
unsigned char longdata[MYSQLD_LONG_DATA_LENGTH];
unsigned long length;
}COM_EVENT_STMT_SEND_LONG_DATA_DATA;
*/
bpf_probe_read_user(&to->com_stmt_send_long_data.stmt_id, sizeof(unsigned long), &from->com_stmt_send_long_data.stmt_id);
bpf_probe_read_user(&to->com_stmt_send_long_data.param_number, sizeof(unsigned int), &from->com_stmt_send_long_data.param_number);
bpf_probe_read_user(&to->com_stmt_send_long_data.length, sizeof(unsigned long), &from->com_stmt_send_long_data.length);
const char *longdata;
bpf_probe_read_user(&longdata, sizeof(longdata), &from->com_stmt_send_long_data.longdata);
bpf_probe_read_user_str(&to->com_stmt_send_long_data.longdata, MYSQLD_LONG_DATA_LENGTH, longdata);
break;
}
case COM_STMT_PREPARE: {
/*
typedef struct COMM_EVENT_STMT_PREPARE_DATA_ {
const char query[MYSQLD_SQL_LENGTH];
unsigned int length;
}COM_EVENT_STMT_PREPARE_DATA;
*/
bpf_probe_read_user(&to->com_stmt_prepare.length, sizeof(unsigned int), &from->com_stmt_prepare.length);
const char* sql;
bpf_probe_read_user(&sql, sizeof(sql), &from->com_stmt_prepare.query);
bpf_probe_read_user_str(&to->com_stmt_prepare.query, MYSQLD_SQL_LENGTH, sql);
// Log the copied buffer: sql is a user-space pointer and must not be passed to %s
char ____fmt[] = "COM_STMT_PREPARE len:%d; sql is:%s\n";
bpf_trace_printk(____fmt, sizeof(____fmt), to->com_stmt_prepare.length, to->com_stmt_prepare.query);
break;
}
case COM_QUERY: {
/*
typedef struct COM_QUERY_DATA_ {
const char *query;
unsigned int length;
PS_PARAM *parameters;
unsigned long parameter_count;
}COM_QUERY_DATA;
*/
const char *sql;
bpf_probe_read_user(&sql, sizeof(sql), &from->com_query.query);
bpf_probe_read_user_str((void*)&to->com_query.query, MYSQLD_SQL_LENGTH, sql);
bpf_probe_read_user(&to->com_query.length, sizeof(unsigned int), &from->com_query.length);
log_trace("COM_QUERY sql is:%s", to->com_query.query);
break;
}
case COM_FIELD_LIST: {
/*
typedef struct COMM_EVENT_FIELD_LIST_DATA_ {
unsigned char table_name[MYSQLD_TABLE_LENGTH];
unsigned int table_name_length;
const unsigned char query[MYSQLD_SQL_LENGTH];
unsigned int query_length;
}COM_EVENT_FIELD_LIST_DATA;
*/
const char *table_name;
bpf_probe_read_user(&table_name, sizeof(table_name), &from->com_field_list.table_name);
bpf_probe_read_user_str((void*)&to->com_field_list.table_name, MYSQLD_TABLE_LENGTH, table_name);
bpf_probe_read_user(&to->com_field_list.table_name_length, sizeof(unsigned int), &from->com_field_list.table_name_length);
const char *query;
// query is a pointer field: read the pointer value, then the string it points to
bpf_probe_read_user(&query, sizeof(query), &from->com_field_list.query);
bpf_probe_read_user_str((void*)&to->com_field_list.query, MYSQLD_SQL_LENGTH, query);
bpf_probe_read_user(&to->com_field_list.query_length, sizeof(unsigned int), &from->com_field_list.query_length);
break;
}
case COM_REFRESH: {
/*
typedef struct COM_REFRESH_DATA_ {
unsigned char options;
}COM_REFRESH_DATA;
*/
bpf_probe_read_user(&to->com_refresh.options, sizeof(unsigned char), &from->com_refresh.options);
break;
}
case COM_SET_OPTION: {
/*
typedef struct COM_EVENT_SET_OPTION_DATA_ {
unsigned int opt_command;
}COM_EVENT_SET_OPTION_DATA;
*/
bpf_probe_read_user(&to->com_set_option.opt_command, sizeof(unsigned int), &from->com_set_option.opt_command);
break;
}
default: {
break;
}
}
return 0;
}
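In a conversion function like this, every read carries a hand-written size, and a size that disagrees with the destination field either truncates the value or overwrites the neighbouring field. One way to rule that out is to derive the size from the destination itself. The user-space sketch below shows the idea with memcpy; COPY_FIELD and the demo structs are hypothetical names, and in an eBPF program the equivalent would be to pass sizeof(*dst) as the size argument of bpf_probe_read_user.

```c
#include <string.h>

/* Copy exactly sizeof(*dst) bytes, so the length cannot disagree with the destination field. */
#define COPY_FIELD(dst, src) memcpy((dst), (src), sizeof(*(dst)))

struct demo_src { unsigned long stmt_id; unsigned int length; };
struct demo_dst { unsigned int length; unsigned int guard; };

/* Copy src->length into dst->length without touching dst->guard. */
static void demo_copy_length(struct demo_dst *dst, const struct demo_src *src) {
    COPY_FIELD(&dst->length, &src->length);
}
```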
4. How to turn the fd in mysqld into the kernel five-tuple
The kernel hook point (the role of sockfd_lookup_light is covered in detail later):
SEC("kprobe/sockfd_lookup_light")
int kprobe__sockfd_lookup_light(struct pt_regs *ctx) {
int sockfd = (int)PT_REGS_PARM1(ctx);
u64 pid_tgid = bpf_get_current_pid_tgid();
// Check if have already a map entry for this pid_fd_t
// TODO: This lookup eliminates *4* map operations for existing entries
// but can reduce the accuracy of programs relying on socket FDs for
// processes with a lot of FD churn
pid_fd_t key = {
.pid = pid_tgid >> 32,
.fd = sockfd,
};
struct sock **sock = bpf_map_lookup_elem(&sock_by_pid_fd, &key);
if (sock != NULL) {
return 0;
}
bpf_map_update_with_telemetry(sockfd_lookup_args, &pid_tgid, &sockfd, BPF_ANY);
return 0;
}
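The earlier sections showed how to obtain the mysqld fd, and the fd-to-sock relationship is now recorded in a map. All that remains is to emit the event from the return probe of dispatch_command: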
SEC("uretprobe/_Z16dispatch_commandP3THDPK8COM_DATA19enum_server_command")
int uretprobe_mysqld_dispatch_command(struct pt_regs* ctx) {
u32 tid = bpf_get_current_pid_tgid();
mysqld_transaction_t *transaction = bpf_map_lookup_elem(&mysqld_comm_data_storage, &tid);
if (transaction == NULL) {
log_trace("transaction is null\n");
return 0;
}
// Look up the socket fd saved for this thread
int* fd = bpf_map_lookup_elem(&mysqld_fd_map, &tid);
if (fd == NULL) {
bpf_map_delete_elem(&mysqld_comm_data_storage, &tid);
return 0;
}
// Resolve the connection tuple
u64 pid_tgid = bpf_get_current_pid_tgid();
pid_fd_t pid_fd = {
.pid = pid_tgid >> 32,
.fd = *fd,
};
struct sock **sock = bpf_map_lookup_elem(&sock_by_pid_fd, &pid_fd);
if (sock == NULL) {
bpf_map_delete_elem(&mysqld_comm_data_storage, &tid);
log_trace("sock is null\n");
return 0;
}
if (!read_conn_tuple(&transaction->tup, *sock, pid_tgid, CONN_TYPE_TCP)) {
bpf_map_delete_elem(&mysqld_comm_data_storage, &tid);
return 0;
}
transaction->duration = bpf_ktime_get_ns() - transaction->start_time;
bpf_perf_event_output(ctx, &mysqld_transaction_events, bpf_get_smp_processor_id(), transaction, sizeof(mysqld_transaction_t));
bpf_map_delete_elem(&mysqld_comm_data_storage, &tid);
return 0;
}
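The return probe joins three pieces of state: thread id to fd, (pid, fd) to sock, and start time to duration. The user-space sketch below models the last two steps with a small table in place of sock_by_pid_fd. The struct and function names are hypothetical, and the two port fields merely stand in for the full connection tuple.

```c
#include <stddef.h>
#include <stdint.h>

struct pid_fd { uint32_t pid; int fd; };
struct conn   { struct pid_fd key; uint16_t sport; uint16_t dport; };

/* Toy stand-in for sock_by_pid_fd: linear search over a small table. */
static const struct conn *find_conn(const struct conn *table, size_t n,
                                    uint32_t pid, int fd) {
    for (size_t i = 0; i < n; i++) {
        if (table[i].key.pid == pid && table[i].key.fd == fd) {
            return &table[i];
        }
    }
    return NULL;
}

/* Duration in nanoseconds between two monotonic timestamps; 0 if they are out of order. */
static uint64_t duration_ns(uint64_t start_ns, uint64_t end_ns) {
    return end_ns >= start_ns ? end_ns - start_ns : 0;
}
```

The key combines pid and fd because fd numbers are only unique within one process. A missing entry means the socket was never observed, and the probe above drops the event in that case rather than emit a transaction without a tuple.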