基于Canal同步MySQL数据到Elasticsearch
基于 canal 同步 mysql 的数据到 elasticsearch 中。
1、canal-server
相关软件的安装请参考:《Canal实现数据同步》
1.1 pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.example</groupId>
    <artifactId>canal-to-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>canal-to-elasticsearch</name>
    <description>canal to elasticsearch</description>

    <!-- Dependency versions without an explicit <version> are inherited from this parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Canal client API (CanalConnector, CanalEntry, Message) -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
1.2 SimpleCanalClientExample编写
package com.example.canatest.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
/**
* 说明:用于测试canal是否已经连接上了mysql
*/
/**
 * Minimal Canal client used to check that canal-server is connected to MySQL.
 *
 * <p>Connects to a canal-server at a hard-coded address, subscribes to every schema and table,
 * and prints each received row change to standard output. The program exits after
 * {@link #MAX_EMPTY_POLLS} consecutive empty polls, or as soon as the thread is interrupted.
 */
public class SimpleCanalClientExample {

    /** Address of the canal-server to connect to. */
    private static final String SERVER_HOST = "192.168.94.186";

    /** TCP port of the canal-server. */
    private static final int SERVER_PORT = 11111;

    /** Name of the canal instance (destination) to consume. */
    private static final String DESTINATION = "example";

    /** Maximum number of entries requested per poll. */
    private static final int BATCH_SIZE = 1000;

    /** Number of consecutive empty polls after which the program exits. */
    private static final int MAX_EMPTY_POLLS = 120;

    /** Pause between two polls when the previous poll returned nothing. */
    private static final long EMPTY_POLL_SLEEP_MILLIS = 1000;

