基于Canal实现Mysql数据实时同步到Elasticsearch(Docker版)

news2025/1/18 11:44:31

1、Canal简介

  Canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。

  Canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向MySQL主库发送dump协议,MySQL主库收到dump请求会向canal推送binlog,canal通过解析binlog将数据同步到其他存储中去。
在这里插入图片描述

官方文档:《传送门》。

2、基于Docker实现Mysql5.7的安装并开启binlog日志

2.1、Mysql安装
[root@localhost /]# docker pull mysql:5.7

[root@localhost /]# docker run --name mysql5.7 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
2.2、开启Mysql5.7的binlog配置
#进入docker中的 Mysql5.7
[root@localhost /]# docker exec -it mysql5.7 /bin/bash

#在docker环境内安装vim工具,方便修改文件
bash-4.2# yum install vim

#修改my.cnf配置,修改内容如下:
bash-4.2# vim /etc/my.cnf

#退出docker
bash-4.2# exit

#重启docker mysql
[root@localhost /]# docker restart mysql5.7

  在/etc/my.cnf配置文件中添加如下配置:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

  在Mysql重新启动后,然后可以在客户端中执行show VARIABLES like 'log_bin’如果发现其中的value中为“ON”则说明配置生效了。

在这里插入图片描述

2.3、创建授权用户

  创建Mysql用户,方便后续使用。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、基于docker实现Elasticsearch的安装和运行

#拉去镜像
docker pull elasticsearch:7.12.0

#创建docker容器挂在的目录
mkdir -p /usr/local/soft/es/config
mkdir -p /usr/local/soft/es/data
mkdir -p /usr/local/soft/es/plugins

#配置文件,注意:echo “http.host: 0.0.0.0”;“:”后面有个空格!
echo "http.host: 0.0.0.0" >> /usr/local/soft/es/config/elasticsearch.yml

#创建容器并运行
docker run --name elasticsearch -p 9200:9200  -p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms84m -Xmx512m" \
-v /usr/local/soft/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /usr/local/soft/es/data:/usr/share/elasticsearch/data \
-v /usr/local/soft/es/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.12.0
  • -p 端口映射
  • -e discovery.type=single-node 单点模式启动
  • -e ES_JAVA_OPTS=“-Xms84m -Xmx512m”:设置启动占用的内存范围
  • -v 目录挂载
  • -d 后台运行

  启动后,测试正常启动页面访问http://127.0.0.1:9200,出现如下页面说明启动成功。

在这里插入图片描述

4、安装并配置canal服务

#拉取镜像
docker pull canal/canal-server:v1.1.6
#运行镜像
docker run --name canal -d canal/canal-server:v1.1.6
#找到文件位置后 exit退出容器 将容器内部文件copy到外部
docker cp canal:/home/admin/canal-server/conf/canal.properties /usr/local/soft/canal-conf/
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /usr/local/soft/canal-conf/
#修改配置文件instance.properties 和 canal.properties,实际上只修改了instance.properties的部分配置,canal.properties使用默认配置

#重新运行镜像,运行前,需要把前面运行的容器停止并删除
docker run --name canal -p 11111:11111 -d -v /usr/local/soft/canal-conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /usr/local/soft/canal-conf/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server:v1.1.6

#查看canal服务日志
#仅能看到canal服务的启动日志,比较简单
docker logs canal
#查看数据同步相关日志
#进入canal服务
[root@localhost ~]# docker exec -it canal /bin/bash
#查看日志,数据同步日志
[root@908acdb7f259 canal-server]# cat canal-server/logs/example/example.log

  instance.properties需要修改的配置如下:

#Mysql数据库地址
canal.instance.master.address=192.168.1.236:3306
#初始化时的日志文件名称,可以不设置
canal.instance.master.journal.name=mysql-bin.000002
#初始化时的日志文件的当前位置,可以不设置
canal.instance.master.position=2071
canal.instance.master.timestamp=
canal.instance.master.gtid=

#Mysql数据库用户名和密码,前面创建的Mysql用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

版本问题:在搭建canal服务的时候,我最初使用的是v1.1.6版本,同步数据的时候出现错误,后改成v1.1.5后,正常。

5、安装并配置canal-adapter

#拉取镜像
docker pull slpcat/canal-adapter:v1.1.5
#创建配置文件
mkdir -p /usr/local/soft/canal-adapter/conf/
#创建配置文件,在/usr/local/soft/canal-adapter/conf/目录下创建
touch application.yml
#创建连接数据库文件es7.yml
mkdir -p /usr/local/soft/canal-adapter/conf/es7
touch es7.yml

#启动
docker run --name canal-adapter -p 8081:8081 -v /usr/local/soft/canal-adapter/conf:/opt/canal-adapter/conf  -d slpcat/canal-adapter:v1.1.5

