Canal 实现MySQL与Elasticsearch7数据同步

news2025/2/24 2:11:17

1 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

优点: 可以完全和业务代码解耦,增量日志订阅。

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

2 canal实现MySQL与Elasticsearch7数据同步

下面介绍下利用canal ,canal adapter实现MySQL与ES7数据同步

2.1 Mysql配置修改

在MySQL中需要创建一个用户,并授权:

-- 使用命令登录:mysql -u root -p 
-- 创建用户 用户名:canal  
create user 'canal'@'%' identified by 'canal'; 
-- 授权 .表示所有库 
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on . to 'canal'@'%' identified by 'canal';

下一步在MySQL配置文件my.cnf设置如下信息:

[mysqld] 
# 开启binlog 
log-bin=mysql-bin 
# 选择ROW(行)模式 
binlog-format=ROW 
# 配置MySQL replaction需要定义,不要和canal的slaveId重复 
server_id=1

改了配置文件之后,重启MySQL

2.2 下载Canal

下载最新的cana1.1.5并解压,1.1.5 才支持Elasticsearch7
下载地址:
canal.adapter-1.1.5-SNAPSHOT.tar.gz(适配器)
canal.deployer-1.1.5-SNAPSHOT.tar.gz(服务端)
在这里插入图片描述
canal.adapter为适配端,canal.deployer为服务端

2.3 启动Canal服务端

2.3.1 修改数据库配置

进入conf/example目录下,修改instance.properties为数据库配置
在这里插入图片描述

在这里插入图片描述

2.3.2 启动服务端

进入bin目录,双击starup.bat启动,出现下面界面,表明启动成功
在这里插入图片描述
在这里插入图片描述
服务端启动成功, 接下来进入客户端测试

2.4 启动Canal客户端

2.4.1 修改配置

进入adapter目录,修改application.yml
在这里插入图片描述
yml内容:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

#  srcDataSources:
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

2.4.2 创建索引, 根据同步sql数据至Elasticsearch中

调用http://127.0.0.1:9200/product(PUT请求)创建索引,product为索引名字

{
  "mappings" : {
    "properties" : {
      "attrs" : {
        "type" : "nested",
        "properties" : {
          "attrId" : {
            "type" : "long"
          },
          "attrName" : {
            "type" : "keyword"
          },
          "attrValueId" : {
            "type" : "long"
          },
          "attrValueName" : {
            "type" : "keyword"
          }
        }
      },
      "tags" : {
        "type" : "nested",
        "properties" : {
          "tagId" : {
            "type" : "long"
          },
          "seq" : {
            "type" : "integer"
          }
        }
      },
      "brandId" : {
        "type" : "long"
      },
      "brandImg" : {
        "type" : "keyword"
      },
      "brandName" : {
        "type" : "keyword"
      },
      "code" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "commentNum" : {
        "type" : "integer"
      },
      "createTime" : {
        "type" : "date"
      },
      "hasStock" : {
        "type" : "boolean"
      },
      "imgUrls" : {
        "type" : "keyword",
        "index" : false,
        "doc_values" : false
      },
      "mainImgUrl" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "marketPriceFee" : {
        "type" : "long"
      },
      "priceFee" : {
        "type" : "long"
      },
      "saleNum" : {
        "type" : "integer"
      },
      "sellingPoint" : {
        "type" : "text",
        "analyzer" : "ik_max_word",
        "search_analyzer" : "ik_smart"
      },
      "shopId" : {
        "type" : "long"
      },
      "shopImg" : {
        "type" : "keyword",
        "index" : false,
        "doc_values" : false
      },
      "shopName" : {
        "type" : "text",
        "analyzer" : "ik_max_word",
        "search_analyzer" : "ik_smart"
      },
      "shopType" : {
        "type" : "integer"
      },
      "shopPrimaryCategoryId" : {
        "type" : "long"
      },
      "shopPrimaryCategoryName" : {
        "type" : "keyword"
      },
      "shopSecondaryCategoryId" : {
        "type" : "long"
      },
      "shopSecondaryCategoryName" : {
        "type" : "keyword"
      },
      "primaryCategoryId" : {
        "type" : "long"
      },
      "primaryCategoryName" : {
        "type" : "keyword"
      },
      "secondaryCategoryId" : {
        "type" : "long"
      },
      "secondaryCategoryName" : {
        "type" : "keyword"
      },
      "categoryId" : {
        "type" : "long"
      },
      "categoryName" : {
        "type" : "keyword"
      },
      "spuId" : {
        "type" : "long"
      },
      "spuName" : {
        "type" : "text",
        "analyzer" : "ik_max_word",
        "search_analyzer" : "ik_smart"
      },
      "spuStatus" : {
        "type" : "integer"
      },
      "success" : {
        "type" : "boolean"
      }
    }
  }
}