    /**
     * Polls canal-server and prints every change until it has been idle for too long.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Empty username/password: no authentication is configured in this example.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_HOST, SERVER_PORT), DESTINATION, "", "");
        int emptyCount = 0;
        try {
            connector.connect();
            // Subscribe to all tables of all schemas.
            connector.subscribe(".*\\..*");
            // Start from the last acknowledged position.
            connector.rollback();
            while (emptyCount < MAX_EMPTY_POLLS) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling instead of silently
                        // ignoring the request; the finally block still disconnects.
                        Thread.currentThread().interrupt();
                        System.out.println("interrupted, exit");
                        return;
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch. If printEntry throws, this line is not reached and
                // the batch stays unacknowledged.
                connector.ack(batchId);
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the binlog position, schema/table name, event type and column values of every
     * row-change entry; transaction begin/end markers are skipped.
     *
     * @param entrys entries of one batch
     * @throws RuntimeException if the stored value of an entry cannot be parsed as a row change
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: its name, its value and whether it was updated.
     *
     * @param columns columns of one row image
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
注意:当后面 `canal-adapter` 也连接上 `canal-server` 后,本程序就监听不到数据变化了。
这个类只是测试,下面不使用。
2、canal-adapter
由于目前 `canal-adapter` 没有官方 docker 镜像,所以拉取一个非官方的。
canal-adapter安装:
搜索镜像
$ docker search canal-adapter
拉取镜像
$ docker pull slpcat/canal-adapter:v1.1.5
启动
$ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5
修改配置
$ docker exec -it 89ef714d3a0e /bin/bash
$ cd conf/
$ vi application.yml
# canal-adapter launcher configuration (conf/application.yml).
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host must be changed to your canal-server address
    canal.tcp.server.host: 192.168.94.186:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      # url, username and password must be changed to match your MySQL instance
      url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      # name must be changed (es7 here)
      - name: es7
        # hosts must be changed to your Elasticsearch address
        hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 # only used for rest mode
          # cluster.name must be changed to your Elasticsearch cluster name
          cluster.name: my-es
$ cd conf/es7
$ cp -v mytest_user.yml canal_test_collect.yml
# 删除其他多余的
$ rm -rf biz_order.yml customer.yml mytest_user.yml
$ vi canal_test_collect.yml
# Elasticsearch sync mapping for the es7 adapter (conf/es7/canal_test_collect.yml).
# Must match a key under srcDataSources in application.yml.
dataSourceKey: defaultDS
# Must be changed: canal instance name, same as canalAdapters.instance in application.yml
destination: example
# Must be changed: same as the groupId in application.yml
groupId: g1
esMapping:
  # Must be changed: target Elasticsearch index
  _index: canal_test
  # The SQL column aliased as _id is used as the document id
  _id: _id
  _type: _doc
  upsert: true
#  pk: id
  # Must be changed: the query whose column aliases become the document fields
  sql: "
    SELECT
      c.id AS _id,
      c.user_id AS userId,
      c.title AS title,
      c.url AS url,
      c.note AS note,
      c.collected AS collected,
      c.created AS created,
      c.personal AS personal,
      u.username AS username,
      u.avatar AS userAvatar
    FROM
      m_collect c
      LEFT JOIN m_user u ON c.user_id = u.id
    "
#  objFields:
#    _labels: array:;
#  etlCondition: "where c.c_time>={}"
  commitBatch: 3000
也可以在外面编辑好,通过docker命令传输到docker容器中:
$ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
$ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml
重启容器
$ docker restart 89ef714d3a0e
验证是否启动成功
$ docker logs -f 89ef714d3a0e
注意:对于时间类型,在后端一定要使用 `LocalDateTime` 或者 `LocalDate` 类型;如果是 `Date` 类型,需要自己手动设置格式。
3、测试
准备测试条件:
1、首先在数据库中生成表和字段
-- User table; m_collect.user_id references m_user.id.
CREATE TABLE `m_user` (
    `id`       bigint(20)   NOT NULL AUTO_INCREMENT,
    `avatar`   varchar(255) DEFAULT NULL,
    `created`  date         DEFAULT NULL,
    `lasted`   date         DEFAULT NULL,
    `open_id`  varchar(255) DEFAULT NULL,
    `statu`    int(11)      DEFAULT NULL,
    `username` varchar(255) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
-- Collection table; each row points to a row of m_user through user_id.
CREATE TABLE `m_collect` (
    `id`        bigint(20)   NOT NULL AUTO_INCREMENT,
    `collected` date         DEFAULT NULL,
    `created`   date         DEFAULT NULL,
    `note`      varchar(255) DEFAULT NULL,
    `personal`  int(11)      DEFAULT NULL,
    `title`     varchar(255) DEFAULT NULL,
    `url`       varchar(255) DEFAULT NULL,
    `user_id`   bigint(20)   DEFAULT NULL,
    PRIMARY KEY (`id`),
    KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
    CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h`
        FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;
2、然后在elasticsearch中生成索引
# 创建索引并添加映射字段
PUT /canal_test
{
  "mappings": {
    "properties": {
      "collected": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "created": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "note": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "personal": {
        "type": "integer"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "url": {
        "type": "text"
      },
      "userAvatar": {
        "type": "text"
      },
      "userId": {
        "type": "long"
      },
      "username": {
        "type": "keyword"
      }
    }
  }
}
3、插入数据
-- Seed one user. Columns are listed explicitly so the statement does not depend on the
-- physical column order of the table.
INSERT INTO `m_user` (`id`, `avatar`, `created`, `lasted`, `open_id`, `statu`, `username`)
VALUES (1, 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', 0, 'MarkerHub');
-- The sync SQL selects FROM m_collect and only LEFT JOINs m_user, so a document should appear
-- in the canal_test index only once a m_collect row exists. Sample row for the user above:
INSERT INTO `m_collect` (`id`, `collected`, `created`, `note`, `personal`, `title`, `url`, `user_id`)
VALUES (1, '2022-01-05', '2022-01-05', 'canal sync test', 0, 'canal test', 'https://example.com', 1);
4、查看数据
GET /canal_test/_search
5、遇到的问题
如果看到 `canal-adapter` 一直出现这种异常,说明启动顺序不对,启动顺序应该是:`mysql`、`es`、`canal`、`adapter`:
2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!