一文学会Canal怎么用

news2025/1/8 5:04:21

文章目录

  • 一.概念
    • 1.什么是Canal
    • 2.Canal的基本原理
  • 二.Mysql配置
    • 1. 安装
    • 2. 开启mysql的binlog
    • 3.mysql创建cannl用户并授权
  • 三.安装配置ES,kibana
  • 四.安装canal-server
  • 五.安装canal-admin
  • 六.安装canal-adapter
  • 七.通过canal和RabbitMQ将mysql数据同步ES

一.概念

1.什么是Canal

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。

2.Canal的基本原理

想了解canal的工作原理,首先你得知道什么是主从复制?

  • Master主库将改变记录,写到二进制日志(binary log)中
  • Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);
  • Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

binlog的格式有三种:STATEMENT,MIXED,ROW
在这里插入图片描述

canal的工作原理就是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。

canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:
在这里插入图片描述

在这里插入图片描述

二.Mysql配置

1. 安装

参考教程:添加链接描述

2. 开启mysql的binlog

[client]
default_character_set=utf8
[mysqld]
collation_server = utf8_general_ci
character_set_server = utf8

##局域网内唯一id
server_id=111
##指定不需要同步的数据库名称
binlog-ignore-db=mysql
##开启二进制日志功能
log-bin=mysql-bin
##设置二进制日志使用内存大小
binlog_cache_size=1M
## 二进制日志格式
binlog-format=ROW
## 二进制日志过期清理时间 默认值为0 表示不自动清理
expire_logs_days=7
## 跳过主从复制中遇到的错误
slave_skip_errors=1062

配置完成后重启mysql,并查询是否配置生效:ON就是开启
在这里插入图片描述

3.mysql创建cannl用户并授权

CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

在这里插入图片描述ue

三.安装配置ES,kibana

四.安装canal-server

可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。

  • 下载地址
    https://github.com/alibaba/canal/releases

在这里插入图片描述

  • 解压
    tar -zxvf canal.deployer-1.1.4.tar.gz

  • canal.properties的common属性前四个配置项

canal.id= 1             #canal的编号,在集群环境下,不同canal的id不同,注意它和mysql的server_id不同。
canal.ip=               # ip这里不指定,默认为本机
canal.port= 11111       # 端口号,是给tcp模式(netty)时候用的,如果用了kafka或者rocketmq,就不会去起这个端口了
canal.zkServers=         # zk用于canal cluster
canal.serverMode = tcp   # 用于指定什么模式拉取数据

完整的canal.properties文件
在这里插入图片描述

在这里插入图片描述

  • 查看监听的数据库状态
    在这里插入图片描述

  • 修改conf/example/instance.properties


canal.instance.gtidon=false

# position info
#数据库地址
##slaveId 不能与 my.cnf 中的 server-id 项重复
## canal.instance.mysql.slaveId = 1234
canal.instance.master.address=192.168.111.5:3306
#binlog日志名称
canal.instance.master.journal.name=mysql-bin.000005
#binlog偏移量
canal.instance.master.position=154
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
#mysql授权用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =test_canal
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
#canal.mq.topic=example
# 多主题 canal会将对应表的数据推送到“库名—表名”的主题队列里面
canal.mq.dynanicTopic=.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=mytest.person:id,mytest.role:id

在这里插入图片描述
在这里插入图片描述

  • 启动
sh bin/startup.sh 

在这里插入图片描述

  • 测试

创建数据库表
在这里插入图片描述
创建对应的队列并绑定到相应的交换机上
在这里插入图片描述

消费队列里面同步数据
canal发送到mq的数据为byte[]类型

    @RabbitListener(queues = {"test_canal_stu"})
    public void stu(byte[] input) throws IOException {

        JSONObject o = JSON.parseObject(input, JSONObject.class);
        String action = o.getString("type");
        System.out.println("=============stu-action=================");
        System.out.println(action);
        
        List<testStu> data = JSON.parseArray(o.getString("data"), testStu.class);
        System.out.println("============stu-data==================");
        System.out.println(data);
    }

在这里插入图片描述

五.安装canal-admin

  • 解压canal.admin-1.1.6.tar.gz

  • 初始化脚本: conf/canal_manager.sql
    在这里插入图片描述

  • 修改conf/application.yml:
    在这里插入图片描述

  • 启动
    ./bin/startup.sh