在这里插入图片描述

2.4.3 启动客户端

进入\canal.adapter-1.1.5-SNAPSHOT\bin 双击startup.bat 出现下面界面,表明启动成功
在这里插入图片描述

3 注意事项

3.1 使用队列

2.4.1中客户端配置需要配置队列,我使用的rocketmq,大家可以根据自己需要选择队列,根据选择的队列,需改配置

3.2 启动顺序

需要优先启动Mysql和Elasticsearch,然后启动Canal服务端,最后启动客户端

4 验证

当你在操作界面添加一个商品时,那么此时Elasticsearch也会同步这条数据,具体不展开来了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1027932.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OmniPlan Pro 4.6(Mac项目流程管理工具)

OmniPlan Pro 是 The Omni Group 为 macOS 和 iOS 操作系统开发的项目管理软件。它允许用户创建和管理复杂的项目、定义任务、分配资源、跟踪进度和生成报告。OmniPlan Pro 包括甘特图、网络图、关键路径分析、资源均衡和成本跟踪等功能。 借助 OmniPlan Pro,用户可…

在月球上看地球和太阳是怎么转的?

文章目录 参数初始化运动模型绝对坐标系以太阳和地球为中心以月球为坐标原点 参数初始化 众所周知,地球围绕太阳转,月球围绕地球转。但在地球上看,月亮和太阳都绕着地球转,那么如果我们是土生土长的月球人,我们看到的…

记-数据库事务隔离级别

记-数据库事务隔离级别 一、MySQL数据库默认隔离级别二、JDBC连接的事务隔离级别1. 查看JDBC连接的事务隔离级别2. JDBC连接的事务隔离级别设置过程 三、修改JDBC连接的事务隔离级别1. 全局修改2. 局部修改 一、MySQL数据库默认隔离级别 MySQL数据库默认事务隔离级别为REPEATAB…

微服务生态系统:使用Spring Cloud构建分布式系统

文章目录 什么是微服务?为什么选择Spring Cloud?Spring Cloud的关键组件示例:构建一个简单的微服务步骤1:创建Spring Boot项目步骤2:配置Eureka服务发现步骤3:创建REST控制器步骤4:运行项目步骤…

【R语言】完美解决devtools安装GitHub包失败的问题(以gwasglue为例)

Rstudio,R4.3.1,命令在Rstudio的命令行即console中运行。 文章目录 一、问题复述二、分析三、解决四、安装示例:gwasglue 一、问题复述 使用devtools安装一个github的包。 devtools: devtools 是 R 语言中一个非常有用的包&…

netty之数据读写源码阅读

数据读写 write 从client端的写开始看 client与服务端建立完connect后可以从future里拿到连接的channel对象。这里的channel是io.netty.channel.Channel对象。 调用其channel.writeAndFlush(msg);方法可以进行数据发送。 writeAndFlush会调用pipeline的writeAndFlush方法 …

目标检测算法改进系列之Neck添加渐近特征金字塔网络(AFPN模块)

渐近特征金字塔网络(AFPN模块) 在目标检测任务中,多尺度特征对具有尺度差异的目标进行编码具有重要意义。多尺度特征提取的常用策略是采用经典的自顶向下和自底向上的特征金字塔网络。 然而,这些方法存在特征信息丢失或退化的问…

【音视频】ffplay源码解析-FrameQueue队列

