canal简介
canal是一款优秀的订阅MySQL binlog的中间件,在MySQL异构数据到其它存储平台领域非常的实用好用。而且在数据表的迁移中也可以用canal订阅,然后将更新实时同步到新表。
原理
canal部署后伪装为一个MySQL slave节点向DB发起同步binlog请求,DB将binlog发送给slave,canal解析转成一系列事件交给用户处理。insert、update、delete等DML事件,并且可以获取到SQL执行前后的原数据信息。
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
canal部署后是一个server,此时我们新建工程连接server消费canal给我们推来的event就是client。我们通过canal提供的API轻松应用binlog转化的event,并提取数据同步到其他存储。
HA机制,canal为了避免单点当然可以部署集群实现HA。但是为了避免多个canal和DB建立链接同时解析binlog并推给下游消费,同时只能有1个canal运行,依靠的是CA的zookeeper实现。
大致步骤:
- canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
- 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
- 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
- canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.
Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制。
在华润中的实际应用
在我们需求中领导需要查看多维度复杂的报表,MySQL就无法满足近实时的且多维度的复杂查询,我们引入了Elasticsearch来满足需求。那么MySQL数据异构到ES,就是使用的canal实现。
问题表象和排查
某一天下午,正在高高兴兴编码,突然被一声粗糙的喊声拉出了思绪。“这怎么发送MQ超时了?链接不上MQ啦”一个同事日常的没过脑的喊叫。定睛一看原来是canal-client在消费binlog时发送给MQ的消息报错了。于是先入为主的去排查了mq,发现namesrv、broker都没有宕机或重启。查了canal-client也没有宕机重启。于是怀疑是网络通信问题,查了2个小时没有收获...
期间为了尽快恢复数据同步,重启了canal-client,就恢复了mq数据同步。这里也丢失了第一现场。通过mq发送消息监控,是大约在2.4号1.54分就没有消息发送了,应该就是这个时间有故障。后来在日志平台查询1.54~2点的canal-client日志,发现了问题根因。
close the connection to remote address[] result: true
在1.54.37秒canal消费binlog线程发生了一个未知错误,导致线程死亡。在日志上可看到此时间点后没有消费binlog日志,在mq发送监控上也符合1.54分左右没有发送消息流量。这个错误信息在代码中查找,找到出处。是一个消费canal-server发送过来的event事件的消费线程,也是驱动整个消费程序的线程。
这段代码是这样的,start函数是canal-client启动后和server连接上调用的函数。会启动一个消费线程一直执行process函数,在函数内做具体的event事件分发并消费。线程注册了未捕获异常handler以便在线程发生异常时能回调通知用户,做一些补偿或记录。这个handler中打印的就是日志中出现的异常语句,所以可以肯定消费线程出现了异常导致退出。
至于出现了什么异常?似乎是io错误,读取到流的结尾。检查了canal-server是部署在201机器上,且pod在1天前部署过。看日志是2.4 1:59分启动的,和canal-client的消费线程异常时间比较吻合。canal-server在1.54分左右因201机器出现内存爆满,导致k8s对201机器上的pod进行驱逐,然后1.59分重启。canal-client没有重新连接上canal-server,因为消费线程在遇到长连接断开后,进入handler中没有做特殊处理,就打了日志就退出了。
问题到这里已经查明原因了,但是为了更严谨,还是选择在本地用代码模拟复现这个场景。看看线程是不是进入到未捕获异常handler就会退出。
问题复现
运行下面的代码,main线程启动了一个t线程,t线程会一直执行HomeWork任务,直到i>100抛出异常。观察如果t线程抛出异常后还能继续打印日志,代表t线程即使在run中抛出异常也能继续运行,反之证明t线程已经退出消亡了。其实还有其他手段可证明,例如用jstack、visualVM等监控工具观察t线程。但这个实验是最快最低成本的。
/** * @author Joseph * @since 2024/2/4 */
public class ThreadExceptionTest {
protected static Thread.UncaughtExceptionHandler handler = (t, e) -> System.out.println("error="+e.getMessage());
public static void main(String[] args) {
Thread t = new Thread(new HomeWork());
t.setUncaughtExceptionHandler(handler);
t.start();
while (true) {
System.out.println("MainThread running");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
static class HomeWork implements Runnable {
@Override
public void run() {
int i = 0;
while (true) {
System.out.println(i);
if (i++ >= 100) {
throw new RuntimeException("thread dead");
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
总结
在生产上遇到了canal同步es数据突然中断的问题,初步查看是发现MQ没有继续发送消息同步数据了。随着排查深入发现是K8S的pod节点内存不够导致的驱逐pod,canal-server被重启,canal-client和server断开连接,消费线程遇到异常没有进行合适的处理,打了错误日志退出消亡。
随着排查问题,学习了canal能够解析MySQL binlog的原理,是通过MySQL提供的协议伪装成slave节点向master拉取binlog并自己解析转化为一系列insert、update、delete等DML事件,再提供给用户消费。还学习了canal通过zookeeper的强CA机制实现多节点的HA机制。