cat logs/admin.log
在这里插入图片描述

  • 修改canal-server配置文件
vi conf/canal_local.properties

在这里插入图片描述

  • 启动canal-server服务
指定启动配置为local,或者将canal_local.properties替换掉canal.properties
./bin/startup.sh local
  • canal-server启动成功后,刷新admin的server管理列表,canal-server会自动注册到admin
    在这里插入图片描述

  • 新建server
    在这里插入图片描述

  • 载入server模板
    在这里插入图片描述

  • 添加instance

配置
在这里插入图片描述
在server配置里面添加
在这里插入图片描述

canal-admin Handler dispatch failed; nested exception is java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer;

无法启动成功
在这里插入图片描述

jdk版本问题,官方写的是1.5以上,使用的jdk1.8不行,后面采用了11测试通过,其它版本大家可以自行测试

  • 密码需要加密
select password('123456');
  • Caused by: com.alibaba.otter.canal.common.CanalException: requestGet for canal config error: auth :admin is failed
    赋予数据库权限
grant select, insert, update, delete on canal_manager.* to canal@'%'

六.安装canal-adapter

相当于canal的客户端,会从canal-server中获取数据,然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。

在这里插入图片描述

  • 解压: tar -zxvf canal.adapter-1.1.2.tar.gz

  • 修改canal-adapter/conf/application.yml


server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
 
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #  之前起的 canal-server 地址  url
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:
 
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # 你的数据库地址 最好填写公网地址
      username: root   # 数据库账号
      password: admin  # 数据库密码
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7  # 你的es配置
        hosts: 127.0.0.1:9200 # 你的es配置  最好是公网地址
        properties:
         mode: rest # 你的es配置
         cluster.name: test-es # 你的es配置  之前第一步在elasticsearch中配置的 cluster.name
  • 修改canal-adapter/conf/es7/test.yml
dataSourceKey: defaultDS
destination: .*\\..*
groupId: g1
esMapping:
  _index: test_adapter
  _id: _id
#  upsert: true
#  pk: id
  sql: "SELECT id as _id,user_id as user_id,name as name,unit as unit FROM test_adapter"
#  objFields:
#    _labels: array:;
  etlCondition: "where a.c_time>={}"
  commitBatch: 3000
  • 添加索引
PUT /test_adapter/
{
  "mappings": {
    "properties": {
      "id": {
          "type": "keyword"
        },
      "user_id": {
        "type": "integer"
      },
      "name": {
        "type": "text"
      },
      "unit": {
        "type": "text"
      },
      "record_date":{
        "type": "date"
      }
    }
  }
}

七.通过canal和RabbitMQ将mysql数据同步ES

  • 1.修改canal-server 的conf/canal.properties文件
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 2.修改conf/example/instance.properties文件

在这里插入图片描述
在这里插入图片描述

  • 3创建数据库表,交换机和主题队列
