本地部署Canal笔记-实现MySQL与ElasticSearch7数据同步

news2025/1/16 6:01:02

背景

本地搭建canal实现mysql数据到es的简单的数据同步,仅供学习参考

建议首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki

前提条件

  • 本地搭建MySQL数据库
  • 本地搭建ElasticSearch
  • 本地搭建canal-server
  • 本地搭建canal-adapter

操作步骤

本地环境为window11,大部分组件采用docker进行部署,MySQL采用8.0.27,

在这里插入图片描述

搭建MySQL数据库

推荐使用docker-desktop,可视化操作,首先找到需要使用的版本直接pull到本地,通过界面进行容器启动,指定环境变量并定义挂载卷

在这里插入图片描述

也可本地使用docker命令部署MySQL,需要指定环境变量,root账户密码和挂载本地配置文件

#命令根据自己本地路径和配置进行更改
docker run  
--env=MYSQL_ROOT_PASSWORD=123456 
--volume=D:\docker\mysql\data:/var/lib/mysql/ 
--volume=D:\docker\mysql\conf:/etc/mysql/conf.d 
--volume=/var/lib/mysql 
-p 3306:3306 
-p 33060:33060 --runtime=runc -d 
--privileged=true 
mysql:latest 

命令说明:

  • -p 3306:3306
    将宿主机的 3306 端口映射到 docker 容器的 3306 端口,格式为:主机(宿主)端口:容器端口
  • --name mysql
    运行服务的名字
  • -v
    挂载数据卷,格式为:宿主机目录或文件:容器内目录或文件
  • -e MYSQL_ROOT_PASSWORD=123456
    初始化 root 用户的密码为 123456
  • -d
    后台程序运行容器
  • --privileged=true 开启特殊权限
    Docker 挂载主机目录时(添加容器数据卷),如果 Docker 访问出现 cannot open directory:Permission denied,在挂载目录的命令后多加一个 --privileged=true 参数即可。
    因为出于安全原因,容器不允许访问任何设备,privileged 让 docker 应用容器获取宿主机 root 权限(特殊权限),允许我们的 Docker 容器访问连接到主机的所有设备。容器获得所有能力,可以访问主机的所有设备,例如,CD-ROM、闪存驱动器、连接到主机的硬盘驱动器等。

下一步在MySQL配置文件中my.cnf设置开启binlog,更改配置文件之后重启MySQL

# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA

#
# The MySQL  Server configuration file.
#
# For explanations see
# http://dev.mysql.com/doc/mysql/en/server-system-variables.html

