Canal1.1.6安装部署

news2025/1/18 6:49:26

什么是Canal

阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

MySQL 的 Binlog

什么是 Binlog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
其一:MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves
来达到 Master-Slave 数据一致的目的。
其二:自然就是数据恢复了,通过使用 MySQL Binlog 工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。

Binlog 的分类

MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配置 binlog_format= statement|mixed|row。三种格式的区别:

1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点:节省空间。
缺点:有可能造成数据不一致。

2)row:行级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大空间

3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。

综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。

Canal 的工作原理

MySQL 主从复制过程

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

Canal 的工作原理

很简单,就是把自己伪装成 Slave,假装从 Master 复制数据。

使用场景

1)原始场景: 阿里 Otter 中间件的一部分
Otter 是阿里用于进行异地数据库之间的同步框架,Canal 是其中一部分。
在这里插入图片描述
2)常见场景 1:更新缓存
在这里插入图片描述
3)常见场景 2:抓取业务表的新增变化数据,用于制作实时统计(我们就是这种场景)

Canal 的下载和安装

https://github.com/alibaba/canal/releases
在这里插入图片描述
这里我们下载1.1.6版本,上传至/opt/software
先创建canal文件夹,然后解压到该文件夹

cd /opt/module
mkdir canal
cd /opt/software
tar -zxvf canal.deployer-1.1.6.tar.gz -C /opt/module/canal/

canal.properties

vim /opt/module/canal/conf/canal.properties

说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111

多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3
在这里插入图片描述
当服务器多网卡的时候,要配置指定网络,否则无法访问到canal

这里我们只监视一个mysql,所以该配置文件可以不动

instance.properties

vim /opt/module/canal/conf/example/instance.properties

在这里插入图片描述
更改以上三点就可以了

启动

