基于Canal同步MySQL数据到Elasticsearch

news2024/11/17 4:34:12

基于Canal同步MySQL数据到Elasticsearch

基于 canal 同步 mysql 的数据到 elasticsearch 中。

1、canal-server

相关软件的安装请参考:《Canal实现数据同步》

1.1 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>canal-to-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>canal-to-elasticsearch</name>
    <description>canal to elasticsearch</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2 SimpleCanalClientExample编写

package com.example.canatest.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 说明:用于测试canal是否已经连接上了mysql
 */
public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.94.186",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

在这里插入图片描述

在这里插入图片描述

注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

这个类只是测试,下面不使用。

2、canal-adapter

由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的。

canal-adapter安装:

搜索镜像

$ docker search canal-adapter

在这里插入图片描述

拉取镜像

$ docker pull slpcat/canal-adapter:v1.1.5

在这里插入图片描述

启动

$ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5

在这里插入图片描述

修改配置

$ docker exec -it 89ef714d3a0e /bin/bash
$ cd conf/
$ vi application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host需要修改
    canal.tcp.server.host: 192.168.94.186:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # url,username,password需要修改
      url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      # name需要修改
      - name: es7
        # hosts需要修改
        hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          # cluster.name需要修改
          cluster.name: my-es
$ cd conf/es7
$ cp -v mytest_user.yml canal_test_collect.yml
# 删除其他多余的
$ rm -rf biz_order.yml customer.yml mytest_user.yml
$ vi dailyhub_collect.yml
dataSourceKey: defaultDS
# 需要修改
destination: example
# 需要修改
groupId: g1
esMapping:
  # 需要修改
  _index: canal_test
  _id: _id
  _type: _doc
  upsert: true
#  pk: id
  # 需要修改
  sql: "
SELECT
        c.id AS _id,
        c.user_id AS userId,
        c.title AS title,
        c.url AS url,
        c.note AS note,
        c.collected AS collected,
        c.created AS created,
        c.personal AS personal,
        u.username AS username,
        u.avatar AS userAvatar
FROM
        m_collect c
LEFT JOIN m_user u ON c.user_id = u.id

"
#  objFields:
#    _labels: array:;
#   etlCondition: "where c.c_time>={}"
  commitBatch: 3000

也可以在外面编辑好,通过docker命令传输到docker容器中:

$ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
$ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml

重启容器

$ docker restart 89ef714d3a0e

验证是否启动成功

$ docker logs -f 89ef714d3a0e

在这里插入图片描述

注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动

设置格式。

3、测试

准备测试条件:

1、首先在数据库中生成表和字段

