Caretta 利用 eBPF 实现 Kubernetes 应用网络拓扑

news2024/11/23 13:17:23

介绍

Caretta 是一种轻量级的独立工具,快速展示集群中运行的服务可视化网络图。

Caretta 利用 eBPF 有效地展示 K8s 集群中的服务网络交互图,并利用 Grafana 查询和可视化收集的数据。科学家们早就知道,海龟和许多动物一样,通过感应磁场中看不见的线在海上航行,类似于水手使用纬度和经度的方式。

说明

内核允许使用 eBPF 的开发人员将他们的程序附加到各种类型的探测器 - 放置在内核或应用程序的代码中,当到达时,将在执行其原始代码之前或之后运行附加到它们的程序。Caretta 利用 eBPF 获取网络数据,通过 Prometheus 汇聚数据,并且通过 grafana 大盘展示。Caretta 是基于 tcplife 的启发,它是一个使用 eBPF 计算 TCP 生命周期统计数据和信息的好工具。

"tcplife" 插件可以用于监视和分析 TCP 连接的各个阶段和事件,比如连接建立、数据传输、连接关闭等。它可以捕获和记录关于 TCP 连接的相关信息,如连接持续时间、数据包统计、延迟等,并提供一些有用的指标和分析结果。这对于网络故障排除、性能优化以及网络安全分析等方面都很有用。关键词:tcp_set_state、tcp_data_queue 等。

为什么 tcp_data_queue 函数?

原理:网络数据收集机制原理 - 探测 tcp_data_queue 以观察网络套接字在其已建立状态下更新其统计信息,并探测 tcp_set_state 以跟踪其生命周期。使用此函数的优点是 tcp_set_state 和大多数内核 TCP 函数一样,它的第一个参数是一个 struct sock 对象。

具有轻量级、高效性、简单化、可视化等显著特点。

原理

有一个 Grafana 实例查询 VictoriaMetrics (caretta-vm) 代理(并在其 Web UI 上显示此地图);

Victoria agent 从 caretta daemonset 中抓取指标;

Victoria 代理和 Caretta 都使用了 Kubernetes 服务公开的 kubernetes API;

下面是 Caretta Agent 发布的时间序列指标示例:

caretta_links_observed{client_id="1074587981",client_kind="Deployment",client_name="checkoutservice",client_namespace="demo-ng",link_id="198768460",role="1",server_id="1112713827",server_kind="Service",server_name="productcatalogservice",server_namespace="demo-ng",server_port="3550"} 2537

在此连接中,我们可以看到 checkoutservice 向名为 productcatalogservice 的服务发送 2537 个字节。请注意,某些标签的生成完全符合 Grafana 期望显示节点图的格式。

Caretta 整体流程如下所示:

案例

前提条件

  • Linux 内核版本 >= 4.16。

  • 支持 CO-RE。可见 CORE,(Compile Once – Run Everywhere)。

Couldn't load probes - error loading BPF objects from go-side. field HandleSockSetState: program handle_sock_set_state: apply CO-RE relocations: load kernel spec: no BTF found for kernel version 3.10.0-1160.83.1.el7.x86_64: not supported

部署应用

 部署 Caretta 组件

helm repo add groundcover https://helm.groundcover.com/

helm repo update

helm install caretta --namespace caretta --create-namespace groundcover/caretta

Caretta 常见指标

Caretta 的 helm chart 使用 Caretta 自带的数据发布了一个带有预定义仪表板的 Grafana 实例。此仪表板包含一些示例来演示 Caretta 指标的用法。

使用提供的 Grafana 实例:

kubectl port-forward --namespace caretta <grafana-pod-name> 3000:3000

抓取 Caretta 的指标:

Caretta 的主要指标是 caretta_links_observed (Gauge)。 它使用以下标签来表示通过集群的特定连接(网络套接字):

  • client_name - kubernetes 实体的名称(如果已解析)、外部域(如果已解析)或 IP 地址。

  • client_namespace - kubernetes 实体的命名空间,或“节点”,或“外部”。

  • client_kind - kubernetes 实体的种类,或“节点”,或“外部”。

  • server_name - kubernetes 实体的名称(如果已解析)、外部域(如果已解析)或 IP 地址。

  • server_namespace - kubernetes 实体的命名空间,或“节点”,或“外部”。

  • server_kind - kubernetes 实体的种类,或“节点”,或“外部”。

  • server_port - 服务器使用的端口。

  • role - 1(客户端)或 2(服务器)。

