pre 开启 mysql Binlog
网上有众多方法,自行百度。
查询是否成功,在 mysql 客户端输入
show BINARY LOGS;
出现如下提示,即表示 binlog 已正常开启。
1,下载 canal 服务端
传送门
注意:下载 canal.deployer-xxx 版本即可。admin 是 deployer 的管理端。
2,上传到服务器的指定位置并解压
tar xzvf canal.deployer-1.1.6.tar.gz
注意,这个 deployer 解压之后直接是零散文件夹,建议先创建一个文件夹后,在这个文件夹里面进行解压
3,配置实例
进入 conf 文件夹后,创建实例文件夹
cd conf/
mkdir test
先进入刚创建的 test 文件夹(cd test),再从 example 文件夹中拷贝 instance.properties 到当前文件夹
cp ../example/instance.properties .
4,编辑实例文件
4.1 源数据库位置
//源数据位置
canal.instance.master.address=127.0.0.1:3306
//源数据 binlog 名字
canal.instance.master.journal.name=
//源数据 binlog 偏移量
canal.instance.master.position=
4.2 连接源数据库的用户名和密码
//连接源数据库用户名
canal.instance.dbUsername=canal
//连接源数据库密码
canal.instance.dbPassword=canal
4.3 编辑完,保存退出
5,编辑 canal 的配置文件
cd ..
vim canal.properties
5.1 加入新加的实例(例如上文创建的 test),多个实例之间以逗号分隔
canal.destinations = example
6,部署客户端
这里客户端可以根据 canal 的 api 文档自行开发。
这里贴一些关键代码
{
// Logger for this class.
protected final static Logger logger = LoggerFactory.getLogger(CanalClientApplication.class);
// The settings below are read from application.properties through ConfigUtils by the
// static initialisers, i.e. once when this class is loaded.
// "canal.address": host part of the socket address the connector is created with.
private static String ADDRESS = ConfigUtils.getConfigValue("application.properties", "canal.address");
// "canal.port": port part of that socket address. Integer.parseInt throws
// NumberFormatException during class initialisation if the value is missing or not numeric.
private static int PORT = Integer.parseInt(ConfigUtils.getConfigValue("application.properties", "canal.port"));
// "canal.destination": passed as the destination argument of CanalConnectors.newSingleConnector.
private static String DESTINATION = ConfigUtils.getConfigValue("application.properties", "canal.destination");
// "canal.username" / "canal.password": credentials passed to CanalConnectors.newSingleConnector.
private static String USERNAME = ConfigUtils.getConfigValue("application.properties", "canal.username");
private static String PASSWORD = ConfigUtils.getConfigValue("application.properties", "canal.password");
// "canal.subscriber": passed verbatim to connector.subscribe(...).
// NOTE(review): presumably a canal table-filter expression — confirm against the canal client docs.
private static String SUBSCRIBER = ConfigUtils.getConfigValue("application.properties", "canal.subscriber");
/**
 * Starts the Spring application, connects to the canal server and then polls it in a loop,
 * handing every non-empty batch to {@code DataProcessor.process} and acknowledging it.
 *
 * <p>The loop only ends when the polling thread is interrupted or an exception escapes.
 * In both cases the connector is disconnected first; after an exception the JVM is then
 * terminated with exit code {@code -2}.
 *
 * @param args command line arguments, forwarded to {@code SpringApplication.run}
 */
public static void main(String[] args) {
    SpringApplication.run(CanalClientApplication.class, args);
    System.out.println("数据同步服务启动成功");

    // Create the connection to the canal server.
    logger.info("Trying to connect to {}:{}", ADDRESS, PORT);
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
    final int batchSize = 1000;
    boolean failed = false;
    try {
        logger.info("...");
        connector.connect();
        logger.info("connected");
        connector.subscribe(SUBSCRIBER);
        // NOTE(review): presumably rewinds to the last acknowledged position so that batches
        // fetched but never acked by a previous run are delivered again — confirm in canal docs.
        connector.rollback();
        logger.info("CanalClient Application started successfully!");
        while (true) {
            // Fetch up to batchSize entries without acknowledging them yet.
            Message message = connector.getWithoutAck(batchSize);
            long batchId = message.getId();
            int size = message.getEntries().size();
            logger.info("当前 message 信息为:{}", message);
            if (batchId == -1 || size == 0) {
                // Nothing to process: back off for a second before polling again.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to callers and stop polling instead of
                    // silently ignoring the request to stop.
                    Thread.currentThread().interrupt();
                    logger.warn("Canal client interrupted while idle, stopping poll loop.");
                    break;
                }
            } else {
                DataProcessor.process(message.getEntries());
            }
            // Acknowledge the batch. A batch whose processing threw is never acked because
            // the exception leaves the loop before this line is reached.
            connector.ack(batchId);
        }
    } catch (Exception e) {
        logger.error("Canal Client exit with error.", e);
        failed = true;
    } finally {
        // Always release the connection. A failure here must not hide the original error
        // or prevent the exit below.
        try {
            connector.disconnect();
        } catch (Exception e) {
            logger.warn("Failed to disconnect canal connector cleanly.", e);
        }
    }
    // System.exit must run after the finally block: calling it inside the catch would
    // terminate the JVM before disconnect() gets a chance to run.
    if (failed) {
        System.exit(-2);
    }
}
}
{
// Logger for this class.
protected final static Logger logger = LoggerFactory.getLogger(DataProcessor.class);
// The settings below are read from application.properties through ConfigUtils by the
// static initialisers, i.e. once when this class is loaded.
// "canal.database": comma-separated schema names to replay; an empty value disables the schema filter.
private static String DATABASE = ConfigUtils.getConfigValue("application.properties", "canal.database");
// "canal.table": comma-separated table names to replay; an empty value disables the table filter.
private static String TABLE = ConfigUtils.getConfigValue("application.properties", "canal.table");
// "canal.operator": an operation is replayed only if this string contains its name
// (process checks for "TRUNCATE", "DELETE", "INSERT" and "UPDATE").
private static String OPERATOR = ConfigUtils.getConfigValue("application.properties", "canal.operator");
// "canal.output": selects the sinks; process checks whether it contains "mysql" and/or "doris".
private static String CANAL_OUTPUT = ConfigUtils.getConfigValue("application.properties", "canal.output");
// Sink helpers; process (re)assigns them from DorisUtil.getInstance() / MySQLUtil.getInstance() before each use.
private static DorisUtil dorisUtil;
private static MySQLUtil mySQLUtil;
/**
 * Replays a batch of canal entries against the sinks selected by {@code CANAL_OUTPUT}.
 *
 * <p>Transaction begin/end markers are skipped. Every other entry is parsed into a row change;
 * entries whose schema or table is not accepted by the {@code DATABASE} / {@code TABLE}
 * filters are ignored. A TRUNCATE event is replayed once per entry, DELETE / INSERT / UPDATE
 * events once per changed row, each only if {@code OPERATOR} contains the operation name.
 * SQL failures are logged and do not stop the remaining rows or entries from being replayed.
 *
 * @param entrys the entries of one fetched batch
 * @throws RuntimeException if the payload of an entry cannot be parsed as a row change
 */
public static void process(List<CanalEntry.Entry> entrys) {
    for (CanalEntry.Entry entry : entrys) {
        CanalEntry.EntryType entryType = entry.getEntryType();
        if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }

        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                    e);
        }

        // The filter depends only on the entry header, so evaluate it once per entry
        // instead of once per row.
        String schema = entry.getHeader().getSchemaName();
        String table = entry.getHeader().getTableName();
        if (!passesFilter(DATABASE, schema) || !passesFilter(TABLE, table)) {
            continue;
        }

        CanalEntry.EventType eventType = rowChange.getEventType();
        if (eventType == CanalEntry.EventType.TRUNCATE && OPERATOR.contains("TRUNCATE")) {
            replayTruncate(schema, table);
        }
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            replayRow(eventType, schema, table, rowData);
        }
    }
}