[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
secure-file-priv= NULL

--skip-host-cache
--skip-name-resolve

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

# Custom config should go here
!includedir /etc/mysql/conf.d/

更改完配置文件之后验证binlog是否打开,并创建canal用户,设置权限

-- 查询binlog配置
show variables like 'binlog_format';
show variables like 'log_bin';

-- 创建canal账户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

show grants for 'canal';

准备工作

  • 新建数据库
CREATE SCHEMA `canal_manager` DEFAULT CHARACTER SET utf8mb4 ;
  • 新增测试表
create table test_canal
(
    id   int auto_increment
        primary key,
    name text null
);

搭建ELK

本地搭建elk推荐使用docker-compose进行搭建,单独搭建的话需要进行各种配置比较麻烦,我本地搭建的版本为7.16.1,如果需要更改版本或者更改其他配置可以直接更改下面docker-compose.yaml文件

docker-compose.yaml

services:
  elasticsearch:
    image: elasticsearch:7.16.1
    container_name: es
    environment:
      discovery.type: single-node
      ES_JAVA_OPTS: "-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
    healthcheck:
      test: ["CMD-SHELL", "curl --silent --fail localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 3
    networks:
      - elastic
  logstash:
    image: logstash:7.16.1
    container_name: log
    environment:
      discovery.seed_hosts: logstash
      LS_JAVA_OPTS: "-Xms512m -Xmx512m"
    volumes:
      - ./logstash/pipeline/logstash-nginx.config:/usr/share/logstash/pipeline/logstash-nginx.config
      - ./logstash/nginx.log:/home/nginx.log
    ports:
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "5044:5044"
      - "9600:9600"
    depends_on:
      - elasticsearch
    networks:
      - elastic
    command: logstash -f /usr/share/logstash/pipeline/logstash-nginx.config
  kibana:
    image: kibana:7.16.1
    container_name: kib
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
    networks:
      - elastic
networks:
  elastic:
    driver: bridge

Deploy with docker compose

此处不再详细讲解docker-compse如何部署了

$ docker compose up -d
Creating network "elasticsearch-logstash-kibana_elastic" with driver "bridge"
Creating es ... done
Creating log ... done
Creating kib ... done

Expected result

Listing containers must show three containers running and the port mapping as below:

$ docker ps
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS                    PORTS                                                                                            NAMES
173f0634ed33        logstash:7.8.0        "/usr/local/bin/dock…"   43 seconds ago      Up 41 seconds             0.0.0.0:5000->5000/tcp, 0.0.0.0:5044->5044/tcp, 0.0.0.0:9600->9600/tcp, 0.0.0.0:5000->5000/udp   log
b448fd3e9b30        kibana:7.8.0          "/usr/local/bin/dumb…"   43 seconds ago      Up 42 seconds             0.0.0.0:5601->5601/tcp                                                                           kib
366d358fb03d        elasticsearch:7.8.0   "/tini -- /usr/local…"   43 seconds ago      Up 42 seconds (healthy)   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp     

After the application starts, navigate to below links in your web browser:

  • Elasticsearch: http://localhost:9200
  • Logstash: http://localhost:9600
  • Kibana: http://localhost:5601/api/status

准备工作

  • 进入kibana,新增测试索引
PUT /test_canal
{
  "mappings":{
      "properties":{
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text"
    }
  }
}
}

搭建canal-server

推荐使用官网下载release版本进行解压部署,官网地址:https://github.com/alibaba/canal/wiki/QuickStart,也可以使用docker进行部署,重点是部署完成后需要修改配置文件。

  • 更改配置
vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

具体配置文件介绍可以查看:https://github.com/alibaba/canal/wiki/AdminGuide

注意: 使用docker部署时,如果没有设置为同一个网络,数据库地址无法使用127.0.0.1

  • 启动
sh bin/startup.sh

搭建canal-adapter

基本功能

canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能, 目前支持功能:

  • 客户端启动器
  • 同步管理REST接口
  • 日志适配器, 作为DEMO
  • 关系型数据库的数据同步(表对表同步), ETL功能
  • HBase的数据同步(表对表同步), ETL功能
  • (后续支持) ElasticSearch多表数据同步,ETL功能

适配器整体结构

client-adapter分为适配器和启动器两部分, 适配器为多个fat jar, 每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载, 目前所有支持的适配器都放置在plugin目录下

启动器为 SpringBoot 项目, 支持canal-client启动的同时提供相关REST管理接口, 运行目录结构为:

- bin
    restart.sh
    startup.bat
    startup.sh
    stop.sh
- lib
   ...
- plugin 
    client-adapter.logger-1.1.1-jar-with-dependencies.jar
    client-adapter.hbase-1.1.1-jar-with-dependencies.jar
    ...
- conf
    application.yml
    - hbase
        mytest_person2.yml
- logs

以上目录结构最终会打包成 canal-adapter-*.tar.gz 压缩包

适配器配置介绍

总配置文件 application.yml

adapter定义配置部分

canal.conf:
  canalServerHost: 127.0.0.1:11111          # 对应单机模式下的canal server的ip:port
  zookeeperHosts: slave1:2181               # 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准
  mqServers: slave1:6667 #or rocketmq       # kafka或rocketMQ地址, 与canalServerHost不能并存
  flatMessage: true                         # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  batchSize: 50                             # 每次获取数据的批大小, 单位为K
  syncBatchSize: 1000                       # 每次同步的批数量
  retries: 0                                # 重试次数, -1为无限重试
  timeout:                                  # 同步超时时间, 单位毫秒
  mode: tcp # kafka rocketMQ                # canal client的模式: tcp kafka rocketMQ
  srcDataSources:                           # 源数据库
    defaultDS:                              # 自定义名称
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url 
      username: root                                            # jdbc 账号
      password: 121212                                          # jdbc 密码
  canalAdapters:                            # 适配器列表
  - instance: example                       # canal 实例名或者 MQ topic 名
    groups:                                 # 分组列表
    - groupId: g1                           # 分组id, 如果是MQ模式将用到该值
      outerAdapters:                        # 分组内适配器列表
      - name: logger                        # 日志打印适配器
......           

说明:

  1. 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
  2. 目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息

操作流程

从官网下载relaes版本之后进行解压,解压之后修改cnf目录下得application.yml文件,根据自己得需求选择下面不同的adapters进行配置,我本地安装的es是es7.16,选择es7的配置,具体配置的含义请参考上方适配器配置介绍

  • 配置application.yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true # 配置数据库地址
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        hosts:  172.22.80.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: docker-cluster
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:
  • 新增测试yml文件,再es7目录下新建testcanal.yml配置,更加复杂的映射处理可以参考canal官网示例:

    https://github.com/alibaba/canal/wiki/Sync-ES

在这里插入图片描述

  • 启动canal-adapter启动器
bin/startup.sh

windows启动.bat文件

  • 验证

新增mysql mytest.test_canal表的数据, 将会自动同步到es的test_canal索引下面, 并会打出DML的log

番外:adapter管理REST接口

注意:部分操作需要指定cnf下的yml配置

查询所有订阅同步的canal instance或MQ topic
curl http://127.0.0.1:8081/destinations
数据同步开关
curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT

针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

数据同步开关状态
curl http://127.0.0.1:8081/syncSwitch/example

查看指定 canal instance/MQ topic 的数据同步开关状态

手动ETL
  • 不带参数
curl http://127.0.0.1:8081/etl/es7/testcanal.yml -X POST
  • 带参数
curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST -d "params=2018-10-21 00:00:00"

导入数据到指定类型的库, 如果params参数为空则全表导入, 参数对应的查询条件在配置中的etlCondition指定

查看相关库总数据
curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml

常见问题

not part of the cluster Cluster [elasticsearch], ignoring…

查看canal-adapter配置中的application.yml配置是否和es中的cluster_name配置相同
在这里插入图片描述
在这里插入图片描述

连接问题

如果本地使用docker进行部署,未指定网桥,切记不能使用127.0.0.1进行es连接或者mysql数据库连接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1366430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计模式④ :分开考虑

一、前言 有时候不想动脑子,就懒得看源码又不像浪费时间所以会看看书,但是又记不住,所以决定开始写"抄书"系列。本系列大部分内容都是来源于《 图解设计模式》(【日】结城浩 著)。该系列文章可随意转载。 …

小白入门基础 - Restful

一:REST与RESTful: REST:表现层状态转移,资源在网络中以某种形式进行状态转移。 RESTful是基于REST理念的一套开发风格,是具体的开发规则。 服务器端只返回数据,以json或者xml的格式。 RESTful开发规范&a…

小白新手轻松部署扫雷小游戏

小白新手轻松部署扫雷小游戏 云效云效操作导入资源镜像仓库应用配置 最后 说到扫雷小游戏,可以说大家都玩儿过,印象中刚接触计算机的时候,对于这个扫雷小游戏,很多人都很喜欢,觉得很有意思,大家一起挑战看谁…

win10报错“COMDLG32.OCX文件丢失,软件无法启动”,快速修复方法,亲测有效

COMDLG32.OCX文件是Windows操作系统中的一个ActiveX控件文件,是Common Dialogs控件的一部分,主要用于支持各种软件运行时的一些基本功能,如打开、保存文件对话框等。 COMDLG32.OCX文件的作用主要体现在以下几点: 1. 提供应用程序通…

Commander One for Mac:强大的双窗格文件管理器,让你的工作效率倍增!

Commander One for Mac是一款功能强大的文件管理工具,具有以下主要功能: 双窗格设计:主界面分为两个窗格,用户可以在左侧窗格中导航和浏览文件系统的目录结构,在右侧窗格中查看文件和文件夹的内容。文件操作&#xff…

【教学类-45-02】X-Y之间的三连减题(a-b-c=)

作品展示: 背景需求: 【教学类-45-01】X-Y之间的三连加题(abc)-CSDN博客文章浏览阅读5次。【教学类-45-01】X-Y之间的三连加题(abc)https://blog.csdn.net/reasonsummer/article/details/135436915 有了三连加怎么能没有三连减,修改参数&am…

Vmware安装Windows11系统及下载MySQL步骤(超详细)

一、创建虚拟机 ①选择自定义 ②直接点击下一步 ③选择Windows 11 x64 ④命名虚拟机以及选择路径 ⑤新版本的虚拟机需要加密(密码需要8个字符以上) ⑥选择UEFI ⑦处理器配置(根据自己的需求) ⑧设置虚拟机的内存 ⑨选择不使用网络…

LeetCode 2807. 在链表中插入最大公约数【链表,迭代,递归】1279

本文属于「征服LeetCode」系列文章之一,这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁,本系列将至少持续到刷完所有无锁题之日为止;由于LeetCode还在不断地创建新题,本系列的终止日期可能是永远。在这一系列刷题文章…

天锐绿盾文档加密系统

绿盾文档加密系统是一种针对企业数据安全而设计的解决方案,旨在保护企业核心数据不被泄露。该系统由文件加密模块、内网安全模块等部分组成,主要功能包括对需要保护的文件进行强制加密保护,并对文件的使用进行全程监控。通过绿盾文档加密系统…

数据结构之B树和B+树

数据结构可视化演示链接,也就是视频中的网址 文章目录 一、B-Tree二、BTree(B-Tree变种) 一、B-Tree 叶节点具有相同的深度,叶节点的指针为空所有索引元素不重复节点中的数据索引从左到右递增排列 二、BTree(B-Tree变种) 非叶子节点不存储 data &#…

如何进行深入的竞品分析:掌握这些技巧让你更加了解市场

随着互联网行业的快速发展,产品经理需要对竞品进行深入分析,才能更好地把握市场需求和趋势,为公司带来更好的商业价值。那么,如何做好竞品分析呢?以下是我对于这个问题的思考和建议。 一、确定分析的目的和范围 在开…

积分的求法

1.第一类换元积分法(凑微分法):用dt代替dx,积分消失加常数 2.第二类换元积分法(用于开根号):用sint,cost,tant代替x,其中tant可以用于secx的平方-1tanx的平方 3.倒代换&#xff1a…

10个常用恶意软件检测分析平台(网工精选)

你们好,我的网工朋友。 我们往期的文章已经同步过很多的好用工具,毕竟,作为一个职场网工人,提升效率才是才能在单位时间内赚到更多的工资 往期的好用工具收到了不少好的反响,今天更新一波新的工具安利,专…

Python实现PowerPoint(PPT/PPTX)到PDF的批量转换

演示文稿是一种常见传达信息、展示观点和分享内容的形式,特别是PowerPoint演示文稿,广泛应用于各行各业,几乎是演讲等场合的必备工具。然而,演示文稿也有其限制,对设备的要求较高,且使用不同的软件或设备演…

用户管理第2节课--idea 2023.2 后端--实现基本数据库操作(操作user表) -- 自动生成 --合并生成后的代码【鱼皮】

一、模块页面功能 1.1 domain 【实体对象】 1.2 mapper 【操作数据库的对象】--> UserMapper 1)UserMapper 其实就是我们用来操作数据库的一个对象 2) 继承了mybatis- plus,它会自动帮我们去定义一些增删改查的方法。 继承可以看下图&#xf…

