canal 数据异构组件
为啥要使用这个组件?
如果在更新DB的同时同步更新redis、es等存储,耗时太久,而且可能出现同步失败的问题;因此引入canal订阅DB的变更数据,再异步更新到redis、es等存储中,并支持失败重试和回滚等功能。
canal原理?
canal 伪装成 MySQL 的 slave,向 MySQL 发送 dump 协议,拿到 binlog 后解析出变更数据,再直接(或组装数据之后)更新到redis、es等存储中。canal可以拿到更新前的所有数据、更新后的所有数据,以及哪些字段被更新了。
canal 组件的使用
1.下载canal组件
下载地址:canal组件下载地址(官方发布页:https://github.com/alibaba/canal/releases)
在我的资源中也有canal组件包
解压启动(我是windows版,双击startup.bat)
2.数据库配置
1.对于 MySQL,需要先开启 Binlog 写入功能,并将 binlog-format 配置为 ROW 模式
[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
2.授权 canal 作为mysql 的slave 的权限
-- Create the account that canal uses to connect to MySQL (user "canal", password "canal").
CREATE USER canal IDENTIFIED BY 'canal';
-- Grant read access plus the replication privileges on every schema and table, from any host.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- Alternative (disabled): grant every privilege instead of the narrower set above.
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
-- Reload the grant tables.
FLUSH PRIVILEGES;
3.项目引入jar包
<!-- Canal client library, version 1.1.4. -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
4.写canal监听数据工具类
package com.next.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * Minimal canal client: connects to a canal server on the local host, polls it for
 * binlog entries and prints every row change to standard output.
 *
 * <p>The client exits after {@link #MAX_EMPTY_POLLS} consecutive polls that return no
 * data, or as soon as the polling thread is interrupted.
 */
public class SimpleCanalClientExample {

    /** Maximum number of entries requested from canal per poll. */
    private static final int BATCH_SIZE = 1000;

    /** Number of consecutive empty polls after which the client gives up and exits. */
    private static final int MAX_EMPTY_POLLS = 120;

    /** Pause between two polls when the previous one returned nothing, in milliseconds. */
    private static final long EMPTY_POLL_SLEEP_MILLIS = 1000L;

    /**
     * Connects to destination {@code example} on port 11111 of the local host, subscribes
     * to every schema and table, and prints the received changes until the exit condition
     * is met. The connector is always disconnected on the way out.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the connection (empty user name and password).
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            // NOTE(review): presumably discards batches left un-acked by an earlier
            // session so that fetching restarts from the last ack; confirm in canal docs.
            connector.rollback();
            while (emptyCount < MAX_EMPTY_POLLS) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: restore the flag and stop polling.
                        Thread.currentThread().interrupt();
                        System.out.println("interrupted, exit");
                        return;
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch; a failed batch would be rolled back with
                // connector.rollback(batchId) instead.
                connector.ack(batchId);
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the row changes contained in the given entries. Transaction begin/end
     * markers are skipped.
     *
     * @param entrys entries of one canal batch
     * @throws RuntimeException if the stored value of an entry cannot be parsed as a
     *     {@code RowChange}
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            CanalEntry.Header header = entry.getHeader();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    header.getLogfileName(), header.getLogfileOffset(),
                    header.getSchemaName(), header.getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints name, value and "updated" flag of every column, one column per line.
     *
     * @param columns columns of one row image
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
5.简单例子使用测试
1.数据库更改user_id从0改为1,再从1改为0
2.查看canal监测的数据(canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据)
6.进一步完善canal监听数据工具类,用于应用例子
1.加入监听器,项目启动时启动
2.使用线程去监听数据
3.替换掉 System.out.print():它内部有锁,可能造成阻塞,改用日志框架打印
4.处理canal监测到的数据
package com.next.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * Subscribes to a canal server when the Spring context is refreshed and forwards every
 * received row change to the matching business service.
 *
 * <p>Output goes through the SLF4J logger instead of {@code System.out.print()}; the
 * original author notes that the latter takes a lock and can block.
 */
@Service
@Slf4j
public class CanalSubscribe implements ApplicationListener<ContextRefreshedEvent> {

    /** Maximum number of entries requested from canal per poll. */
    private static final int BATCH_SIZE = 1000;

    /** Pause after a poll that returned no data, in milliseconds. */
    private static final int EMPTY_POLL_SLEEP_MILLIS = 100;

    /** Pause before reconnecting after the subscription failed, in milliseconds. */
    private static final int RECONNECT_SLEEP_MILLIS = 1000;

    @Resource
    private TrainSeatService trainSeatService;

    @Resource
    private TrainNumberService trainNumberService;

    /**
     * Starts the canal subscription when the application context has been refreshed.
     *
     * @param contextRefreshedEvent the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        canalSubscribe();
    }

    /**
     * Creates a connector to destination {@code example} on port 11111 of the local host
     * and polls it on a dedicated thread.
     *
     * <p>Each batch is acknowledged after it has been handled and rolled back when
     * handling throws. If the subscription itself fails, the connector is disconnected
     * and a fresh subscription is started after a short pause. The loop stops without
     * reconnecting once the polling thread has been interrupted.
     */
    private void canalSubscribe() {
        // Create the connection (empty user name and password).
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");

        Thread worker = new Thread(() -> {
            boolean failed = false;
            try {
                log.info("canal subscribe");
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                while (!Thread.currentThread().isInterrupted()) {
                    // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // Nothing fetched: wait a little and poll again.
                        safeSleep(EMPTY_POLL_SLEEP_MILLIS);
                        continue;
                    }
                    try {
                        // Log the number of entries actually received, not the requested batch size.
                        log.info("new message,batchIds:{},size:{}", batchId, size);
                        printEntry(message.getEntries());
                        // Handling succeeded: acknowledge the batch.
                        connector.ack(batchId);
                    } catch (Exception e2) {
                        log.error("canal data exception,batchIds:{}", batchId, e2);
                        // Handling failed: roll the batch back.
                        connector.rollback(batchId);
                    }
                }
            } catch (Exception e3) {
                log.error("canal subscribe exception", e3);
                failed = true;
            } finally {
                // Always release the connection; the reconnect below builds a new connector.
                disconnectQuietly(connector);
            }
            if (failed) {
                safeSleep(RECONNECT_SLEEP_MILLIS);
                if (!Thread.currentThread().isInterrupted()) {
                    canalSubscribe();
                }
            }
        }, "canal-subscribe");
        worker.start();
    }

    /**
     * Dispatches every row of the given entries to {@link #handleColumn}. Transaction
     * begin/end markers are skipped. Deleted rows are passed with their "before" image,
     * all other rows with their "after" image.
     *
     * @param entrys entries of one canal batch
     * @throws Exception if an entry cannot be parsed or a handler fails
     */
    private void printEntry(List<CanalEntry.Entry> entrys) throws Exception {
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("RowChange.parse Exception , data:" + entry, e);
            }

            // Kind of change: update, delete, insert, ...
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Database (schema) name.
            String schemaName = entry.getHeader().getSchemaName();
            // Table name.
            String tableName = entry.getHeader().getTableName();
            log.info("name:[{},{}],eventType:{}", schemaName, tableName, eventType);

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List<CanalEntry.Column> columns = eventType == CanalEntry.EventType.DELETE
                        ? rowData.getBeforeColumnsList()
                        : rowData.getAfterColumnsList();
                handleColumn(columns, eventType, schemaName, tableName);
            }
        }
    }

    /**
     * Routes one changed row to the service responsible for it: rows from a schema whose
     * name contains {@code 12306_seat_} go to the seat service, rows from table
     * {@code train_number} go to the train-number service, everything else is dropped.
     *
     * @param columnsList columns of the changed row
     * @param eventType kind of change
     * @param schemaName schema the row belongs to
     * @param tableName table the row belongs to
     * @throws Exception if the selected handler fails
     */
    private void handleColumn(List<CanalEntry.Column> columnsList, CanalEntry.EventType eventType,
                              String schemaName, String tableName) throws Exception {
        if (schemaName.contains("12306_seat_")) {
            // Seat change.
            trainSeatService.handle(columnsList, eventType);
        } else if (tableName.equals("train_number")) {
            // Train-number change (per the original note, train-number details are
            // batch-processed only after the train number itself has changed).
            trainNumberService.handle(columnsList, eventType);
        } else {
            log.info("drop data,no need care");
        }
    }

    /**
     * Sleeps for the given time. An interrupt ends the sleep early and is recorded on
     * the thread's interrupt flag instead of being discarded.
     *
     * @param millis time to sleep in milliseconds
     */
    private void safeSleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Disconnects the connector, logging instead of propagating any failure so that
     * cleanup never masks the original error.
     *
     * @param connector connector to close
     */
    private void disconnectQuietly(CanalConnector connector) {
        try {
            connector.disconnect();
        } catch (Exception e) {
            log.warn("canal disconnect exception", e);
        }
    }
}
处理canal监测到的数据(拿到改变的数据,放到实体类中,存到redis中)
package com.next.service;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
 * Mirrors seat status changes reported by canal into the train cache.
 *
 * <p>Cache layout, as described by the original author (stored in Redis):
 * <ol>
 *   <li>Seat occupation, a hash:
 *       key {@code <trainNumber>_<date>} (e.g. {@code D386_20231001}),
 *       field {@code carriage_row_seat_fromStationId_toStationId},
 *       value {@code 0} = free, {@code 1} = occupied.</li>
 *   <li>Remaining seats per segment, a hash:
 *       key {@code <trainNumber>_<date>_count} (e.g. {@code D386_20231001_count}),
 *       field {@code fromStationId_toStationId},
 *       value = number of seats left.</li>
 * </ol>
 */
