canal
是什么
- canal [kə’næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;
- 历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;
- Canal是基于MySQL变更日志增量订阅和消费的组件
能干嘛
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
去哪下
- 下载canal
- java案例
工作原理
传统MySQL主从复制工作原理
MySQL的主从复制将经过如下步骤:
- 当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
- salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
- 同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
- slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
- salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
- 最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;
canal工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )canal 解析 binary log 对象(原始为 byte 流)
- 分布式系统只有最终一致性,很难做到强一致性
mysql-canal-redis双写一致性Coding
mysql
CREATE TABLE `t_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`userName` varchar(100) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
- 开启 MySQL的binlog写入功能
- 授权canal连接MySQL账号
- mysql默认的用户在mysql库的user表里
- 默认没有canal账户,此处新建+授权
canal服务端
- 下载
- 解压后整体放入/mycanal路径下
- 配置修改
-
/mycanal/canal.deployer-1.1.5/conf/example路径下
-
instance.properties
-
换成自己的在mysql新建的canal账户
-
- 启动
- /mycanal/canal.deployer-1.1.5/bin路径下执行
- ./startup.sh
- 查看 server 日志
- 查看 instance 的日志
canal客户端(Java编写业务程序)
-
建module
-
改pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.zzyy.study</groupId> <artifactId>canal_demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>canal_demo</name> <description>Demo project for Spring Boot</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <junit.version>4.12</junit.version> <log4j.version>1.2.17</log4j.version> <lombok.version>1.16.18</lombok.version> <mysql.version>5.1.47</mysql.version> <druid.version>1.1.16</druid.version> <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version> </properties> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> <!--guava--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> <!--web+actuator--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--SpringBoot与Redis整合依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <!-- jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <!-- springboot-aop 技术--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!-- redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.4</version> </dependency> <!--Mysql数据库驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <!--集成druid连接池--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <!--mybatis和springboot整合--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis.spring.boot.version}</version> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--通用基础配置--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
写YML
server.port=5555
-
主启动
-
业务类
- RedisUtils
package com.zzyy.study.util; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * @auther zzyy * @create 2020-10-11 14:33 */ public class RedisUtils { public static JedisPool jedisPool; static { JedisPoolConfig jedisPoolConfig=new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(20); jedisPoolConfig.setMaxIdle(10); jedisPool=new JedisPool(jedisPoolConfig,"192.168.111.147",6379); } public static Jedis getJedis() throws Exception { if(null!=jedisPool){ return jedisPool.getResource(); } throw new Exception("Jedispool is not ok"); } /*public static void main(String[] args) throws Exception { try(Jedis jedis = RedisUtils.getJedis()) { System.out.println(jedis); jedis.set("k1","xxx2"); String result = jedis.get("k1"); System.out.println("-----result: "+result); System.out.println(RedisUtils.jedisPool.getNumActive());//1 }catch (Exception e){ e.printStackTrace(); } }*/ }
- RedisCanalClientExample
package com.zzyy.study.t1; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.zzyy.study.util.RedisUtils; import org.springframework.beans.factory.annotation.Autowired; import redis.clients.jedis.Jedis; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.TimeUnit; /** * @auther zzyy * @create 2020-11-11 17:13 */ public class RedisCanalClientExample { public static final Integer _60SECONDS = 60; public static void main(String args[]) { // 创建链接canal服务端 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.111.147", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); //connector.subscribe(".*\\..*"); connector.subscribe("db2020.t_user"); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else {//EventType.UPDATE redisUpdate(rowData.getAfterColumnsList()); } } } } private static void redisInsert(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(),jsonObject.toJSONString()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisDelete(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.del(columns.get(0).getValue()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisUpdate(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(),jsonObject.toJSONString()); System.out.println("---------update after: "+jedis.get(columns.get(0).getValue())); }catch (Exception e){ e.printStackTrace(); } } } }
- RedisUtils