帧队列架构位置 结构体源码 FrameQueue结构体 /* 这是一个循环队列,windex是指其中的首元素,rindex是指其中的尾部元素. */ typedef struct FrameQueue {Frame queue[FRAME_QUEUE_SIZE]; // FRAME_QUEUE_SIZE 最大size, 数字太大时会占用大量的…

第二证券:什么是a股b股?

在我国的股市中,我们经常会听到“A股”和“B股”这两个名词。那么,终究什么是A股和B股呢? 首先,A股全称为“A股票”,是指在我国境内上市的以人民币计价的股票。A股首要面向国内出资者,只要具有必定条件的内…

如何快速检测代理IP质量?方法与工具全干货

一直以来,IP代理都是出海跨境业务的刚需。质量好的IP代理,除了在跨境业务产生巨大作用,在SEO监控、爬虫抓取、市场研究等领域也发挥着很大的作用。但是,对于IP代理的质量检测是我们选择高标准IP代理的一句,我们一般都会…

美国零售电商平台Target,值得入驻吗?如何入驻?

Target 是美国最大的零售商之一,在品牌出海为大势所趋的背景下,它在北美电商中的地位节节攀升。Target 商店在众多垂直领域提供各种价格实惠的自有品牌,吸引越来越多的跨境商家入驻,如美妆、家居、鞋服、日用百货等,随…

Mybatis学习笔记12 分页插件

Mybatis学习笔记11 缓存相关_biubiubiu0706的博客-CSDN博客 (5) select distinct top(<取数说明>) <选择 列表> (1) from <表1> <连接类型> join <表2> ON <连接条件> (2) where <筛选条件> (3) group by <分组条件> (4) havi…

linux下链接

linux下链接用法 ln链接格式与介绍 linux下链接用法一、链接的使用格式二、链接的介绍 一、链接的使用格式 链接&#xff1a; 格式&#xff1a; ln 源文件 链接文件 硬链接 ln -s 源文件 链接文件 软连接 硬链接文件占磁盘空间 但是删除源文件不会影响硬链接文件 软链接文件不…

秒杀场景设计

1.活动页面静态化处理 没有到活动时间页面静态化处理避免访问服务端 2.使用cdn让用户可以获取就近的所需静态页面内容 3.限制用户同一时间点击次数 4.把商品库存提前放入redis&#xff0c;秒杀请求直接操作redis防止操作直接落库打崩数据库 5.使用lua脚本操作redis保证操作…

transformer系列2---transformer架构详细解析

transformer详细解析 Encoder1 输入1.1 Embedding 词嵌入1.1.1 Embedding 定义1.1.2 几种编码方式对比1.1.3 实现代码 1.2 位置编码1.2.1 使用位置编码原因1.2.2 位置编码方式1.2.3 位置编码代码 2 注意力 Attention2.1 自注意力self-attention2.1.1 QKV含义2.1.2 自注意力公式…

微信图文如何替换成自己的二维码?

二维码样式中的二维码目前都是小蚂蚁的二维码&#xff0c;如何替换成自己的二维码&#xff0c;其实也很简单&#xff0c;就像替换样式中的图片一样&#xff0c;首先点击二维码&#xff0c;选择工具条中的“换图”&#xff0c;然后在弹出来的框中填入二维码图片的链接地址或者直…

招股书更新9版终上市,飞沃科技能否躲过风电红利后的黯淡?

文丨熔财经 作者丨文泽 碳达峰、碳中和成为主旋律目标下&#xff0c;作为可再生能源主力的风电产业迎来了发展的“黄金时代”。与新能源相关的上下游企业也赚的“盆满钵满”。 在此背景下&#xff0c;飞沃科技(301232.SZ)历经4轮问询&#xff0c;更新9版招股书终于登陆资本市…

Command not found 解决方法

前言&#xff1a;要更新code上服务器用GUI失败&#xff0c;$ patch_delivery_gui&#xff0c;报错&#xff1a;patch_delivery_gui: command not found&#xff0c;上次编TA也是这个问题 写了个脚本&#xff1a;这个脚本会先检查ifconfig、firewall-cmd和vim命令是否可用&#…

ExcelServer EXCEL服务器使用- 用户、角色权限配置

Excel文件服务器搭建 搭建Excel服务器 1、登录 默认 用户名 Admin 密码 3 2、角色管理 添加修改角色 角色配置在 系统管理->角色.fexm文件夹下 可以像修改excel文件一样 修改角色 3、用户管理 添加修改用户 用户的修改在 系统管理->用户.fexm 可以像excel一样编辑用户…

27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…