从零搭建:Canal实时数据管道打通MySQL与Elasticsearch

news2025/2/12 18:12:16

Canal实时同步Mysql BinlogElasticsearch


文章目录

  • Canal实时同步Mysql **Binlog**至**Elasticsearch**
      • 一. 环境准备
        • 1.环境检查
          • 检查`Mysql`是否开启`BinLog`
          • 开启Mysql Binlog
          • Java环境检查
        • 2.新建测试库和表
        • 3.新建Es索引
      • 二.**部署 Canal Server**
        • **2.1 解压安装包**
        • **2.2 配置 Canal Server**
        • **2.3 启动 Canal Server**
      • **三. 部署 Canal Adapter(同步到 Elasticsearch)**
        • **3.1 配置 Adapter**
        • **3.2 配置数据映射**
        • **3.3 启动 Adapter**
      • **4. 验证同步**
        • **4.1 插入测试数据到 MySQL**
        • **4.2 查询 Elasticsearch**

一. 环境准备

  • 操作系统:Linux(Ubuntu 20.04)
  • Java 环境:JDK 8+(建议 OpenJDK 11)
  • MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
  • Elasticsearch:已部署(版本 7.x 或 8.x)
  • Canal 二进制包:从 Canal Release 下载 canal.deployer-1.1.8.tar.gzcanal.adapter-1.1.8.tar.gz
1.环境检查
  • 检查Mysql是否开启BinLog
#root账号执行
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

输出如下证明已经打开:

image-20250211103029832

创建 Canal 用户并授权:

#创建用户
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

# 刷新权限
FLUSH PRIVILEGES;

如果没打开BinLog可以通过如下方法打开:

  • 开启Mysql Binlog

修改my.cnf文件,加入如下内容:

log_bin=mysql-bin
binlog_format=ROW
binlog_expire_logs_seconds=172800
expire_logs_days=2

log_bin:启用二进制日志,日志文件会以 mysql-bin 为前缀,并依次生成日志文件(例如:mysql-bin.000001mysql-bin.000002 等)。

binlog_format:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format 的默认值已经变为 ROW。所以,即使你在配置文件中没有明确设置 binlog_format,MySQL 会默认使用 ROW 作为二进制日志格式。在较早的 MySQL 版本中默认值是 STATEMENT

binlog_expire_logs_seconds=172800expire_logs_days=2:这些配置设置了二进制日志的过期时间(默认情况下,MySQL 会保留二进制日志,直到它们过期或达到日志文件数的限制)。在这种情况下,日志会在 2 天后过期。

配置好后重启Mysql:

systemctl restart mysqld.service
  • Java环境检查
echo $JAVA_HOME
image-20250211111637904
2.新建测试库和表
 CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;
 
 CREATE TABLE `test_user` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',
  `sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',
  `tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
3.新建Es索引
curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
{
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "title": {
        "type": "text"
      },
      "sex": {
        "type": "text"
      },
      "tel": {
        "type": "text"
      }
    }
  }
}
'

二.部署 Canal Server

2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter

# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server

# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
2.2 配置 Canal Server

修改配置文件 /opt/canal/server/conf/canal.properties

# tcp bind ip
canal.ip =127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112

# 目标实例名称(默认 example)
canal.destinations = example

# 持久化模式(默认内存,可选 H2/MySQL)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

这里主要修改canal.ip其他保持默认即可。

修改实例配置 /opt/canal/server/conf/example/instance.properties

#被同步的mysql地址,填写自己的IP地址
canal.instance.master.address=127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Password@123
#数据库连接编码 
canal.instance.connectionCharset = UTF-8 
#Binlog 过滤规则(监控所有库表)
canal.instance.filter.regex=.*\\..*
#指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
canal.mq.topic=example
2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh

# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log

image-20250211153418697

image-20250211153400835

image-20250211153538656

可以看到日志没有明显报错,且进程已经启动,则表示Canal Server已经启动成功。

image-20250211153842261

三. 部署 Canal Adapter(同步到 Elasticsearch)

3.1 配置 Adapter

修改配置文件 /opt/canal/adapter/conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
    
canal.conf:
  mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 对应集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批数量
  retries: 0 # 重试次数, -1为无限重试
  timeout: # 同步超时时间, 单位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
 
  srcDataSources: # 源数据库配置
    defaultDScanal是测试数据库
      url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库
      username: root #数据库账号
      password: Pass@1234 #数据库密码
  canalAdapters: # 适配器列表
  - instance: example # canal实例名,和上述Server的配置一样
    groups: # 分组列表
    - groupId: g1 # 分组id, 如果是MQ模式将用到该值
      outerAdapters:
      - name: logger # 日志打印适配器
      - name: es8 # ES同步适配器根据自己的es版本来
        hosts: <your IP>:9200 # ES连接地址
        properties:
          mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)
          security.auth: elastic:123456 #  连接es的用户和密码,仅rest模式使用
          cluster.name: elasticsearch # ES集群名称

