搭建 canal 监控mysql数据到 elasticsearch 中(本机到远端sql)

news2025/1/12 12:11:22

搭建 canal 监控mysql数据到 elasticsearch 中(本机到远端sql)

需求:

要将 MySQL 数据库 info 中的 notice 和 result 表的增、删、改操作同步到 Elasticsearch 的 notice 和 result 索引,您需要正确配置 MySQL、Canal 、Canal Adapter 、 Elasticsearch 和 kibana
系统rocky9.2
IP192.168.174.136
IP192.168.174.137
服务/版本
mysql:8.0.26 - - 192.168.174.137
Canal:1.1.8 - - 192.168.174.136
canal.adapter:1.1.8 - - 192.168.174.136
Elasticsearch:8.15.0 - - 192.168.174.136
kibana:8.15.0 - - 192.168.174.136

搭建mysql - Elasticsearch - kibana 参考文章
https://blog.csdn.net/yhl18931306541/article/details/141678279?spm=1001.2014.3001.5501

下载安装 Canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.adapter-1.1.8-SNAPSHOT.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.deployer-1.1.8-SNAPSHOT.tar.gz

解压canal 与 canal-adapter

mkdir /usr/local/canal.adapter
mkdir /usr/local/canal
tar -xf canal.adapter-1.1.8-SNAPSHOT.tar.gz -C /usr/local/canal.adapter
tar -xf canal.deployer-1.1.8-SNAPSHOT.tar.gz -C /usr/local/canal

修改配置

cd /usr/local/canal/ 
vim conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# 这个很重要 相当于从库的 server-id 参数,必须要比主库的server-id数大
canal.instance.mysql.slaveId=1001

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# 远端mysql地址
canal.instance.master.address=192.168.174.137:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# mysql账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# 需要监控的数据库表
# table regex
canal.instance.filter.regex=info\\.devops,info\\.test
# table black regex
# 略过的数据库表
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################
vim  conf/canal.properties
因为我们要将监控到的数据传到ES,所以修改地方比较少
canal.serverMode = tcp
canal.instance.tsdb.enable = true
# 集群的配置只要把H2改为mysql,因为要进行元数据管理。
#canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3066/canal_manager
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = 123456

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_manager
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = 123456
# 这个数据库非远端的数据库,需要在本机或者其他机器上创建一个数据库,方便保存canal历史命令和日志
作者是在本机创建的mysql数据库
登录进去之后,创建的canal_manager数据库

> create database canal_manager

配置文件中注释的中文备注,记得观察。

启动 canal

./bin/startup.sh 

# 查看日志
tail -f ./logs/canal/canal.log 
输出如下及正常。

在这里插入图片描述

tail -f canal.deployer/logs/example/example.log
这个日志记录的 bin-log日志读取到什么地方了

如果报错如下:
[main] WARN o.s.context.support.ClassPathXmlApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'tableMetaTSDB' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'metaHistoryDAO' while setting bean property 'metaHistoryDAO'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'metaHistoryDAO' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'sqlSessionFactory' while setting bean property 'sqlSessionFactory'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sqlSessionFactory' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'dataSource' while setting bean property 'dataSource'; nested exception is org.springframework.beans.factory.CannotLoadBeanClassException: Cannot find class [com.alibaba.druid.pool.DruidDataSource] for bean with name 'dataSource' defined in class path resource [spring/tsdb/h2-tsdb.xml]; nested exception is java.lang.ClassNotFoundException: com.alibaba.druid.pool.DruidDataSource
处理
将druid的jar包放在lib目录就可以了。druid-1.2.22.jar测试通过
下载地址 :https://repo1.maven.org/maven2/com/alibaba/druid/1.2.22/

安装 Canal.adapter

cd /usr/local/canal.adapter
vim application.yml
将多余的删除,取其精华即可
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host需要修改
    # 本机地址
    canal.tcp.server.host: 192.168.174.136:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # url,username,password需要修改 canal_manager是库名
      # 这个地址也不是远端的地址,是本机地址
      url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true
      username: root
      password: 123456

  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: gl 
      outerAdapters:
      - name: logger
      # name需要修改
      # 这个表示我们使用的是哪个适配器,es8 表示使用的是 es8 适配器,其他的可以参考解压后的 conf 下面的目录名称
      - name: es8
        # hosts需要修改(注意,要加上http://)
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          # cluster.name需要修改 ES集群名称
          cluster.name: es-dev