k8s 应用网络拓扑图如下所示:

 

删除 Caretta 组件

helm delete caretta --namespace caretta

源码

内核 eBPF 代码:

#include "core_structures.h"
#include "arm_support.h"
#include <bpf_core_read.h>
#include <bpf_helpers.h>
#include <bpf_tracing.h>
#include "ebpf_utils.h"
#include "epbf_shared_types.h"
#include "ebpf_internel_types.h"

char __license[] SEC("license") = "Dual MIT/GPL";

// internal kernel-only map to hold state for each sock observed.
struct bpf_map_def SEC("maps") sock_infos = {
    .type = BPF_MAP_TYPE_HASH,
    .key_size = sizeof(struct sock *),
    .value_size = sizeof(struct sock_info),
    .max_entries = MAX_CONNECTIONS,
};

// the main product of the tracing - map containing all connections observed,
// with metadata and throughput stats.
// key is a whole identifier struct and not a single id to split the constant
// and dynamic values and to resemble as closely as possible the end result in
// the userspace code.
struct bpf_map_def SEC("maps") connections = {
    .type = BPF_MAP_TYPE_HASH,
    .key_size = sizeof(struct connection_identifier),
    .value_size = sizeof(struct connection_throughput_stats),
    .max_entries = MAX_CONNECTIONS,
};

// helper to convert short int from BE to LE
static inline u16 be_to_le(__be16 be) { return (be >> 8) | (be << 8); }

static inline u32 get_unique_id() {
  return bpf_ktime_get_ns() % __UINT32_MAX__; // no reason to use 64 bit for this
}

// function for parsing the struct sock
static inline int
parse_sock_data(struct sock *sock, struct connection_tuple *out_tuple,
                struct connection_throughput_stats *out_throughput) {

  if (sock == NULL) {
    return BPF_ERROR;
  }

  // struct sock wraps struct tcp_sock and struct inet_sock as its first member
  struct tcp_sock *tcp = (struct tcp_sock *)sock;
  struct inet_sock *inet = (struct inet_sock *)sock;

  // initialize variables. IP addresses and ports are read originally
  // big-endian, and we will convert the ports to little-endian.
  __be16 src_port_be = 0;
  __be16 dst_port_be = 0;

  // read connection tuple

  if (0 != bpf_core_read(&out_tuple->src_ip, sizeof(out_tuple->src_ip),
                      &inet->inet_saddr)) {
    return BPF_ERROR;
  }

  if (0 != bpf_core_read(&out_tuple->dst_ip, sizeof(out_tuple->dst_ip),
                      &inet->inet_daddr)) {
    return BPF_ERROR;
  }

  if (0 != bpf_core_read(&src_port_be, sizeof(src_port_be), &inet->inet_sport)) {
    return BPF_ERROR;
  }
  out_tuple->src_port = be_to_le(src_port_be);

  if (0 != bpf_core_read(&dst_port_be, sizeof(dst_port_be), &inet->inet_dport)) {
    return BPF_ERROR;
  }
  out_tuple->dst_port = be_to_le(dst_port_be);

  // read throughput data

  if (0 != bpf_core_read(&out_throughput->bytes_received,
                      sizeof(out_throughput->bytes_received),
                      &tcp->bytes_received)) {
    return BPF_ERROR;
  }
  if (0 != bpf_core_read(&out_throughput->bytes_sent,
                      sizeof(out_throughput->bytes_sent), &tcp->bytes_sent)) {
    return BPF_ERROR;
  }

  return BPF_SUCCESS;
};

static inline enum connection_role get_sock_role(struct sock* sock) {
  // the max_ack_backlog holds the limit for the accept queue
  // if it is a server, it will not be 0
  int max_ack_backlog = 0;
  if (0 != bpf_core_read(&max_ack_backlog, sizeof(max_ack_backlog),
                &sock->sk_max_ack_backlog)) {
    return CONNECTION_ROLE_UNKNOWN;
  }

  return max_ack_backlog == 0 ? CONNECTION_ROLE_CLIENT : CONNECTION_ROLE_SERVER;      
}