CREATE TABLE `m_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `avatar` varchar(255) DEFAULT NULL,
  `created` date DEFAULT NULL,
  `lasted` date DEFAULT NULL,
  `open_id` varchar(255) DEFAULT NULL,
  `statu` int(11) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

CREATE TABLE `m_collect` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `collected` date DEFAULT NULL,
  `created` date DEFAULT NULL,
  `note` varchar(255) DEFAULT NULL,
  `personal` int(11) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `url` varchar(255) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
  CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

在这里插入图片描述

2、然后在elasticsearch中生成索引

# 创建索引并添加映射字段
PUT /canal_test
{
  "mappings": {
    "properties": {
      "collected": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "created": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "note": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "personal": {
        "type": "integer"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "url": {
        "type": "text"
      },
      "userAvatar": {
        "type": "text"
      },
      "userId": {
        "type": "long"
      },
      "username": {
        "type": "keyword"
      }
    }
  }
}

在这里插入图片描述

3、插入数据

INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', 'MarkerHub');

在这里插入图片描述

4、查看数据

GET /canal_test/_search

5、遇到的问题

如果看到canal-adapter一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysqlescanal

adapar

2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1142637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中国地名信息库

地名是社会基本公共信息&#xff0c;是历史文化的重要载体。 2014年至2018年&#xff0c;国家启动实施并完成了第二次全国地名普查工作&#xff0c;全国共计采集地名1320多万条&#xff0c;修测标绘地名图2.4万多幅&#xff0c;新设更新地名标志68万多块&#xff0c;普遍建立了…

server2012 通过防火墙开启局域网内限定IP进行远程桌面连接

我这里需要被远程桌面的电脑系统版本为windows server2012 1、打开允许远程连接设置 2、开启防火墙 3、设置允许“远程桌面应用”通过防火墙 勾选”远程桌面“ 3、入站规则设置 高级设置→入站规则→远程桌面-用户模式(TCP-In) 进入远程桌面属性的作用域——>远程IP地址—…

演讲比赛常见误区及解决方法

演讲比赛常见误区及解决方法 一、演讲内容选择错误 1. 主题选择不合理 许多参赛者选择的主题内容&#xff0c;与比赛题目要求或听众背景不符&#xff0c;难以引起听众的兴趣。正确选择主题应考虑以下几点&#xff1a; - 主题应与比赛题目要求相符合&#xff0c;切合比赛定位…

《C和指针》(5)操作符和表达式

问题 下面这个表达式的类型和值分别是什么? 答&#xff1a;该值为2.0&#xff0c;如果要进行浮点除法&#xff0c;请使用以下表达式 下面这个程序的结果是什么&#xff1f; 答&#xff1a;这是一个狡猾的问题。比较明显的回答是-10(2-3 *4),但实际上它因编译器而异。乘法运…

Android S从桌面点击图标启动APP流程 (五)

系列文章 Android S从桌面点击图标启动APP流程 (一)Android S从桌面点击图标启动APP流程 (二) Android S从桌面点击图标启动APP流程 (三) Android S从桌面点击图标启动APP流程 (四) Android S从桌面点击图标启动APP流程 (五) Android S从桌面点击图标启动APP流程 (六) An…

17、简单记录一下两个流媒体工具和推流测试

基本思想:在开发流媒体服务过程中,使用了两个流媒体工具,这里做一下简单的记录,以后可以翻阅和查看 一:流媒体服务工具之一:https://github.com/bluenviron/mediamtx/releases 它支持rtsp/rtmp/hls推流测试 二、流媒体工具:Releases EasyDarwin/EasyDarwin GitHub 该…

华为认证H12-831考试新增题库

279、 以下哪些数列能被正则表达式[^100|200]$匹配? A、300 200 100 B、200 100 300 C、100 200 300 D、100 300 200 试题答案&#xff1a;BC 试题解析&#xff1a;[^ ]表示不包括字符&#xff0c;$表示以某字符结尾。题目的正则表达式表示不以100 200 结…

【CMake】windows10下入门课程

【CMake】windows10下入门课程 提示:博主取舍了很多大佬的博文并亲测有效,分享笔记邀大家共同学习讨论 文章目录 【CMake】windows10下入门课程前言cmake安装初识cmake&#xff1a;新建helloworld项目cmake的入门使用法则总结 前言 CMake(Cross-Platform Make)是一个开源的跨平…

高效文件管理:自动生成文件夹及重命名的方法分享

在高效文件管理中&#xff0c;自动生成文件夹及重命名是一项非常实用的技巧。通过掌握这种方法&#xff0c;我们能够更轻松地整理和查找文件&#xff0c;提高工作效率。本文将分享云炫文件管理器自动生成文件夹及重命名的实用方法&#xff0c;帮助您实现高效的文件管理。现在跟…

OSPF NSSA区域配置

NSSA&#xff1a;Not-So-Stubby Area&#xff08;不太末节的区域&#xff09; 示例&#xff0c;拓朴如下&#xff1a; 思路&#xff1a; R1正常配置Area 0区域&#xff0c;R2的1口配置为区域0&#xff0c;2口配置为区域1&#xff0c;配置NSSA&#xff0c;R3配置为区域1…

如何在Windows和Linux系统上监听文件夹的变动?

文章目录 如何在Windows和Linux系统上监听文件夹的变动&#xff1f;读写文件文件系统的操作缓冲和流文件改变事件 如何在Windows和Linux系统上监听文件夹的变动&#xff1f; libuv库实现了监听整个文件夹的修改。本文详细介绍libuv库文件读写和监听的的实现方法。libuv库开发了…

【计算机网络笔记】Cookie技术

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

Linux系统下安全控制策略SELinux解析

SELinux&#xff08;Security-Enhanced linux&#xff09;是增强版Linux&#xff0c;简称SELinux&#xff0c;它是一个Linux内核模块&#xff0c;也是Linux的一个安全子系统&#xff0c;主要以内核模块为支持&#xff0c;用户态程序进行权限策略管理。 背景 Linux系统下的roo…

镍氢充电管理芯片-IC AH2185

镍氢充电管理芯片AH2185&#xff1a;为便携式设备提供高效充电解决方案 随着科技的不断发展&#xff0c;便携式设备在人们的生活中扮演着越来越重要的角色。这些设备包括数码相机、电子词典、智能手机等&#xff0c;它们共同的特点是需要定期充电。为了满足这一需求&#xff0…

FFmpeg5.1.3编译动态库踩坑之旅(基于Linux虚拟机)

准备工作 环境准备 1.Windows安装Oracle VM VirtualBox 7.0.10&#xff0c;安装ubuntu-22.04.3。 坑一&#xff1a;无法往虚拟机里拖放复制文件&#xff0c;解决办法&#xff1a;登录Ubuntu虚拟机时切换到xorg方式登录&#xff0c;参考地址&#xff1a;Ubuntu Desktop 22.04…

LLVM学习笔记(55)

4.1.3. 降级 在前面的章节里&#xff0c;我们展示了目标机器特定节点与目标机器无关节点共存的一个图。你可能会问&#xff0c;如果这是指令选择的一个输入&#xff0c;为什么在SelectionDAG类中已经有一些目标机器特定的节点&#xff1f;要理解这&#xff0c;我们首先在下图概…

Kafka - 3.x 图解Broker总体工作流程

文章目录 Zk中存储的kafka的信息Kafka Broker总体工作流程1. broker启动后向zk中注册2. Controller谁先启动注册&#xff0c;谁说了算3. 由选举出来的Controller监听brokers节点的变化4. Controller决定leader选举5. Controller将节点信息上传到Zk中6. 其他Controller从zk中同步…

numpy和字符串格式化,用*画田字形状

numpy的字符型元素矩阵&#xff0c;可以方便画&#xff1b;直接python字符串手撕&#xff0c;也可以轻巧完成。 (本笔记适合熟悉循环和列表的 coder 翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《…

鸿蒙ArkUI-X跨端应用开发,一套代码构建多平台应用

文章目录 一、项目介绍二、技术架构三、Gitee仓库地址四、ArkUI-X开发者文档五、快速开始——环境准备1、下载DevEco Studio&#xff0c;版本V4.0 Beta2以上2、打开DevEco&#xff0c;下载相关环境配置3、配置开发环境3.1、OpenHarmony SDK3.2、安装ArkUI-X SDK3.2、Android SD…

科聪协作(复合)移动机器人整体解决方案

协作&#xff08;复合&#xff09;移动机器人&#xff08;AGV/AMR&#xff09;相较传统工业机器人具有更加安全和简单的工作优势&#xff0c;具备较强的发展潜力。协作&#xff08;复合&#xff09;移动机器人安全性和操作的简洁性、灵活性不断提高,优势得到了充分发挥,在越来越…