适配器配置

在上面的启动器的配置中我们已经配置了 ES8 作为适配器,那具体要同步的是哪张表, 以及对应的 ES中是索引是哪个怎么配置呢?这些配置就放在适配器的配置里面, 每一个适配器的配置都是一个想要同步到 ES 的模板配置。

因为我使用的es8适配器,所以进到es8中
cd conf/es8
cp -v mytest_user.yml esMappingNotice.yml
rm -rf biz_order.yml  customer.yml   mytest_user.yml
vim esMappingNotice.yml
dataSourceKey: defaultDS
destination: example
groupId: gl
esMapping:
  _index: notice
  _id: _id
  _type: _doc
  upsert: true
  sql: "
SELECT
        c.id AS _id,
        c.title AS title,
        c.content AS content,
        DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
        DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
        notice AS c
"
  commitBatch: 3000
注释:
dataSourceKey: defaultDS
destination: example
outerAdapterKey: es-key
groupId: g1
上面的几个配置,都需要跟启动器里面的配置保持一致。

esMapping:该配置是表示的是如何将 MySQL 的数据同步到 ES 中,配置比较复杂,其中
_index 表示 ES 的索引(需要提前创建);
_id 和 pk 二选一配置,表示使用查询出来的哪个字段作为唯一值;
upsert 表示对应主键的数据不存在的时候执行插入动作,存在的时候执行更新动作;
sql:表示要同步的数据,这个的 SQL 形式要求会比较严格
而且 _id 必须要加别名,我索性把所有都改个别名

commitBatch: 3000 设置了每次批量提交的记录数量为 3000。这意味着每当 canal.adapter 收集到 3000 条记录时,
会将这些记录批量提交到 Elasticsearch。确保这个批量大小适合你的数据量和 Elasticsearch 的处理能力,
以避免超时或性能问题。如果你遇到性能瓶颈,可以尝试调整这个参数值,增大或减小批量大小来优化性能。

vim esMappingResult.yml

dataSourceKey: defaultDS
destination: example
groupId: gl
esMapping:
  _index: result
  _id: _id
  _type: _doc
  upsert: true
  sql: "
SELECT
        c.id AS _id,
        c.user_id AS userid,
        c.score AS score,
        DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
        DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
        result AS c
"
  commitBatch: 3000
时间类型的表结构想要存到es中必须自定义转换器或格式化程序,将 Timestamp 转换为 Elasticsearch 支持的日期格式
(例如 ISO 8601 格式)。  否则导入时报错
ERROR c.a.otter.canal.client.adapter.es8x.etl.ESEtlService - cannot write xcontent for unknown value of type class java.sql.Timestamp
java.lang.IllegalArgumentException: cannot write xcontent for unknown value of type class java.sql.Timestamp

然后启动canal.adapter

赋权:
cd /usr/local/canal.adapter/
chmod 777 -R conf/es8
./bin/startup.sh 

tail -f logs/adapter/adapter.log 
2024-08-30 15:06:03.275 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-gl succeed
2024-08-30 15:06:03.275 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2024-08-30 15:06:03.275 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2024-08-30 15:06:03.285 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 3.543 seconds (JVM running for 4.264)
2024-08-30 15:06:03.368 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
2024-08-30 15:09:27.964 [http-nio-8081-exec-1] INFO  o.a.catalina.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
输出如上则为成功

在这里插入图片描述

然后测试

# 插入数据,查看一下Canal.adapter是否可以读到数据
登录到mysql中

INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (22, 'New Notice', 'This is a new notice', NOW(), NOW());
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (23, 'New Notice', 'This is a new notice', NOW(), NOW());

tail -f logs/adapter/adapter.log 
输出如下:说明成功
2024-08-30 15:03:05.827 [pool-3-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":22,"title":"New Notice","content":"This is a new notice","created_at":1725001385000,"updated_at":1725001385000}],"database":"info","destination":"example","es":1725001385000,"groupId":"gl","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1725001385626,"type":"INSERT"}
2024-08-30 15:03:15.858 [pool-3-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":23,"title":"New Notice","content":"This is a new notice","created_at":1725001395000,"updated_at":1725001395000}],"database":"info","destination":"example","es":1725001395000,"groupId":"gl","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1725001395857,"type":"INSERT"}

在这里插入图片描述

然后全量导入一次数据
curl "localhost:8081/etl/es8/esMappingNotice.yml" -X POST
{"succeeded":true,"resultMessage":"导入ES 数据:23 条"}