// probing the tcp_data_queue kernel function, and adding the connection
// observed to the map.
SEC("kprobe/tcp_data_queue")
static int handle_tcp_data_queue(struct pt_regs *ctx) {
  // first argument to tcp_data_queue is a struct sock*
  struct sock *sock = (struct sock *)PT_REGS_PARM1(ctx);

  struct connection_identifier conn_id = {};
  struct connection_throughput_stats throughput = {};

  if (parse_sock_data(sock, &conn_id.tuple, &throughput) == BPF_ERROR) {
    return BPF_ERROR;
  }

  // skip unconnected sockets
  if (conn_id.tuple.dst_port == 0 && conn_id.tuple.dst_ip == BPF_SUCCESS) {
    return BPF_SUCCESS;
  }

  // fill the conn_id extra details from sock_info map entry, or create one
  struct sock_info *sock_info = bpf_map_lookup_elem(&sock_infos, &sock);
  if (sock_info == NULL) {
    // first time we encounter this sock
    // check if server or client and insert to the maps

    enum connection_role role = get_sock_role(sock);

    struct sock_info info = {
        .pid = 0, // can't associate to pid anyway
        .role = role,
        .is_active = true,
        .id = get_unique_id(),
    };
    bpf_map_update_elem(&sock_infos, &sock, &info, BPF_ANY);

    conn_id.pid = info.pid;
    conn_id.id = info.id;
    conn_id.role = info.role;
    throughput.is_active = true;

    bpf_map_update_elem(&connections, &conn_id, &throughput, BPF_ANY);

    return BPF_SUCCESS;
  } 

  conn_id.pid = sock_info->pid;
  conn_id.id = sock_info->id;
  conn_id.role = sock_info->role;
  if (!sock_info->is_active) {
    return -1;
  }
  throughput.is_active = sock_info->is_active; 
  
  bpf_map_update_elem(&connections, &conn_id, &throughput, BPF_ANY);

  return BPF_SUCCESS;
};

static inline int handle_set_tcp_syn_sent(struct sock* sock) {
  // start of a client session
  u32 pid = bpf_get_current_pid_tgid() >> 32;

  struct sock_info info = {
      .pid = pid,
      .role = CONNECTION_ROLE_CLIENT,
      .is_active = true,
      .id = get_unique_id(),
  };

  bpf_map_update_elem(&sock_infos, &sock, &info, BPF_ANY);

  return BPF_SUCCESS;
}

static inline int handle_set_tcp_syn_recv(struct sock* sock) {
  // this is a server getting syn after listen
    struct connection_identifier conn_id = {};
    struct connection_throughput_stats throughput = {};

    if (parse_sock_data(sock, &conn_id.tuple, &throughput) == BPF_ERROR) {
      return BPF_ERROR;
    }

    struct sock_info info = {
        .pid = 0, // can't associate to process
        .role = CONNECTION_ROLE_SERVER,
        .is_active = true,
        .id = get_unique_id(),
    };

    bpf_map_update_elem(&sock_infos, &sock, &info, BPF_ANY);

    // probably the dst ip will still be unitialized
    if (conn_id.tuple.dst_ip == 0) {
      return BPF_SUCCESS;
    }

    conn_id.pid = info.pid;
    conn_id.id = info.id;
    conn_id.role = info.role;

    bpf_map_update_elem(&connections, &conn_id, &throughput, BPF_ANY);

    return BPF_SUCCESS;
}

static inline int handle_set_tcp_close(struct sock* sock) {
  // mark as inactive
  struct connection_identifier conn_id = {};
  struct connection_throughput_stats throughput = {};

  if (parse_sock_data(sock, &conn_id.tuple, &throughput) == BPF_ERROR) {
    return BPF_ERROR;
  }

  struct sock_info *info = bpf_map_lookup_elem(&sock_infos, &sock);
  if (info == NULL) {
    conn_id.id = get_unique_id();
    conn_id.pid = 0; // cannot associate to PID in this state
    conn_id.role = get_sock_role(sock);
  } else {
    conn_id.id = info->id;
    conn_id.pid = info->pid;
    conn_id.role = info->role;
    bpf_map_delete_elem(&sock_infos, &sock);
  }

  throughput.is_active = false;
  bpf_map_update_elem(&connections, &conn_id, &throughput, BPF_ANY);

  return BPF_SUCCESS;
}