```java
cd /opt/module/canal/bin/
./startup.sh

确保mysql开启binlog

show variables like ‘%log_bin%’;

在这里插入图片描述

java代码连接

创建springboot项目,pom文件

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.6</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.7</version>
        </dependency>
package com.example.cannal_demo.util;


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.InetSocketAddress;

@Component
@Slf4j
public class CanalClient implements ApplicationRunner {

    @Value("${canal.ip}")
    private String ip;

    @Value("${canal.port}")
    private Integer port;

    @Value("${canal.username}")
    private String username;

    @Value("${canal.password}")
    private String password;

    @Value("${canal.destination}")
    private String destination;

    @Value("${canal.batch-size}")
    private Integer batchSize;

    @Value("${canal.subscribe}")
    private String subscribe;

    @Resource
    MessageHandler messageHandler;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("----->>>>>>>>启动canal");
        startCanal();
    }

    private void startCanal() {

        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, port), destination, "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(subscribe);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始
            connector.rollback();
            while (true) {
                //获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //现成休眠1s
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    messageHandler.handler(message);
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

        } finally {
            connector.disconnect();
        }
    }

}

package com.example.cannal_demo.util;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;

@Service
@Slf4j
public class MessageHandler {

    @Resource
    private AbstractEntryHandler abstractEntryHandler;

    public void handler(Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                log.info("----->>>>>>>开始处理CanalEntry");
                abstractEntryHandler.handler(entry);
            }
        }
    }
}
package com.example.cannal_demo.util;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * @Description: 获取到数据后进行相应的处理
 * @Author: yyl
 * @Date: 2022/7/13
 */
@Service
@Slf4j
public class AbstractEntryHandler {

    public final void handler(CanalEntry.Entry entry) {

        CanalEntry.RowChange rowChage = null;
        try {
            rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
        }

        CanalEntry.EventType eventType = rowChage.getEventType();

        boolean isDdl = rowChage.getIsDdl();

        log.info("----------库名:{}--------表名:{}--------", entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        String operation = null;
        Map<String, String> map = new HashMap<>();
        switch (eventType) {
            case INSERT:
                rowChage.getRowDatasList().forEach(rowData -> {
                    List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                    for (CanalEntry.Column column : columns) {
                        //byte[] bytes = column.getValueBytes().toByteArray();
                        map.put(camelName(column.getName()), column.getValue());
                    }
                });
                operation = "添加";
                break;
            case UPDATE:
                rowChage.getRowDatasList().forEach(rowData -> {
                    List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                    for (CanalEntry.Column column : columns) {
                        map.put(camelName(column.getName()), column.getValue());
                    }
                    Map<String, String> map1 = new HashMap<>();
                    List<CanalEntry.Column> columns1 = rowData.getBeforeColumnsList();
                    for (CanalEntry.Column column : columns1) {
                        map1.put(camelName(column.getName()), column.getValue());
                    }
                    log.info("---------更新之前map={}----------", map1);
                });
                operation = "更新";
                break;
            case DELETE:
                rowChage.getRowDatasList().forEach(rowData -> {
                    List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                    for (CanalEntry.Column column : columns) {
                        map.put(camelName(column.getName()), column.getValue());
                    }
                });
                operation = "删除";
                break;
            default:
                break;
        }
        log.info("---------操作:{},数据={}----------", operation, map);
    }

    /**
     * 将下划线大写方式命名的字符串转换为驼峰式。如果转换前的下划线大写方式命名的字符串为空,则返回空字符串。</br>
     * 例如:HELLO_WORLD->HelloWorld
     *
     * @param name 转换前的下划线大写方式命名的字符串
     * @return 转换后的驼峰式命名的字符串
     */
    public static String camelName(String name) {
        StringBuilder result = new StringBuilder();
        // 快速检查
        if (name == null || name.isEmpty()) {
            // 没必要转换
            return "";
        } else if (!name.contains("_")) {
            // 不含下划线,仅将首字母小写
            return name.substring(0, 1).toLowerCase() + name.substring(1);
        }
        // 用下划线将原始字符串分割
        String camels[] = name.split("_");
        for (String camel : camels) {
            // 跳过原始字符串中开头、结尾的下换线或双重下划线
            if (camel.isEmpty()) {
                continue;
            }
            // 处理真正的驼峰片段
            if (result.length() == 0) {
                // 第一个驼峰片段,全部字母都小写
                result.append(camel.toLowerCase());
            } else {
                // 其他的驼峰片段,首字母大写
                result.append(camel.substring(0, 1).toUpperCase());
                result.append(camel.substring(1).toLowerCase());
            }
        }
        return result.toString();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/23749.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

反函数求导:自然对数 ln是怎么得到的;为什么自然对数的导数是 1/ x;arcsin 和 arccos 的导数求算

参考视频&#xff1a;MIT微积分 如何得到的自然对数 lnlnln 首先我们知道以 eee 为底的指数函数 exe^xex 其次&#xff0c;我们引入反函数&#xff08;逆函数&#xff09;的概念 f−1(y)f^{-1}(y)f−1(y) 对于任意的 xxx 如果 f(x)yf(x)yf(x)y 那么 xf−1(x)xf^{-1}(x)xf−1(…

Redis的优惠券秒杀问题(七)在集群模式下的问题

Redis的优惠券秒杀问题&#xff08;七&#xff09;在集群模式下的问题 问题描述 伪集群模式搭建 &#xff08;1&#xff09;IDEA启动镜像 &#xff08;2&#xff09;修改nginx配置 &#xff08;3&#xff09;验证nginx是否启动成功 BUG复现 &#xff08;1&#xff0…

零入门容器云网络-4:基于DNAT技术使得外网可以访问本宿主机上veth-pair链接的内部网络

已发表的技术专栏&#xff08;订阅即可观看所有专栏&#xff09; 0  grpc-go、protobuf、multus-cni 技术专栏 总入口 1  grpc-go 源码剖析与实战  文章目录 2  Protobuf介绍与实战 图文专栏  文章目录 3  multus-cni   文章目录(k8s多网络实现方案) 4  gr…

数据结构:堆

文章目录一.堆的概念和性质二.堆的结构三.堆的实现3.1结构体声明3.2堆初始化3.3释放堆3.4打印堆3.5插入3.6删除3.7取堆顶元素3.8堆的元素个数3.9判空3.10补充四.建堆4.1向上调整建堆4.2向下调整建堆五.排序5.1升序5.2降序六.TOP-K问题一.堆的概念和性质 堆的概念&#xff1a; …

数据存储方式——KVELL:快速持续键值存储的设计与实现

文章目录前言一、背景1.当前流行的两种存储范式2.SSD性能的发展IOPS延迟和带宽吞吐量降低I / O突发3.NVMe ssd上当前KVs的问题3.1 CPU是瓶颈CPU是LSM KVs的瓶颈CPU是B树KVs的瓶颈3.2 LSM和B树KVs的性能波动二、KVELL1.KVs设计原则1.1 不共享1.2 不要在磁盘上排序&#xff0c;而…

Spring——IOC容器部分核心接口

Spring——IOC容器部分核心接口一、简介二、IOC容器核心接口1.BeanDefinition2.BeanDefinitionReader3.BeanDefinitionRegistry4.BeanFactory5.ApplicationContext6.BeanPostProcessor7.BeanFactoryPostProcessor8.BeanDefinitionRegistryPostProcessor9.总结一、简介 以下接口…

vim工具的使用

目录 vim的基本模式 vim三种基本模式(命令模式、底行模式、输入模式) 命令模式 vim正常(命令行)模式命令集 插入模式 底行模式 保存&退出 分屏 替换 执行shell指令 vim底行模式命令集 vim配置 配置文件的位置 配置文件的原理 如何配置 解决sudo无法使用的情…

[附源码]计算机毕业设计JAVA基于协同过滤算法的网上招聘系统

[附源码]计算机毕业设计JAVA基于协同过滤算法的网上招聘系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a…

【教学类-16-01】20221121《数字卡片9*2》(中班)

作品展示&#xff1a; ​ 打印墨水不够了​ 铅笔描边 ​ 剪开 ​ 每个人是A4 一半的大小 ​ 背景需求&#xff1a; 在数字像素图的基础上&#xff0c;我决定制作1-9的数字卡片&#xff0c;空心数字&#xff08;华文彩云&#xff09;涂色&#xff0c;卡片左上角写学号。——…

go使用grpc实现go与go,go与C#相互调用

protoc下载 protoc是protobuf的编译工具&#xff0c;能根据.proto文件生成为各种语言的源文件。 原始的protoc集成了如下语言的转换&#xff1a; cc#javaobjectcphppythonruby 但是没有集成go的转换工具。go的转换工具是在protoc的基础上使用插件的方式运行。 protoc 的下载地…

linux NC命令的本质

NC是一个可以模拟tcp&#xff0c;udp&#xff0c;server,client 的协议&#xff0c; 1-它可以实现两个主机的聊天 server: nc -lp 1234 client : nc 192.168.1.10 1234 以上两个命令就可以实现实时数据传输了&#xff0c;是不是很有意思&#xff0c;但是这个是怎么实现的呢&am…

软考信息安全工程师必会--3000+字文章浅析DES加密算法

目录 前言 什么是DES加密算法 整体流程 IP置换 子密钥K 压缩置换1 循环左移 拓展置换2 拓展置换E S盒代替 S1盒 S2盒 S3盒 S4盒 S5盒 S6盒 S7盒 S8盒 P盒置换 末置换 前言 &#x1f340;作者简介&#xff1a;被吉师散养、喜欢前端、学过后端、练过CTF、玩过DOS…

flink1.10中三种数据处理方式的连接器说明

第一种 Streaming&#xff08;DataStream API&#xff09; 流式处理的所有的连接器如上图&#xff0c;常用的是kafka、Elasticsearch、Hadoop FileSystem Kafka连接器 依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connec…

2022-11-21 mysql列存储引擎-架构实现缺陷梳理-P1

1. 前言 发现和指出问题为了&#xff1a;更好的解决问题和避免问题的再次发生 项目在演进&#xff0c;代码不停地在堆砌。如果代码的质量一直不被重视&#xff0c;代码总是会往越来越混乱的方向演进。当混乱到一定程度之后&#xff0c;量变引起质变&#xff0c;项目的维护成本…

二叉树和堆

二叉树和堆什么是树树的一些专业术语树的表示二叉树的概念什么是二叉树特殊的二叉树二叉树的性质堆的概念堆的表示方式堆的实现堆的初始化及销毁堆的插入堆的删除堆的判空与获取堆顶元素堆的主要应用堆排序利用堆数据结构建堆利用向上调整算法来建堆利用向下调整算法建堆TopK问…

计算机毕业设计——校园二手市场

一.项目介绍 系统包含 普通用户 和 管理员 两种角色 普通用户 浏览其他用户发布的二手物品&#xff0c;修改个人信息、查看订单&#xff08;买的、卖的&#xff09;、 发布闲置、查看我的闲置、查看我的关注、跳转后台、更改用户名等功能 管理员用户登录 操作用户管理、商品管理…

中学物理教学参考杂志社中学物理教学参考编辑部2022年第21期目录

前沿导航_课改在线《中学物理教学参考》投稿&#xff1a;cn7kantougao163.com 问题导向式学习 提升学生思维品质——以“自由落体运动”教学为例 汪欣; 1-2 物理实验探究式复习模式研究——以“机械能守恒定律”为例 王栋; 3-4 初中物理作业设计类型及其实施策略 马…

阿里8年测试经验,手工测试转变为自动化测试,送给正在迷茫的你

前言 随着软件测试技术的发展&#xff0c;人们已经从最初的纯粹的手工测试转变为手工与自动化测试技术相结合的测试方法。近年来&#xff0c;自动化测试越来越受到人们的重视&#xff0c;对于自动化测试的研究也越来越多。 项目版本功能日趋增加&#xff0c;系统模块越来越多…

2009-2013、2018-2020计算机网络考研408真题

文章目录2009年选择综合应用2010年选择综合应用2011年选择综合应用2012年选择综合应用2009年 选择 在OSI参考模型中&#xff0c;自下而上第一个提供端到端服务的层次是……。 A.数据链路层 B.传输层 C.会话层 D.应用层 考查OSI模型中传输层的功能。 传输层提供应用进程间的逻辑…

3-3、python中内置数据类型(集合和字典)

文章目录集合集合的创建集合的特性集合的常用操作增加删除查看练习-对集合的排序frozenset 不可变的集合字典字典的创建字典的特性字典的常用方法查看增加和修改删除遍历字典 &#xff08;for&#xff09;defaultdict默认字典&#xff08;给字典设置默认值&#xff09;内置数据…