esMappingNotice.yml 则为适配器文件的名称。

curl "localhost:8081/etl/es8/esMappingResult.yml" -X POST
{"succeeded":true,"resultMessage":"导入ES 数据:20 条"}
然后打开kibana 或者 elasticsearch-head-5.0.0
作者这里使用的是 elasticsearch-head-5.0.0

在这里插入图片描述

验证同步配置

在 MySQL 数据库中执行一些增、删、改操作,例如:
登录到mysql中,
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (1, 'New Notice', 'This is a new notice', NOW(), NOW());
UPDATE notice SET content = 'Updated content' WHERE id = 1;
DELETE FROM notice WHERE id = 1;

在这里插入图片描述

#### 查询_id 为 10 11 的sql 已经删除

在这里插入图片描述

注释

如果刚来的小伙伴看不懂 一定要结合作者下面这篇文章部署,

这里,这里 注释中所提的下面上面都是这篇文章 👆👇

以上只是告诉大家如果数据库在远端该如何配置canal,其他地方和上面👆这篇文章都一样,只不过数据库位置不一样而已

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2111117.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3--Web前端开发-前端工程化,vue项目

目录 端口配置 element 快速入门 table表格组件 分页组件 Dialog对话框组件 表单组件 端口配置 在vue.config.js中更改 源代码为 const { defineConfig } require(vue/cli-service) module.exports defineConfig({transpileDependencies: true })更改为 const { def…

Linux——redis主从复制、哨兵模式

一、redis 的安全加固&#xff1a; 对redis数据库访问的角度 auth // 验证登录redis 数据库的用户acl // 设置redis用户的权限将配置完成的ACL策略写入配置文件 config rewrite //目前redis生效的配置全部写入到默认配置文件的尾部写入到acl文件中&#xff0c;在加载配置文件时…

《论软件设计模式及其应用》通关范文,软考高级系统架构设计师

论文真题 设计模式(Design Pattern)是一套被反复使用的代码设计经验总结,代表了软件开发人员在软件开发过程中面临的一般问题的解决方案和最佳实践。使用设计模式的目的是提高代码的可重用性,让代码更容易被他人理解,并保证代码可靠性。现有的设计模式已经在前人的系统中…

每日一练:和为K的子数组

一、题目要求 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续非空序列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1], k 2 输出&#xff1a;2示例 2&#xff1a; 输入&#xff1a;n…

python深度学习:从注意力机制到生成模型,全面解析现代AI技术

近年来&#xff0c;伴随着以卷积神经网络&#xff08;CNN&#xff09;为代表的深度学习的快速发展&#xff0c;人工智能迈入了第三次发展浪潮&#xff0c;AI技术在各个领域中的应用越来越广泛。注意力机制、Transformer模型&#xff08;BERT、GPT-1/2/3/3.5/4、DETR、ViT、Swin…

OpenCV结构分析与形状描述符(10)检测并提取轮廓函数findContours()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在二值图像中查找轮廓。 该函数使用算法 253从二值图像中检索轮廓。轮廓是有用的工具&#xff0c;可用于形状分析和对象检测与识别。参见 OpenC…

SDN架构详解

目录 1&#xff09;经典的IP网络-分布式网络 2&#xff09;经典网络面临的问题 3&#xff09;SDN起源 4&#xff09;OpenFlow基本概念 5&#xff09;Flow Table简介 6&#xff09;SDN的网络架构 7&#xff09;华为SDN网络架构 8&#xff09;传统网络 vs SDN 9&#xf…

网络安全与恶意攻击:如何应对?

引言 随着技术的发展&#xff0c;我们的生活越来越依赖于网络。但是&#xff0c;这也暴露了我们的系统对各种网络威胁的脆弱性。无论是个人还是企业&#xff0c;网络安全都成为了我们不能忽视的话题。 网络威胁的类型 网络威胁主要有以下几种&#xff1a; 网络钓鱼攻击&#…

linux学习--第四天

--linux文件操作 文件IO操作包括&#xff1a; &#xff08;注&#xff1a;I&#xff1a;input&#xff08;输入&#xff09;O&#xff1a;output&#xff08;输出&#xff09;&#xff09; open&#xff1a;打开 close&#xff1a;关闭 read&#xff1a;读取 write&#xff1a;…

c++一个数因子和(快速求解)