SEC("tracepoint/sock/inet_sock_set_state")
static int handle_sock_set_state(struct set_state_args *args) {
  struct sock *sock = (struct sock *)args->skaddr;

  switch(args->newstate) {
    case TCP_SYN_RECV: {
      return handle_set_tcp_syn_recv(sock) == BPF_ERROR;
    }
    case TCP_SYN_SENT: {
      return handle_set_tcp_syn_sent(sock) == BPF_ERROR;
    }
    case TCP_CLOSE:  {
      return handle_set_tcp_close(sock);
    }
  }

  return BPF_SUCCESS;
}

使用 Go 加载 ebpf 代码:

func LoadProbes() (Probes, *ebpf.Map, error) {
	if err := rlimit.RemoveMemlock(); err != nil {
		return Probes{}, nil, fmt.Errorf("error removing memory lock - %v", err)
	}

	objs := bpfObjects{}
	err := loadBpfObjects(&objs, &ebpf.CollectionOptions{})
	if err != nil {
		var ve *ebpf.VerifierError
		if errors.As(err, &ve) {
			fmt.Printf("Verifier Error: %+v\n", ve)
		}
		return Probes{}, nil, fmt.Errorf("error loading BPF objects from go-side. %v", err)
	}
	log.Printf("BPF objects loaded")

	// attach a kprobe and tracepoint
	kp, err := link.Kprobe("tcp_data_queue", objs.bpfPrograms.HandleTcpDataQueue, nil)
	if err != nil {
		return Probes{}, nil, fmt.Errorf("error attaching kprobe: %v", err)
	}
	log.Printf("Kprobe attached successfully")

	tp, err := link.Tracepoint("sock", "inet_sock_set_state", objs.bpfPrograms.HandleSockSetState, nil)
	if err != nil {
		return Probes{}, nil, fmt.Errorf("error attaching tracepoint: %v", err)
	}
	log.Printf("Tracepoint attached successfully")

	// We are done with loading kprobes - clear the btf cache
	btf.FlushKernelSpec()

	return Probes{
		Kprobe:     kp,
		Tracepoint: tp,
		BpfObjs:    objs,
	}, objs.Connections, nil
}

解析工作负载 trace 链路拓扑图:

// a single polling from the eBPF maps
// iterating the traces from the kernel-space, summing each network link
func (tracer *LinksTracer) TracesPollingIteration(pastLinks map[NetworkLink]uint64) (map[NetworkLink]uint64, map[NetworkLink]uint64) {
	// outline of an iteration -
	// filter unwanted connections, sum all connections as links, add past links, and return the new map
	pollsMade.Inc()
	unroledCounter := 0
	loopbackCounter := 0

	currentLinks := make(map[NetworkLink]uint64)
	var connectionsToDelete []ConnectionIdentifier

	var conn ConnectionIdentifier
	var throughput ConnectionThroughputStats

	entries := tracer.connections.Iterate()
	// iterate the map from the eBPF program
	itemsCounter := 0
	for entries.Next(&conn, &throughput) {
		itemsCounter += 1
		// filter unnecessary connection

		if throughput.IsActive == 0 {
			connectionsToDelete = append(connectionsToDelete, conn)
		}

		// skip loopback connections
		if conn.Tuple.SrcIp == conn.Tuple.DstIp && isAddressLoopback(conn.Tuple.DstIp) {
			loopbackCounter++
			continue
		}

		// filter unroled connections (probably indicates a bug)
		link, err := tracer.reduceConnectionToLink(conn)
		if conn.Role == UnknownConnectionRole || err != nil {
			unroledCounter++
			continue
		}
		currentLinks[link] += throughput.BytesSent
	}

	mapSize.Set(float64(itemsCounter))
	unRoledConnections.Set(float64(unroledCounter))
	filteredLoopbackConnections.Set(float64(loopbackCounter))

	// add past links
	for pastLink, pastThroughput := range pastLinks {
		currentLinks[pastLink] += pastThroughput
	}

	// delete connections marked to delete
	for _, conn := range connectionsToDelete {
		tracer.deleteAndStoreConnection(&conn, pastLinks)
	}
	return pastLinks, currentLinks
}