如何获取es集群名称,命令输出的cluster_name就是上面需要配置的集群名字:

curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"

image-20250211170653195

3.2 配置数据映射

创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml

dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: test_user # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT
         tb.id AS _id,
         tb.name,
         tb.sex,
         tb.tel
        FROM
         test_user us"        # sql映射
  etlCondition: "where p.id>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小
3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh

#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log

会输出很多数据库变更的日志:

image-20250211171145018

image-20250211171208031

4. 验证同步

4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');

image-20250211171534121

image-20250211171547780

4.2 查询 Elasticsearch
curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"

也可以在工具上查看,这边是Eage插件:

image-20250211171753695

image-20250211171808091

至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。

image-20250211171849802

image-20250211172502715

然后我们测试修改Es的数据:

image-20250211172542248

image-20250211172555087

可以发现数据库并没有变,至此Canal单向实时同步Mysql BinlogElasticsearch就配置完成了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2296958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《战神:诸神黄昏》游戏闪退后提示弹窗“d3dx9_43.dll缺失”“找不到d3dx11_43.d”该怎么处理?

宝子们&#xff0c;是不是在玩《战神&#xff1a;诸神黄昏》的时候&#xff0c;突然弹出一个提示&#xff1a;“找不到d3dx9_43.dll”或者“d3dx11_43.dll缺失”&#xff1f;这可真是让人着急上火&#xff01;别慌&#xff0c;今天就给大家唠唠这个文件为啥会丢&#xff0c;还有…

Ollama本地部署DeepSeek(Mac)

准备工作 DeepSeek对比 DeepSeek-r1 DeepSeek-R1的多个版本&#xff1a;加上2个原装671B的&#xff0c;总计8个参数版本 DeepSeek-R1 671B DeepSeek-R1-Zero 671B DeepSeek-R1-Distill-Llama-70B DeepSeek-R1-Distill-Qwen-32B DeepSeek-R1-Distill-Qwen-14B DeepSeek-R1-Di…

arm linux下的中断处理过程。

本文基于ast2600 soc来阐述&#xff0c;内核版本为5.10 1.中断gic初始化 start_kernel() -> init_IRQ() -> irqchip_init() of_irq_init()主要是构建of_intc_desc. 489-514: 从__irqchip_of_table中找到dts node中匹配的of_table(匹配matches->compatible)&#xf…

Docker的深入浅出