void 一个数因子和(int 整数) {//缘由https://ask.csdn.net/questions/1054457#answer_1251715int he 0, j 0; string a "";while (j < 整数)if (!(整数%j))he j, a to_string(j) "";cout << a << "的因子和&#xff1a;" …

如何在 Java 应用程序中定位高 CPU 使用率问题

文章目录 ♨ 前言♨ 提前准备♨ 线上定位♨ 结语 ♨ 前言 在运行 Java 应用程序的服务器上&#xff0c;高 CPU 使用率可能会影响应用程序的性能和稳定性。本文将介绍如何通过一系列步骤和工具来准确诊断和解决高 CPU 使用率问题&#xff0c;特别是针对 Java 环境下的应用程序。…

OpenCV影像数据处理入门-学习篇

目录 简介如何安装图像数据处理简单操作视频数据处理简单操作 一、简介 在计算机视觉项目的开发中&#xff0c;OpenCV作为最大众的开源库&#xff0c;拥有了丰富的常用图像处理函数库&#xff0c;可用于开发实时的图像处理、计算机视觉以及模式识别程序。采用C/C语言编写&am…

用于辅助视障人士检测人行道障碍物的 TinyML 模型

这篇论文的标题为《A TinyML model for sidewalk obstacle detection: aiding the blind and visually impaired people》&#xff0c;发表在《Multimedia Tools and Applications》上。以下是论文的主要内容概述&#xff1a; 摘要&#xff1a; 论文介绍了在资源受限的物联网…

C语言程序设计 笔记代码梳理 重制版

前言 第1章 C语言的流程 1.C程序经历的六个阶段 编辑(Edit)预处理(Preprocess)编译(Compile)汇编(Assemble)链接(Link)执行(Execute) 2.C语言编写代码到运行 都是先编译&#xff0c;后链接&#xff0c;最后运行。&#xff08;.c ---> .obj --->.exe&#xff09;这个过…

热老化的行业应用

热老化应用行业&#xff1a;深度解析与图像呈现 热老化&#xff0c;作为一种重要的材料测试方法&#xff0c;在众多行业中扮演着关键角色。它通过模拟产品在高温环境下的长期使用&#xff0c;提前发现潜在的材料缺陷、性能衰退等问题&#xff0c;从而提高产品的可靠性&#xf…

打造个性化时装购物平台:Spring Boot框架的实践

第1章 绪论 1.1背景及意义 随着社会的快速发展&#xff0c;计算机的影响是全面且深入的。人们生活水平的不断提高&#xff0c;日常生活中人们对时装购物系统方面的要求也在不断提高&#xff0c;喜欢购物的人数更是不断增加&#xff0c;使得时装购物系统的开发成为必需而且紧迫的…

顶刊精析|METI:整合细胞形态与空间转录组学的肿瘤微环境分析框架·24-09-06

小罗碎碎念 本期精读文献&#xff1a;《METI: Deep profiling of tumor ecosystems by integrating cell morphology and spatial transcriptomics》 今天分享的这篇文献于2023年8月25日发表在Nat Commun&#xff0c;目前IF14.7。 作者类型作者姓名单位名称&#xff08;中文&am…

【免费分享】25秋招提前批25秋招信息表

秋招&#xff0c;即秋季校园招聘&#xff0c;通常是指每年秋季&#xff08;大约从9月到11月&#xff09;企业在各大高校举办的招聘活动。这是许多公司为了吸引优秀应届毕业生而进行的招聘活动&#xff0c;也是许多学生毕业后进入职场的重要途径。以下是秋招的一些关键点&#x…

手机TF卡格式化后数据恢复:方法、挑战与预防措施

在现代生活中&#xff0c;‌手机已经成为我们不可或缺的一部分&#xff0c;‌而TF卡&#xff08;‌即MicroSD卡&#xff09;‌作为手机存储的扩展&#xff0c;‌更是承载了我们大量的重要数据。‌然而&#xff0c;‌不慎的格式化操作往往导致数据丢失&#xff0c;‌给用户带来不…

集成电路学习:什么是RAM随机存取存储器

RAM&#xff1a;随机存取存储器 RAM&#xff08;Random Access Memory&#xff0c;随机存取存储器&#xff09;是计算机中用于临时存储数据和程序指令的重要存储设备。以下是关于RAM的详细解析&#xff1a; 一、RAM的定义与功能 RAM是一种内部存储器&#xff0c;与CPU直接交换…