/** Returns true when the filter is empty (accept everything) or its comma-separated list contains the name. */
private static boolean passesFilter(String configured, String actual) {
    return StringUtils.isEmpty(configured)
            || (actual != null && isContain(configured.split(","), actual));
}

/** Truncates the table on every configured sink; a failing sink is logged and the other one is still tried. */
private static void replayTruncate(String schema, String table) {
    logger.info("TRUNCATE TABLE " + table);
    if (CANAL_OUTPUT.contains("mysql")) {
        mySQLUtil = MySQLUtil.getInstance();
        try {
            mySQLUtil.mySQLTruncate(schema, table);
        } catch (SQLException e) {
            // Pass the exception to the logger so the cause ends up in the log, not on stderr.
            logger.error("MySQL执行同步truncate出错,dataBase:" + schema + ",table:" + table, e);
        }
    }
    if (CANAL_OUTPUT.contains("doris")) {
        dorisUtil = DorisUtil.getInstance();
        try {
            dorisUtil.dorisTruncate(schema, table);
        } catch (SQLException e) {
            logger.error("Doris执行同步truncate出错,dataBase:" + schema + ",table:" + table, e);
        }
    }
}

/** Applies one changed row (DELETE, INSERT or UPDATE) to every configured sink; other event types are ignored. */
private static void replayRow(CanalEntry.EventType eventType, String schema, String table,
                              CanalEntry.RowData rowData) {
    if (CANAL_OUTPUT.contains("mysql")) {
        mySQLUtil = MySQLUtil.getInstance();
        try {
            if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {
                mySQLUtil.mySQLDelete(schema, table, rowData.getBeforeColumnsList());
            } else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {
                mySQLUtil.mySQLInsert(schema, table, rowData.getAfterColumnsList());
            } else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {
                mySQLUtil.mySQLUpdate(schema, table, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
            }
            // Any other event type, or an operation not enabled in OPERATOR: nothing to do.
        } catch (SQLException e) {
            logger.error("MySQL执行同步" + eventType + "出错,dataBase:" + schema + ",table:" + table, e);
        }
    }
    if (CANAL_OUTPUT.contains("doris")) {
        dorisUtil = DorisUtil.getInstance();
        try {
            if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {
                dorisUtil.dorisDelete(schema, table, rowData.getBeforeColumnsList());
            } else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {
                dorisUtil.dorisInsert(schema, table, rowData.getAfterColumnsList());
            } else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {
                dorisUtil.dorisUpdate(schema, table, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
            }
            // Any other event type, or an operation not enabled in OPERATOR: nothing to do.
        } catch (SQLException e) {
            // This is the Doris sink; the message previously named MySQL by mistake.
            logger.error("Doris执行同步" + eventType + "出错,dataBase:" + schema + ",table:" + table, e);
        }
    }
}
/**
 * Tells whether {@code value} equals one of the elements of {@code list}, ignoring leading
 * and trailing whitespace on both sides.
 *
 * @param list  candidate strings; may be {@code null} and may contain {@code null} elements,
 *              which never match
 * @param value the string to look for; may be {@code null}
 * @return {@code true} if a trimmed element equals the trimmed value; {@code false} otherwise,
 *         including when {@code list} or {@code value} is {@code null}
 */
public static boolean isContain(String[] list, String value) {
    if (list == null || value == null) {
        return false;
    }
    // Trim the needle once instead of on every iteration.
    final String wanted = value.trim();
    for (String candidate : list) {
        // Skip null elements rather than failing with a NullPointerException on trim().
        if (candidate != null && wanted.equals(candidate.trim())) {
            return true;
        }
    }
    return false;
}
/**
 * Logs one info line per column in the form
 * {@code <database>-<table>-<column name> : <value> update=<updated flag>}.
 *
 * @param database schema name used as the first part of each line
 * @param table    table name used as the second part of each line
 * @param columns  columns to log, in list order
 */
private static void printColumn(String database, String table, List<CanalEntry.Column> columns) {
    // The database/table part is identical for every column, so build it once.
    final String prefix = database + "-" + table + "-";
    for (int idx = 0; idx < columns.size(); idx++) {
        CanalEntry.Column current = columns.get(idx);
        String line = prefix + current.getName() + " : " + current.getValue() + " update=" + current.getUpdated();
        logger.info(line);
    }
}
}
7,启动 canal 服务端
在 canal 根目录下,执行如下命令
./bin/startup.sh
8,启动 canal 客户端
因为我用的 jar,所以,启动 jar 包就行了。
9,待完成事项
1,doris 官方文档上有通过 binLog 同步数据到 doris 中的方法,这部分待实现。
2,当前客户端写法单一。一旦canal 服务端重启,应用自动停机。待优化。