#查看是否报错
docker logs canal-adapter

  application.yml配置文件内容如下:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.1.236:11111  #canal服务地址
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.236:3306/my_test?useUnicode=true #数据库地址及用户名密码
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7 # 该版本发现只能是es7/es6
        hosts: 192.168.1.236:9200 # 127.0.0.1:9200 for rest mode,ES链接,使用9200 是mode就需要修改成rest
        properties:
          mode: rest # transport # or rest
          cluster.name: elasticsearch

  es7.yml配置文件,主要实现Mysql表字段与ES索引的对应关系,具体内容如下:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: ceshi
  _id: _id
  _type: _doc
  upsert: true
  #  pk: id 
  #SQL 字段映射
  sql: "SELECT 
    a.id AS _id,
    a.username AS username,
    a.age AS age,
    a.test AS test
  from 
    test a "
  #  objFields:
  #    _labels: array:;
  etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数
  commitBatch: 1

  my_test.test表结构:

CREATE TABLE `my_test`.`Untitled`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,
  `age` varbinary(50) NULL DEFAULT NULL,
  `test` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,
  `c_time` datetime(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 18 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

6、其他

  完成上述配置,就完成了Mysql数据实时同步到Elasticsearch的配置,在实际环境中,还需要考虑服务防火墙、查看docker服务运行状态等问题,需要的命令如下:

  Docker命令:

#docker命令
docker ps -a #查看是否启动
docker logs elasticsearch  #启动日志查询,查询什么服务,elasticsearch修改成对应服务名即可
docker restart elasticsearch   #重启
docker exec -it elasticsearch bash #进入docker服务

  Centos7防火墙相关命令:

一、防火墙的开启、关闭、禁用命令

  • 设置开机启用防火墙:systemctl enable firewalld.service
  • 设置开机禁用防火墙:systemctl disable firewalld.service
  • 启动防火墙:systemctl start firewalld
  • 关闭防火墙:systemctl stop firewalld
  • 检查防火墙状态:systemctl status firewalld

二、使用firewall-cmd配置端口

  • 查看防火墙状态:firewall-cmd --state
  • 重新加载配置:firewall-cmd --reload
  • 查看开放的端口:firewall-cmd --list-ports
  • 开启防火墙端口:firewall-cmd --zone=public–add-port=9200/tcp --permanent
  • 关闭防火墙端口:firewall-cmd --zone=public --remove-port=9200/tcp --permanent

命令含义:
–zone #作用域
–add-port=9200/tcp #添加端口,格式为:端口/通讯协议
–permanent #永久生效,没有此参数重启后失效
注意:添加端口后,必须用命令firewall-cmd --reload重新加载一遍才会生效

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/528214.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringMVC常用注解用法

Spring MVC是基于Servlet API构建的原始Web框架。 MVC是Model View Controller的缩写即视图模型控制器,是一种思想,而Spring MVC是对该思想的具体实现。关于SpringMVC的学习我们需要掌握用户和程序的连接、获取参数以及返回数据三大部分。而这三大功能的…

2023-5-15-gRpc框架学习

🍿*★,*:.☆( ̄▽ ̄)/$:*.★* 🍿 💥💥💥欢迎来到🤞汤姆🤞的csdn博文💥💥💥 💟💟喜欢的朋友可以关注一下&#xf…

前端魔力赏盲盒小程序 UI原生盲盒微信小程序源码下载

前端魔力赏盲盒小程序 UI原生盲盒微信小程序源码下载 亲测可用 前端是小程序原生源码。 很不错的一款盲盒小程序。 完全没有毛病,非常适合研究学习。

【Linux】匿名管道

目录 匿名管道管道特点父子进程通过匿名管道通信匿名管道通信案例 橙色 匿名管道 管道也叫无名(匿名)管道,是 UNIX 系统 IPC(进程间通信) 的最古老的形式。 统计一个目录中文件的数目命令 ls | wc -l ,为…

会自动化就能拿20K?想多了,你这顶多算是会点皮毛···

现在招个会自动化测试的人是真难呀~ 前一段时间公司计划要招2个自动化测试到岗,同事面试了十几个来应聘的人,发现一个很奇怪的现象,在面试的时候,如果问的是框架API、脚本编写这些问题,基本上所有人都能对答如流&…

湖北省智能科教研究会走进璞华,调研璞公英教学平台个性化教学新模式

2023年5月9日,热烈祝贺湖北省智能科教研究会红5月智能科教走进璞华集团活动圆满成功。会议上,大家畅所欲言,对教育体制改革与教育信息化产品创新,科技成果转化、产教融合、资源互补、学生能力培养等方面展开充分沟通和探讨。 5月9…

解密 Android IPC 机制

在我们使用 Android 手机的时候,有时我们使用的软件会需要消耗比较大的内存,也经常会需要同时打开多个软件。这些时候,我们都会需要使用到多进程技术。作为 Android 开发者,相信我们都知道如何去开启应用的单个多进程,…

【Linux常见指令以及权限理解】权限理解(4)

写在前面 这篇文章,我们来聊一聊Linux下权限相关的知识,我打算从这几个方面展开: 1. 认识Linux下用户的分类 2. 什么叫做权限 3. 没有权限会是什么样子 4. 如何修改权限 5. 其它重要的问题 那么废话不多说,我们现在开始。 …

PDF怎么转换成Word?将PDF转换为Word的三种方法!

在我们需要将PDF文件转换为Word文件时,有几种方法可以选择。通常,我们在文件传输过程中使用的文件格式是PDF,但如果我们需要对文件进行编辑,就需要将其转换为可编辑的Word格式。下面是几种转换方法的介绍,让我们一起来…

【Python从入门到进阶】Python异常处理

接上篇《18、文件内容序列化和反序列化操作》 上一篇我们学习了文件读取及写入数据序列化和反序列化的操作。本篇我们来学习Python中有关异常(捕获异常、处理异常等)的知识。 一、异常的定义 在编写代码时,我们无法完全掌控程序运行过程中会…

原神服务端建模修改模型贴图(SpecialK)教程

原神服务端建模修改模型贴图(SpecialK)教程 我是艾西,今天跟大家闲聊一下原神建模修改模型等。在一个游戏里开发者会按照自己这个游戏的大方向去运营,而总是有一些小伙伴有不一样的需求,如果是建模拥有独一无二的角色或者是外观装扮等那么艾…

多系统启动U盘Ventory下载、安装、使用

官网链接 Ventoy Ventoy 简介 简单来说,Ventoy是一个制作可启动U盘的开源工具。 有了Ventoy你就无需反复地格式化U盘,你只需要把 ISO/WIM/IMG/VHD(x)/EFI 等类型的文件直接拷贝到U盘里面就可以启动了,无需其他操作。 你可以一次性拷贝很多个…

OpenCL编程指南-4.4矢量操作符

矢量操作符 如下描述了可用于矢量数据类型或矢量和标量数据类型组合的各类操作符。 算术操作符 算术操作符(加()、减(–)、乘(*)和除(/)),可以作用于内置整数、浮点标量和矢量数…

次郎家书——第一天关于数值计算方法考试后——的一些思考和反思

考试的复盘:传送门:数值计算方法考试复盘 对此次考试的看法: 这次考试考试内容虽然有没复习到的如复合辛普森和复合梯形公式还有最小二乘的推广(这里上课的时候听懂了但是复习的时候嫌麻烦没看原来,结果大题是真的写错了&#…

nuxt 一直报错 http://localhost:24678/_nuxt/

解决: 这个错误可能是由于Nuxt.js应用程序无法正确加载/_nuxt/路径下的资源而导致的。解决这个问题的方法有以下几种: 1.检查nuxt.config.js文件 在nuxt.config.js文件中,检查build.publicPath属性是否设置为正确的公共路径。例如&#xff1a…

UART驱动情景分析-read

一、源码框架回顾 shell读数据,一开始的时候没有就休眠。数据从串口发送到驱动,驱动接收到中断,驱动读取串口数据,这个数据会传给行规程。 行规程获取到数据后,会回显。按下删除就删除一个字符,按下回车&am…

pytorch 测量模型运行时间,GPU时间和CPU时间,model.eval()介绍

文章目录 1. 测量时间的方式2. model.eval(), model.train(), torch.no_grad()方法介绍2.1 model.train()和model.eval()2.2 model.eval()和torch.no_grad() 3. 模型推理时间方式4. 一个完整的测试模型推理时间的代码5. 参考: 1. 测量时间的方式 time.time() time.…

使用qt creator编译zlib

zlib被设计为一个免费的,通用的,法律上不受限制的-即不受任何专利保护的无损数据压缩库,几乎可以在任何计算机硬件和操作系统上使用。 官网:http://www.zlib.net/ 下载zlib源码:http://www.zlib.net/zlib1213.zip 备用地址&#x…

关于使用API接口获取商品数据的那些事

随着电商行业的不断发展,越来越多的企业和个人需要获取各大电商平台上的商品数据。而最常用的方法是使用API接口获取商品数据。本文将为您介绍使用API接口获取商品数据的步骤和注意事项。 一、选择API接口 首先需要了解各大电商平台提供的API接口,目前…

由浅入深理解java集合(一)——集合框架 Collection、Map

Java 提供了一套完整的集合类(也可以叫做容器类)来管理一组长度可变的对象(也就是集合的元素),其中常见的类型包括 List、Set、Queue 和 Map。从我个人的编程经验来看,List 的实现类 ArrayList 和 Map 的实…