基于Canal的数据同步
一、 系统结构
该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。
- canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
- CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目
二、. Canal配置
在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。
1) 配置文件路径:canal/conf/canal.properties
2) 配置文件路径:
canal/conf/example/instance.properties
三、 Spring Boot配置
1. 项目结构
2. Canal账号密码配置
进入到Config下的CanalClient类文件。
注意密码是通过MD5加密的,图中这这段字符应替换为canal。
3. 目标数据库配置
在yml中配置数据同步目标数据库即可
源代码:
package com.canal.canalclient.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @Auther: fzl
* @Date: 2020/4/20 01:21
* @Description:
*/
public class CanalClient {
private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
public static void startCanal() {
//获取canalServer连接:本机地址,端口号
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址"
, "端口号"), "example", "canal", "canal");
int batchSize = 1000;
try {
//连接canalServer
connector.connect();
//订阅Desctinstion
connector.subscribe();
connector.rollback();
try {
while (true) {
//尝试从master那边拉去数据batchSize条记录,有多少取多少
//轮询拉取数据 上面的where
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
//睡眠
Thread.sleep(1000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
System.out.println("aa"+size);
//当队列里面堆积的sql大于一定数值的时候就模拟执行
if (SQL_QUEUE.size() >= 10) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
public static JdbcTemplate jdbcTemplate;
/**
* 模拟执行队列里面的sql语句
*/
public static void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
jdbcTemplate.execute(sql);
System.out.println("[sql]----> " + sql);
}
}
/**
* 数据处理
*
* @param entrys
*/
private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
for (CanalEntry.Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == CanalEntry.EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/**
* 保存更新语句
*
* @param entry
*/
private static void saveUpdateSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
+ " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存删除语句
*
* @param entry
*/
private static void saveDeleteSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存插入语句
*
* @param entry
*/
private static void saveInsertSql(CanalEntry.Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
package com.canal.canalclient;
import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {
@Autowired
@Qualifier("test_master_energy") //有多个数据源的,需要名称区分
private static JdbcTemplate jdbcTemplate;
@Override
public void run(ApplicationArguments args) throws Exception{
log.info("开始监听同步数据库");
CanalClient.jdbcTemplate = jdbcTemplate;
CanalClient.startCanal();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.canal</groupId>
<artifactId>CanalClient</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>CanalClient</name>
<description>CanalClient</description>
<properties>
<java.version>19</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.32</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包时跳过测试 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
</build>
</project>