参考

1、Navigate your way to production bliss with Caretta

2、https://github.com/groundcover-com/caretta

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/671762.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【瑞萨RA_FSP】AGT——低功耗定时器

文章目录 一、AGT简介二、AGT的框图分析1. 16位计数器2. 16位重装载寄存器3. 计数时钟源4. 比较匹配功能5. 比较匹配输出引脚6. 输出引脚7. 下溢事件信号/测量完成事件信号输出 三、AGT工作模式详解四、实验&#xff1a;比较匹配功能——PWM输出1. 硬件设计2. 文件结构3. FSP配…

基础篇:新手使用vs code新建go项目(从0开始到运行)

学习新语言&#xff0c;搭建新环境。在网上找了一些教程&#xff0c;感觉还是写一个比较详细的方便以后自己使用。其实vs code没有新建项目这个功能&#xff0c;具体怎么运行go语言的项目请看下文。 一、下载GO安装包 1.点击go安装包下载链接下载相应的版本&#xff08;本次下…

【计算机网络自顶向下】简答题习题总结(三)

文章目录 第三章 传输层UDP用户数据报协议可靠数据传输原理面向连接传输TCP流量控制可靠数据传输机制 题目 第三章 传输层 传输层服务&#xff1a;在两个不同的主机的运行应用程序之间提供逻辑通信 在接收主机多路分解 将接收到的数据段传递给正确的套接字【多路分解】 在发送…

线程与轻进程(OS)

目录 1、进程的引入 2、线程的概念 3、线程的结构 3、线程控制块 5、线程的实现 &#xff08;1&#xff09;用户级别线程 &#xff08;2&#xff09;核心级别线程 &#xff08;3&#xff09;混合线程 6、线程的应用 1、进程的引入 进程切换 上下文涉及内容多&#xf…

软件测试面试,从简历到面试常问,不学几招怎么跳槽?

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 软件测试面试环节…

关于Java多线程不安全的问题简析