@Service
@Slf4j
public class TrainSeatService {

    @Resource
    private TrainNumberMapper trainNumberMapper;

    @Resource
    private TrainCacheService trainCacheService;

    /**
     * Handles one changed seat row. Only UPDATE events whose {@code status} column was
     * actually modified are processed; status 1 releases the seat (marked free, counter
     * +1), status 2 occupies it (marked occupied, counter -1), any other status is
     * ignored.
     *
     * @param columns columns of the changed row
     * @param eventType kind of change reported by canal
     */
    public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {
        if (eventType != CanalEntry.EventType.UPDATE) {
            log.info("not update,no need care");
            return;
        }

        TrainSeat trainSeat = new TrainSeat();
        boolean statusUpdated = false;
        for (CanalEntry.Column column : columns) {
            String value = column.getValue();
            switch (column.getName()) {
                case "status":
                    if (!column.getUpdated()) {
                        // The status did not change, so the cache is already correct.
                        log.info("status not update,no need care");
                        return;
                    }
                    trainSeat.setStatus(Integer.parseInt(value));
                    statusUpdated = true;
                    break;
                case "id":
                    trainSeat.setId(Long.parseLong(value));
                    break;
                case "carriage_number":
                    trainSeat.setCarriageNumber(Integer.parseInt(value));
                    break;
                case "row_number":
                    trainSeat.setRowNumber(Integer.parseInt(value));
                    break;
                case "seat_number":
                    trainSeat.setSeatNumber(Integer.parseInt(value));
                    break;
                case "train_number_id":
                    trainSeat.setTrainNumberId(Integer.parseInt(value));
                    break;
                case "ticket":
                    trainSeat.setTicket(value);
                    break;
                case "from_station_id":
                    trainSeat.setFromStationId(Integer.parseInt(value));
                    break;
                case "to_station_id":
                    trainSeat.setToStationId(Integer.parseInt(value));
                    break;
                default:
                    break;
            }
        }
        if (!statusUpdated) {
            // No status column in the row at all: nothing to mirror. The original code
            // logged this but fell through and still touched the cache.
            log.info("status not update,no need care");
            return;
        }
        log.info("train seat update,trainSeat:{}", trainSeat);

        TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());
        if (trainNumber == null) {
            // Without the train number the cache key cannot be built.
            log.warn("train number not found,trainSeat:{}", trainSeat);
            return;
        }

        String seatKey = trainNumber.getName() + "_" + trainSeat.getTicket();
        String seatField = trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_"
                + trainSeat.getSeatNumber() + "_" + trainSeat.getFromStationId() + "_"
                + trainSeat.getToStationId();
        String countKey = seatKey + "_count";
        String countField = trainSeat.getFromStationId() + "_" + trainSeat.getToStationId();

        if (trainSeat.getStatus() == 1) {
            // Ticket released: mark the seat free and give one seat back.
            trainCacheService.hset(seatKey, seatField, "0");
            trainCacheService.hincr(countKey, countField, 1L);
            log.info("seat+1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);
        } else if (trainSeat.getStatus() == 2) {
            // Ticket taken: mark the seat occupied and remove one seat.
            trainCacheService.hset(seatKey, seatField, "1");
            trainCacheService.hincr(countKey, countField, -1L);
            log.info("seat-1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);
        } else {
            log.info("status update not 1 or 2,no need care");
        }
    }
}
参考文档:canal使用说明文档