勿以恶小而为之,勿以善小而不为----- 刘备
SpringBoot 整合 Canal
pom.xml 添加 canal.client 依赖
(1.1.5 改动很大,这儿客户端用 1.1.4)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>top.yueshushu</groupId>
<artifactId>learn</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Canal</name>
<description>学习 Canal</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 导入配置文件处理器,配置文件进行绑定就会有提示,需要重启 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!--导入自动热步署的依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!--引入MySql的驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--引入springboot与mybatis整合的依赖-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
<!-- 引入pagehelper分页插件 -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.5</version>
</dependency>
<!--添加 druid-spring-boot-starter的依赖的依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.14</version>
</dependency>
<!--SpringBoot 的aop 模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!--添加canal的依赖. 重要. 使用 1.1.4-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.4</version>
</dependency>
</dependencies>
<build>
<!--将该目录下的文件全部打包成类的路径-->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
业务功能处理
简单连接程序
/**
* 一个简单的canal 的连接测试程序
*/
@Test
public void connectionTest() {
//1. 创建连接 填充对应的地址信息 ,要监控的实例和相应的用户名和密码
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal",
"canal"
);
//2. 进行连接
canalConnector.connect();
log.info(">>>连接成功:{}", canalConnector);
}
17:26:32.179 [main] INFO top.yueshushu.learn.CanalDemoTest - >>>连接成功:com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3
单次获取数据
/**
* 获取数据信息. 可以发现,未获取到数据 . 这个应该是实时的.
*/
@Test
public void getDataTest() {
//1. 创建连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"canal",
"canal"
);
// 进行连接
canalConnector.connect();
//3. 注册,看使用哪个数据库表
canalConnector.subscribe("springboot.user");
//4. 获取 1条数据
Message message = canalConnector.get(1);
log.info("获取的数据:id:{},数据:{}", message.getId(), message);
if (message.getId() == -1) {
log.info(">>>未获取到数据");
return;
}
//5. 获取相应的数据集合
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
log.info(">>>获取数据 {}", entry);
//获取表名
CanalEntry.Header header = entry.getHeader();
log.info(">>>获取表名:{}", header.getTableName());
CanalEntry.EntryType entryType = entry.getEntryType();
log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
//获取数据
ByteString storeValue = entry.getStoreValue();
log.info(">>>输出存储的值:{}", storeValue);
}
}
在主库里面插入一条数据
insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用户',24,'男','学习canal');
再次执行:
循环获取数据
/**
* 获取数据信息. 获取现在的数据. 再次执行时,就没有这个数据了.
*/
@Test
public void getNowDataTest() {
//1. 创建连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"canal",
"canal"
);
// 进行连接
canalConnector.connect();
//3. 注册,看使用哪个数据库表
canalConnector.subscribe("springboot.user");
for (;;) {
//4. 获取 1条数据
Message message = canalConnector.get(1);
log.info("获取的数据:id:{},数据:{}", message.getId(), message);
if (message.getId() == -1) {
log.info(">>>未获取到数据");
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
//5. 获取相应的数据集合
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
log.info(">>>获取数据 {}", entry);
//获取表名
CanalEntry.Header header = entry.getHeader();
log.info(">>>获取表名:{}", header.getTableName());
CanalEntry.EntryType entryType = entry.getEntryType();
log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
//获取数据
ByteString storeValue = entry.getStoreValue();
log.info(">>>输出存储的值:{}", storeValue);
}
}
}
可以随时获取相应的数据变更信息。
会发现, storeValue 的值是很难解读的。 需要将这个数据解析出来。
解析 storeValue 值
/**
* 将 storeValue 进行解析,解析成我们能看懂的语句.
* 对数据库 cud 进行处理操作观看一下.
* 发现,点是不好的,也有多余的记录信息.
*
* @throws Exception 异常
*/
@Test
public void convertDataTest() throws Exception {
//1. 创建连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"canal", "canal"
);
//2. 进行连接
canalConnector.connect();
canalConnector.subscribe("springboot.user");
for (;;) {
//获取信息
Message message = canalConnector.get(1);
if (message.getId() == -1L) {
// log.info("未获取到数据");
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
List<CanalEntry.Entry> entryList = message.getEntries();
//对获取到的数据进行处理
log.info(">>获取到{}条数据", entryList.size());
for (CanalEntry.Entry entry : entryList) {
CanalEntry.Header header = entry.getHeader();
log.info(">>>获取表名:{}", header.getTableName());
//获取类型.
CanalEntry.EntryType entryType = entry.getEntryType();
log.info(">>类型编号 {},类型名称:{}", entryType.getNumber(), entryType.name());
//获取存入日志的值
ByteString storeValue = entry.getStoreValue();
//将这个值进行解析
CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue);
String sql = rowChange.getSql();
log.info(">>>获取对应的sql:{}", sql);
// 这个sql 可能是 批量的sql语句
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
log.info(">>>获取信息:{}", rowData);
//对数据进行处理
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
beforeColumnsList.forEach(
n -> log.info("哪个列{},原先是{},是否被更新{}", n.getName(),
n.getValue(), n.getUpdated())
);
afterColumnsList.forEach(
n -> log.info("哪个列{},后来是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated())
);
}
}
}
}
再次执行sql
insert into springboot.user(id,name,age,sex,description)
values(2,'canal添加用户2',25,'男','学习canal2');
不同的类型,进行不同的处理
发现 其他类型的 如: TRANSACTIONBEGIN 也进行了处理
/**
* 类型转换数据
*
* @throws Exception 异常
*/
@Test
public void dataTypeTest() throws Exception {
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal", "canal"
);
canalConnector.connect();
canalConnector.subscribe("springboot.user");
for(;;){
Message message = canalConnector.get(1);
if (message.getId() == -1) {
TimeUnit.SECONDS.sleep(1);
continue;
}
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
CanalEntry.EntryType entryType = entry.getEntryType();
//只要 RowData 数据类型的
if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
continue;
}
String tableName = entry.getHeader().getTableName();
log.info(">>>对表 {} 进行操作", tableName);
ByteString storeValue = entry.getStoreValue();
RowChange rowChange = RowChange.parseFrom(storeValue);
//行改变
CanalEntry.EventType eventType = rowChange.getEventType();
switch (eventType) {
case INSERT: {
insertHandler(rowChange);
break;
}
case UPDATE: {
updateHandler(rowChange);
break;
}
case DELETE: {
deleteHandler(rowChange);
break;
}
default: {
break;
}
}
}
}
}
private void deleteHandler(RowChange rowChange) {
log.info(">>>>执行删除的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
}
}
}
private void updateHandler(RowChange rowChange) {
log.info(">>>执行更新的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
Map<String, String> afterValueMap = afterColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
beforeValueMap.forEach((column, beforeValue) -> {
String afterValue = afterValueMap.get(column);
Boolean update = beforeValue.equals(afterValue);
log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
update);
});
}
}
/**
* 插入数据. 只有后的数据.
*
* @param rowChange 行改变
*/
private void insertHandler(RowChange rowChange) {
log.info(">>>执行添加 的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
if (!StringUtils.hasText(column.getValue())) {
continue;
}
log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
}
}
}
插入,更新,删除,分别进行了处理.
先启动测试程序:
不打印任何信息。
主表执行添加语句:
insert into springboot.user(id,name,age,sex,description)
values(4,'canal添加用户4',25,'男','学习canal4');
会打印信息:
这个可读性就非常高了.
主表执行修改的操作.
update springboot.user set name='开开心心',age=26,description='岳泽霖' where id =4;
更新时,若每一个字段都跟原先一样,不会产生日志消费。
主表执行删除的操作:
delete from springboot.user where id =4;
上面的获取,都是一条数据一条数据获取的。效率比较低
一次性获取多条数据
/**
* 一次性获取多条数据。
* sql 执行多条。
*/
@Test
public void dataMoreTest() throws Exception {
//1. 创建 canal连接对象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal",
"canal"
);
canalConnector.connect();
// 订阅哪个对象
canalConnector.subscribe("springboot.user");
for (; ; ) {
// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
Message message = canalConnector.get(3);
if (message.getId() == -1) {
// 未获取到数据
continue;
}
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
CanalEntry.EntryType entryType = entry.getEntryType();
if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
continue;
}
String tableName = entry.getHeader().getTableName();
log.info(">>>>对表{} 执行操作", tableName);
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
//对类型进行处理
CanalEntry.EventType eventType = rowChange.getEventType();
switch (eventType) {
case INSERT: {
insertHandler(rowChange);
break;
}
case UPDATE: {
updateHandler(rowChange);
break;
}
case DELETE: {
deleteHandler(rowChange);
break;
}
default: {
break;
}
}
}
}
}
private void deleteHandler(CanalEntry.RowChange rowChange) {
log.info(">>>>执行删除的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
}
}
}
private void updateHandler(CanalEntry.RowChange rowChange) {
log.info(">>>执行更新的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
Map<String, String> afterValueMap = afterColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
beforeValueMap.forEach((column, beforeValue) -> {
String afterValue = afterValueMap.get(column);
Boolean update = beforeValue.equals(afterValue);
log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
update);
});
}
}
/**
* 插入数据. 只有后的数据.
*
* @param rowChange 行改变
*/
private void insertHandler(CanalEntry.RowChange rowChange) {
log.info(">>>执行添加 的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
if (!StringUtils.hasText(column.getValue())) {
continue;
}
log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
}
}
}
修改点:
// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
Message message = canalConnector.get(3);
.get(3) 表示 一次性获取3条记录.
canalConnector.get(3, 5L, TimeUnit.SECONDS); 表示5秒之内获取3条记录,
有两个触发条件,一个是获取了3条,一个是到了5秒。
效果展示信息与之前是一致的,就不重新演示了。
ack 配置信息
/**
* 一次性获取多条数据。
* sql 执行多条。
*/
@Test
public void dataMoreTest() throws Exception {
//1. 创建 canal连接对象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal",
"canal"
);
canalConnector.connect();
// 订阅哪个对象
canalConnector.subscribe("springboot.user");
for (; ; ) {
Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
if (message.getId() == -1) {
// 未获取到数据
TimeUnit.MILLISECONDS.sleep(500);
continue;
}
log.info(">>>>获取对应的 id: {}",message.getId());
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
CanalEntry.EntryType entryType = entry.getEntryType();
if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
continue;
}
String tableName = entry.getHeader().getTableName();
log.info(">>>>对表{} 执行操作", tableName);
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
//对类型进行处理
CanalEntry.EventType eventType = rowChange.getEventType();
switch (eventType) {
case INSERT: {
insertHandler(rowChange);
break;
}
case UPDATE: {
updateHandler(rowChange);
break;
}
case DELETE: {
deleteHandler(rowChange);
break;
}
default: {
break;
}
}
}
//进行回滚
// canalConnector.rollback();
//确认ack 配置
canalConnector.ack(message.getId());
}
}
private void deleteHandler(CanalEntry.RowChange rowChange) {
log.info(">>>>执行删除的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
}
}
}
private void updateHandler(CanalEntry.RowChange rowChange) {
log.info(">>>执行更新的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
Map<String, String> afterValueMap = afterColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
beforeValueMap.forEach((column, beforeValue) -> {
String afterValue = afterValueMap.get(column);
Boolean update = beforeValue.equals(afterValue);
log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
update);
});
}
}
/**
* 插入数据. 只有后的数据.
*
* @param rowChange 行改变
*/
private void insertHandler(CanalEntry.RowChange rowChange) {
log.info(">>>执行添加 的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
if (!StringUtils.hasText(column.getValue())) {
continue;
}
log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
}
}
}
主要信息:
Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
//进行回滚 // canalConnector.rollback();
//确认ack 配置canalConnector.ack(message.getId());
手动确认消息消费了.
当消息 rollback() 回滚后,会再次消费这条消息.
canalConnector.rollback();
执行语句:
insert into springboot.user(id,name,age,sex,description)
values(5,'canal添加用户5',25,'男','学习canal5');
如果变成 手动确认,
canalConnector.ack(message.getId());
则只消费一次.