1.4~1.5链表复习,代码操作(反转链表(用栈解决,双指针),删除链表指定元素),链表选择题,广义表

删除链表内指定范围的数 思路是双指针,定义两个指针,一个去找当前这个数满不满足要求,然后另一个定义为删除区间的起点 , 当不满足时,两个指针同时向后移动;当满足时,前驱指针就不动了&#xf…

决策树--CART分类树

1、介绍 (1)简介 CART(Classification and Regression Trees)分类树是一种基于决策树的机器学习算法,用于解 决分类问题。它通过构建树状的决策规则来对数据进行分类。 (2)生成过程 ① 选择…

AR眼镜定制_ar智能眼镜5G硬件解决方案

AR眼镜近年来备受瞩目,其易于佩戴、轻巧耐用、功能强大、用途广泛的特点受到了广泛关注。 AR眼镜的应用场景非常广泛,不仅包括消费级市场,还涵盖了旅游、教育、工业、医疗等多个领域。新的工业AR穿戴技术以及工业级语音交互操作系统&#xff…

2020年认证杯SPSSPRO杯数学建模D题(第二阶段)让电脑桌面飞起来全过程文档及程序

2020年认证杯SPSSPRO杯数学建模 D题 让电脑桌面飞起来 原题再现: 对于一些必须每天使用电脑工作的白领来说,电脑桌面有着非常特殊的意义,通常一些频繁使用或者比较重要的图标会一直保留在桌面上,但是随着时间的推移,…

【css】快速实现鼠标悬浮变色效果

<div class"nav-item"><div class"ic-img"></div><div>切换</div> </div>.nav-item {width: 100rem;height: 45rem;line-height: 45rem;display: flex;text-align: center;justify-content: center;align-items: cent…