CREATE TABLE `stu` (
  `id` int(11) NOT NULL,
  `stu_no` int(11) DEFAULT NULL,
  `createdAt` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

在这里插入图片描述
只需要在mq中创建对应的队列并以’数据库名_表名’为topic,canal就会监听mysql的binlog日志并将其发送到对应的队列

@Data
public class testStu implements Serializable {
    private String id;
    private String stuNo;
    private Date createdAt;
}
  • 3.测试同步代码
    @RabbitListener(queues = {"test_canal_stu"})
    public void stu(byte[] input) throws IOException {

        JSONObject o = JSON.parseObject(input, JSONObject.class);
        String action = o.getString("type");
        System.out.println("=============stu-action=================");
        System.out.println(action);

        List<testStu> data = JSON.parseArray(o.getString("data"), testStu.class);
        System.out.println("============stu-data==================");
        System.out.println(data);
    }

在这里插入图片描述

  • 4.同步ES测试代码
    @RabbitListener(queues = {"test_canal_stu"})
    public void stu(byte[] input) throws IOException {

        JSONObject o = JSON.parseObject(input, JSONObject.class);
        String action = o.getString("type");
        List<testStu> data = JSON.parseArray(o.getString("data"), testStu.class);

        try {
            esTest(action,data);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private void esTest(String action,List<testStu> stuList) throws Exception {

        if ("DELETE".equals(action)) {
            //数据库表进行硬删除
            batchDeleteFromEs(stuList);
        } else {
            batchSaveToEs(stuList);
        }
    }


    //根据时间进行分片 并批量删除
    public void batchDeleteFromEs(List<testStu> data) throws Exception {
        Map<String, List<String>> map = data.stream().collect(
                Collectors.groupingBy(dto -> getIndex(dto.getCreatedAt()),
                        Collectors.mapping(testStu::getId, Collectors.toList()))
        );
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            String index = entry.getKey();
            List<String> ids = entry.getValue();
            batchDeleteByIds(index, ids);
        }
    }


    //保存
    public void batchSaveToEs(List<testStu> data) throws Exception {
        List<Object> appointLog = new ArrayList<>();
        for (testStu dto : data) {
            testStu esDto = new testStu();
            BeanUtils.copyProperties(dto, esDto);
            //处理相关数据
            dealDate(esDto);
            appointLog.add(esDto);
        }
        esOperation.batchUpdateOrCreate(getIndex(data.getCreatdAt), appointLog);
    }

    //通过创建时间生成对应索引 将数据进行分片处理
    public String getIndex(Date date) {
        String index = "test_stu";
        DateFormat format = new SimpleDateFormat("yyyy");
        return index + "_" + format.format(date);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/50796.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java EE初阶 — synchronized 关键字 - 监视器锁 monitor lock

文章目录1.synchronized 的特性1.1 互斥1.2 可重入2.synchronized 使用示例3.Java 标准库中的线程安全类1.synchronized 的特性 1.1 互斥 synchronized 会起到互斥效果, 某个线程执行到某个对象的 synchronized 中时, 其他线程如果也执行到同一个对象 synchronized 就会阻塞等…

大数据培训之Hadoop序列化

序列化概述 1.1什么是序列化 序列化就是把内存中的对象&#xff0c;转换成字节序列(或其他数据传输协议)以便 于存储到磁盘(持久化)和网络传输。 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化 数据,转换成内存中的对象。 1.2为什么要序列化 一般来说&am…

【leetcode】2259. 移除指定数字得到的最大结果(js实现)

1. 题目 2259. 移除指定数字得到的最大结果 2. 思路 根据题意进行模拟&#xff0c;每次找到与digit相同的元素就将其移除&#xff0c;将剩余的字符串与存储最大值的字符串进行比较&#xff0c;一直保持max中保存的是最大值&#xff0c;最终将max返回。 3. 代码 /*** para…

用强化学习玩《超级马里奥》

Pytorch的一个强化的学习教程&#xff08; Train a Mario-playing RL Agent&#xff09;使用超级玛丽游戏来学习双Q网络(强化学习的一种类型)&#xff0c;官网的文章只有代码&#xff0c; 所以本文将配合官网网站的教程详细介绍它是如何工作的&#xff0c;以及如何将它们应用到…

[附源码]Python计算机毕业设计SSM科技类产品众筹系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

[附源码]计算机毕业设计JAVA乡村振兴惠农推介系统

[附源码]计算机毕业设计JAVA乡村振兴惠农推介系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM myb…

项目ERP与传统ERP的区别是什么?

许多不认为传统 ERP 有用的企业正在寻找项目 ERP。对于企业软件来说&#xff0c;这是一种利用项目管理实践和功能的相对新颖的方法。与传统 ERP 相比&#xff0c;项目 ERP 旨在提供对公司流程更全面的概述。这种类型的解决方案对于建筑公司或项目制造商等项目公司来说是一个很好…

聚类基本概念及常见聚类算法和EM算法

1. 基本概念 1.1 定义 聚类&#xff1a;发现数据中分组聚集的结构&#xff0c;根据数据中样本与样本之间的距离或相似度&#xff0c;依据类内样本距离小&#xff08;相似度大&#xff09;、类间样本距离大&#xff08;相似度小&#xff09;将样本划分为若干组/类/簇。 基于划分…

【毕业设计】1-基于单片机的城市轨道交通列车超速防护系统_里程表设计(原理图+PCB+源码+仿真工程+答辩论文)

typora-root-url: ./ 【毕业设计】1-基于单片机的城市轨道交通列车超速防护系统_里程表设计&#xff08;原理图PCB源码仿真工程答辩论文&#xff09; 文章目录typora-root-url: ./【毕业设计】1-基于单片机的城市轨道交通列车超速防护系统_里程表设计&#xff08;原理图PCB源码…

postgres源码解析39 表创建执行全流程梳理--3

本文结合实例讲解表创建执行流程 [CREATE TABLE wp_shy(id int primary key, name carchar(20))],相关知识回顾见&#xff1a; postgres源码解析38 表创建执行全流程梳理–1 postgres源码解析38 表创建执行全流程梳理–2 执行流程图 transformCreateStmt函数是表创建真正的入口…

系统封装制作

工具网址&#xff1a; 镜像下载&#xff1a; Windows 10 22H2 - MSDN - 山己几子木 (sjjzm.com)pe工具&#xff1a;【新提醒】优启通 v3.7.2022.0910&#xff08;2022.10.14 发布&#xff09;_IT天空原创软件_IT天空 (itsk.com)万能驱动&#xff1a;万能驱动 v7.22.0912.2&…

国产全志T3+Logos FPGA开发板(4核ARM Cortex-A7)规格书

评估板简介 创龙科技TLT3F-EVM是一款基于全志科技T3四核ARM Cortex-A7 + 紫光同创Logos PGL25G/PGL50G FPGA设计的异构多核国产工业评估板,ARM Cortex-A7处理器单元主频高达1.2GHz。评估板由核心板和评估底板组成,核心板CPU、FPGA、ROM、RAM、电源、晶振、连接器等所有器件均…

[附源码]计算机毕业设计springboot企业售后服务管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

如何学习运放实战设计?学这些精髓

一、同相放大电路虚短&#xff1a;运放理想放大倍数10万倍&#xff0c;一般输出都是3V或5V较多&#xff0c;运放放大的是输入信号的压差&#xff0c;放大10万倍的话&#xff0c;只能说明输入信号的压差非常非常接近&#xff0c;近似相等&#xff0c;我们称之为虚短&#xff08;…

多功能便携式吸尘器设计

目 录 摘 要 i Abstract ii 1 引言 1 2 多功能便携式吸尘器的历史及发展 2 2.1 多功能便携式吸尘器的历史 2 2.2 业界的发展情况 3 3 多功能便携式吸尘器的分类 5 3.1 卧式&#xff08;Canister&#xff09; 5 3.2 立式&#xff08;Upright&#xff09; 5 3.3 手持式 &#xff…

[附源码]SSM计算机毕业设计学校失物招领系统JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Jetbrains idea整合远程的docker服务器

Jetbrains idea整合远程的docker服务器准备工作服务端准备开发环境准备整合远程服务器的步骤服务端配置客户端配置准备工作 服务端准备 首先我们需要一台远程的安装了docker的CentOS服务器&#xff08;其他Linux服务器的操作区别不大&#xff09;&#xff0c;课参考我的文章&…

防水蓝牙耳机哪个好?防水音质好的蓝牙耳机推荐

真无线蓝牙耳机即将成为人们日后必不可少的东西&#xff0c;它已经逐渐成为我们生活中最重要的听觉媒介&#xff0c;是有线耳机给不了的自由感&#xff0c;有一些蓝牙耳机还可以让你在下雨天没带雨伞时依然可以带着听歌&#xff0c;今天给大家介绍四款防水能力极好的蓝牙耳机&a…

【发表案例】智能物联网类SCIEI,仅25天录用,计算机领域必投SCI快刊,12月截稿

【期刊简介】3.5-4.0&#xff0c;JCR2区&#xff0c;中科院3区 【检索情况】SCI&EI双检&#xff0c;正刊 【征稿领域】基于人工智能的工业物联网智能传感器 【参考周期】3个月左右 【截稿日期】2022年12月30日 【期刊简介】2.0-3.0&#xff0c;JCR3区&#xff0c;中科院…

fastapi_No.24_日志记录系统

文章目录第一步&#xff1a;创建日志记录器第二步&#xff1a;挂载日志记录器第三步&#xff1a;使用日志记录第四步&#xff1a;清除日志记录器完整代码第一步&#xff1a;创建日志记录器 利用loguru包创建一个日志记录器。 from loguru import logger from datetime import…