在了解多线程不安全的问题之前 让我们先来看如下代码 public class demo18 {public static int count 0;public static void main(String[] args) throws InterruptedException {Thread t1 new Thread(()->{for (int i 0; i < 10000; i) {count;}});Thread t2 new …

软件测试报告需要做哪些测试内容?软件测试外包服务公司靠谱吗?

在软件开发领域中&#xff0c;测试是最为重要的环节之一&#xff0c;它在确保软件质量方面有着至关重要的作用。软件测试是一种检验软件代码是否符合设计和用户期望的过程。软件测试的主要目的是发现缺陷并确保软件在实际使用中的可靠性&#xff0c;安全性&#xff0c;以及稳定…

Linux中centos修改系统时间并写到硬件,Linux中centos设置定时自动同步网络时间

文章目录 前言一、centos修改系统时间并写到硬件1.1查看当前的系统时间1.2修改系统时间1.3查看硬件时间1.4同步系统时间和硬件时间1.5本地时间写入硬件时间 二、centos设置定时自动同步网络时间2.1安装ntpdate工具2.2CentOS安装/操纵crontab2.3启动crontab并查看状态2.4写一个c…

Ubuntu系统安装Mysql服务并设置远程连接-Navicat连接Mysql-物联网系统

目录 一、前言 二、Mysql的安装 三、Mysql服务管理 四、配置Mysql远程连接 五、修改登录限制 六、修改Root密码 七、Navicat连接Mysql 一、前言 在我们购买服务器后&#xff0c;常需要在服务器上部署数据库以存储我们所需要的数据&#xff0c;因此我们本文将在Ubuntu系统…

LeetCode - #81 搜索旋转排序数组 II

文章目录 前言1. 描述2. 示例3. 答案关于我们 前言 我们社区陆续会将顾毅&#xff08;Netflix 增长黑客&#xff0c;《iOS 面试之道》作者&#xff0c;ACE 职业健身教练。&#xff09;的 Swift 算法题题解整理为文字版以方便大家学习与阅读。 LeetCode 算法到目前我们已经更新…

解决阿里云服务器被植入挖矿脚本过程

文章目录 前言一、服务器为什么会被告警挖矿&#xff1f;二、怎么解决&#xff1a;1.top 命令查看进程cpu 占用情况&#xff1a;2.通过pid进程号&#xff0c;查找改程序所在的目录&#xff1a;3. 强制删除脚本文件&#xff1a;4. 强制杀死进程&#xff1a;5. 检查是否有脚本的定…

three.js物体纹理及其常用属性介绍

一、Three中的纹理和材质介绍 THREE中的纹理和材质是用来渲染3D场景中的物体表面的。纹理贴图定义物体表面的颜色和外观&#xff0c;而材质则定义物体表面如何反射光线。 纹理可以使用多种类型的图像文件&#xff0c;包括JPEG、PNG、GIF等。纹理可以是简单的颜色、图案或者是复…

史上最卷618背后:国产手机厂商突围的“新武器”

智能手机&#xff0c;究竟还是不是个好生意? 这个问题在近些年被市场反复追问&#xff0c;在最近被称为“史上最卷”的618期间&#xff0c;更是被增添了悲观的色彩。IDC中国研究经理郭天翔表示&#xff0c;本次618智能终端市场是低于预期的&#xff0c;同时也低于去年同期。除…

Qt6.2教程——1.Qt安装及编写登录界面

本文旨在帮助读者理解如何使用ChatGPT来辅助安装和学习Qt 6.2。我们将从Qt 6.2的基本概念开始&#xff0c;然后深入了解其安装过程&#xff0c;并探讨如何使用ChatGPT作为一个强大的辅助工具。对于那些寻求在学习和使用Qt 6.2中找到有效支持的人来说&#xff0c;这篇文章将提供…

FBM207C RH917GY将相关调节系统打到手动状态,必要时到现场进行调节

​ FBM207C RH917GY将相关调节系统打到手动状态&#xff0c;必要时到现场进行调节 FBM207C RH917GY将相关调节系统打到手动状态&#xff0c;必要时到现场进行调节 随着自动化水平的提高&#xff0c;dcs控制系统(集散控制系统)逐渐代替了常规仪表&#xff0c;其优越性已被广大操…

Vue 常用属性

数据属性 组件的 data 选项是一个函数&#xff08;data里面是有return的&#xff09;。Vue 会在创建新组件实例的过程中调用此函数&#xff08;将里面定义的变量都放到实例里面去&#xff0c;你就可以使用this点出来&#xff0c;包括HTML里面就能够使用这些变量的&#xff09;。…

LabVIEW开发航空航天器风洞的数据采集系统

LabVIEW开发航空航天器风洞的数据采集系统 空气动力及其系数的评估是航空航天器设计中的一项基本任务&#xff0c;对于考虑制造高效飞行器非常重要。航空航天器的效率是根据其稳定性、最小阻力和更高的机动性来定义的。在风洞中使用航空航天飞行器模型进行测试&#xff0c;而不…

【Leetcode】11.盛最多水的容器

一、题目 1、题目描述 给定一个长度为 n 的整数数组 height 。有 n 条垂线,第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线,使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明:你不能倾斜容器。 示例1: 输…

Nautilus Chain:模块化Layer3的先行者

“模块化特性的 Nautilus Chain 正在成为 Layer3 的早期定义者之一&#xff0c;并有望进一步推动区块链更广泛的应用与实践 ” 自以太坊创始人 Vitalik Buterin 在去年提出 Layer3 的概念后&#xff0c;行业始终对“Layer3”进行讨论&#xff0c;并期望推动该概念&#xff0c;从…

mysql版本5.5.*升级为5.7.*,遇到的问题和解决方法都来看看吧,最终升级成功~

背景&#xff1a;由于项目比较老&#xff0c;用的数据库版本也是相当低&#xff0c;现在业务需求需要做数据同步&#xff0c;使用FlinkCDC的时候报数据库版本低&#xff0c;查询FlinkCDC要求的最低版本后果断升级mysql~ FlinkCDC对mysql最低版要求如下图&#xff1a; &#x…