目录 Docker引擎 Docker镜像 (镜像由多个层组成&#xff0c;每层叠加之后&#xff0c;从外部看来就如一个独立的对象。镜像内部是一个精简的操作系统&#xff08;OS&#xff09;&#xff0c;同时还包含应用运行所必须的文件和依赖包) Docker容器 应用容器化--Docker化 最佳…

内存映射工作原理和适用场景

Linux 内存映射&#xff08;Memory Mapping&#xff09;是一种将文件或其他资源直接映射到进程虚拟内存地址空间的机制&#xff0c;允许进程像访问内存一样访问文件或设备。这种机制通过 mmap() 系统调用实现&#xff0c;常用于高效文件操作、进程间共享内存等场景。 1. 内存映…

自动驾驶超声波雷达:市场潜力爆发,引领未来出行新趋势

在自动驾驶技术的飞速发展中&#xff0c;自动驾驶超声波雷达作为一项关键技术&#xff0c;正逐渐崭露头角&#xff0c;其重要性及市场增长潜力不容忽视。本文将深入探讨自动驾驶超声波雷达的重要性、市场增长趋势、显著优势、全球市场规模与驱动因素、主要市场参与者以及不同地…

41.兼职网站管理系统(基于springbootvue的Java项目)

目录 1.系统的受众说明 2.相关技术 2.1 B/S架构 2.2 Java技术介绍 2.3 mysql数据库介绍 2.4 Spring Boot框架 3.系统分析 3.1 需求分析 3.2 系统可行性分析 3.2.1技术可行性&#xff1a;技术背景 3.2.2经济可行性 3.2.3操作可行性&#xff1a; 3.3 项目设计目…

Linux ARM64 将内核虚拟地址转化为物理地址

文章目录 前言一、通用方案1.1 kern_addr_valid1.2 __pa 二、ARM64架构2.1 AT S1E1R2.2 is_kernel_addr_vaild2.3 va2pa_helper 三、demo演示参考资料 前言 本文介绍一种通用的将内核虚拟地址转化为物理地址的方案以及一种适用于ARM64 将内核虚拟地址转化为物理地址的方案&…

spring学习(使用spring加载properties文件信息)(spring自定义标签引入)

目录 一、博客引言。 二、基本配置准备。 &#xff08;1&#xff09;初步分析。 &#xff08;2&#xff09;初始spring配置文件。 三、spring自定义标签的引入。 &#xff08;1&#xff09;基本了解。 &#xff08;2&#xff09;引入新的命名空间&#xff1a;xmlns:context。 &…

Flutter项目试水

1基本介绍 本文章在构建您的第一个 Flutter 应用指导下进行实践 可作为项目实践的辅助参考资料 Flutter 是 Google 的界面工具包&#xff0c;用于通过单一代码库针对移动设备、Web 和桌面设备构建应用。在此 Codelab 中&#xff0c;您将构建以下 Flutter 应用。 该应用可以…

Linux(Ubuntu)安装pyenv和pyenv-virtualenv

Ubuntu安装pyenv和pyenv-virtualenv 安装 pyenv1. 下载 pyenv2. 配置环境变量3. 重启 Shell4. 安装依赖5.检测是否安装成功 安装 pyenv-virtualenv1. 安装 pyenv-virtualenv2. 配置环境变量3. 重启 Shell pyenv 的使用1. 查看可安装的 Python 版本2. 安装指定版本的 Python3. 查…

调用DeepSeek官方的API接口

效果 前端样式体验链接&#xff1a;https://livequeen.top/deepseekshow 准备工作 1、注册deepseek官网账号 地址&#xff1a;DeepSeek 点击进入右上角【API开放平台】&#xff0c;并进行账号注册。 2、注册完成后&#xff0c;依次点击【API keys】-【生成API key】&#x…

MFC线程安全案例

作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、项目解析 二…

Win11下搭建Kafka环境

目录 一、环境准备 二、安装JDK 1、下载JDK 2、配置环境变量 3、验证 三、安装zookeeper 1、下载Zookeeper安装包 2、配置环境变量 3、修改配置文件zoo.cfg 4、启动Zookeeper服务 4.1 启动Zookeeper客户端验证 4.2 启动客户端 四、安装Kafka 1、下载Kafka安装包…

51c自动驾驶~合集49

我自己的原文哦~ https://blog.51cto.com/whaosoft/13164876 #Ultra-AV 轨迹预测新基准&#xff01;清华开源&#xff1a;统一自动驾驶纵向轨迹数据集 自动驾驶车辆在交通运输领域展现出巨大潜力&#xff0c;而理解其纵向驾驶行为是实现安全高效自动驾驶的关键。现有的开…

Python——批量图片转PDF(GUI版本)

目录 专栏导读1、背景介绍2、库的安装3、核心代码4、完整代码总结专栏导读 🌸 欢迎来到Python办公自动化专栏—Python处理办公问题,解放您的双手 🏳️‍🌈 博客主页:请点击——> 一晌小贪欢的博客主页求关注 👍 该系列文章专栏:请点击——>Python办公自动化专…

LabVIEW无人机飞行状态监测系统

近年来&#xff0c;无人机在农业植保、电力巡检、应急救灾等多个领域得到了广泛应用。然而&#xff0c;传统的目视操控方式仍然存在以下三大问题&#xff1a; 飞行姿态的感知主要依赖操作者的经验&#xff1b; 飞行中突发的姿态异常难以及时发现&#xff1b; 飞行数据缺乏系统…

算法16(力扣451)——根据字符出现频率排序

1、问题 给定一个字符串 s &#xff0c;根据字符出现的 频率 对其进行 降序排序 。一个字符出现的频率 是它出现在字符串中的次数&#xff0c; 返回 已排序的字符串。如果有多个答案&#xff0c;返回其中任何一个。 2、示例 &#xff08;1&#xff09; 输入: s "tree&q…

Response 和 Request 介绍

怀旧网个人博客网站地址&#xff1a;怀旧网&#xff0c;博客详情&#xff1a;Response 和 Request 介绍 1、HttpServletResponse 1、简单分类 2、文件下载 通过Response下载文件数据 放一个文件到resources目录 编写下载文件Servlet文件 public class FileDownServlet exten…

ADB详细教程

目录 一、ADB简介 二、配置 配置环境变量 验证是否安装成功 三、简单使用 基本命令 设备连接管理 USB连接 WIFI连接&#xff08;需要USB线&#xff09; 开启手机USB调试模式 开启USB调试 四、其他 更换ADB默认启动端口 一、ADB简介 ADB&#xff08;Android Debug…