目录
- 前言
- 一、MySQL8配置
- 1. 修改my.cnf
- 2. 重启mysql
- 3. 建用户、授权
- 二、Canal服务端配置
- 1. 下载
- 2. 修改配置
- 3. 启动服务与验证
- 三、Canal客户端编写
- 1. yml配置文件添加canal服务端配置信息和Redis信息
- 2. 配置pom文件
- 3. 代码
- 4. MySQL建表storage.storage
- 5. 启动客户端与验证
- 参考
前言
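自 MySQL 8.0.4 起,默认身份验证插件由 mysql_native_password 改为 caching_sha2_password(该插件在 8.0.3 中引入)。直接按照 Canal 官网文档操作,Canal 在启动时会报 caching_sha2_password Auth failed 错误。
本文记录在 MySQL 8 环境下配置 Canal 并把数据变更同步到 Redis 的大致步骤。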
一、MySQL8配置
1. 修改my.cnf
可以用 `find / -name my.cnf` 命令找到这个配置文件
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
2. 重启mysql
systemctl stop mysqld.service
systemctl start mysqld.service
3. 建用户、授权
为 canal 连接 MySQL 所用的账号授予作为 MySQL slave 的权限
-- Lower the password validation level so the short demo password 'canal' is accepted.
-- NOTE(review): these two variables presumably exist only when the validate_password
-- component is installed; skip both statements if MySQL reports an unknown system variable.
set global validate_password.policy=LOW;
set global validate_password.length=5;
-- MySQL 8 defaults to the caching_sha2_password plugin, which makes Canal fail at startup
-- (see the referenced "caching_sha2_password Auth failed" issue), so the account is
-- created with mysql_native_password instead.
-- IF EXISTS keeps the script runnable on a fresh server where the account does not exist yet.
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
-- Privileges Canal needs in order to act as a MySQL replication slave.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
二、Canal服务端配置
1. 下载
canal下载地址
上传服务器
# 建立文件夹
mkdir canal
# 解压
tar -xvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C ./canal/
2. 修改配置
# 进入conf/example目录
cd conf/example
# 修改配置文件
vi instance.properties
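- canal.instance.mysql.slaveId:Canal 伪装成从节点时使用的 id,不要与第一章第 1 步 my.cnf 中配置的 server_id 相同
- canal.instance.master.address:要连接的 MySQL 地址,格式为 ip:port
- canal.instance.dbUsername / canal.instance.dbPassword:连接 MySQL 的用户名/密码(即上文创建的 canal 账号)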
主要配置这几个即可
3. 启动服务与验证
# 进入bin目录
cd bin
# 启动脚本
sh startup.sh
# 查看启动日志
tail -200f ../logs/example/example.log
无报错信息即可。
三、Canal客户端编写
Spring Boot集成canal.client获取变更信息,插入Redis
1. yml配置文件添加canal服务端配置信息和Redis信息
# Canal server connection settings; CanalConfig binds everything under the `canal` prefix.
canal:
  host: Canal服务端所在服务器IP
  port: 11111
  username: canal
  password: canal
  destination: example
# Redis connection settings used by the Spring Boot Redis starter.
spring:
  redis:
    port: 6379
    # Left empty in the original article; fill in the Redis server address.
    host:
    password: 123456
    database: 1
    lettuce:
      pool:
        min-idle: 0
        max-idle: 8
        max-active: 20
2. 配置pom文件
下面是所需的依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
3. 代码
- CanalClientProperties 为Canal连接属性
import lombok.Data;
/**
 * Holder for the Canal server connection settings.
 *
 * <p>{@code CanalConfig} registers an instance of this class as a bean bound to the
 * {@code canal} configuration prefix and reads the values through the accessors that
 * Lombok's {@code @Data} generates.
 */
@Data
public class CanalClientProperties {
/** Canal server host name or IP address; used together with {@link #port} to build the socket address. */
private String host;
/** Canal server port; the sample configuration uses 11111. */
private int port;
/** User name passed to the Canal connector. */
private String username;
/** Password passed to the Canal connector. */
private String password;
/** Canal destination name passed to the connector; the sample configuration uses {@code example}. */
private String destination;
}
- CanalConfig 为配置类
import java.net.InetSocketAddress;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
/**
 * Spring configuration that exposes the Canal client settings and the Canal connector as beans.
 */
@Configuration
public class CanalConfig {

    /**
     * Creates the settings holder; Spring fills it from the configuration entries under
     * the {@code canal} prefix.
     *
     * @return an empty {@link CanalClientProperties} instance to be populated by Spring
     */
    @Bean
    @ConfigurationProperties(prefix = "canal")
    CanalClientProperties canalClientProperties() {
        return new CanalClientProperties();
    }

    /**
     * Builds a single-node Canal connector from the bound settings. The connector's
     * {@code disconnect} method is registered as the bean's destroy method.
     *
     * @param canalClientProperties connection settings for the Canal server
     * @return a connector pointing at the configured host, port and destination
     */
    @Bean(destroyMethod = "disconnect")
    CanalConnector canalConnector(CanalClientProperties canalClientProperties) {
        String host = canalClientProperties.getHost();
        int port = canalClientProperties.getPort();
        InetSocketAddress serverAddress = new InetSocketAddress(host, port);

        String destination = canalClientProperties.getDestination();
        String username = canalClientProperties.getUsername();
        String password = canalClientProperties.getPassword();
        return CanalConnectors.newSingleConnector(serverAddress, destination, username, password);
    }
}
- CanalClient为同步监听
TABLE_NAME 限制了读取的表
import java.util.List;
import javax.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
/**
 * Canal listener that mirrors row changes of the {@code storage} table into Redis.
 *
 * <p>It is started once through {@link #run}: it connects to the Canal server, subscribes
 * to {@code storage.storage} and then polls for binlog batches in a loop. Each inserted or
 * updated row is stored as a JSON string under the key {@code storage:<c_id>}; deleted rows
 * have their key removed.
 *
 * <p>The polling loop runs on the thread that invokes {@link #run}. It ends only when that
 * thread is interrupted while idle or when an exception is thrown.
 */
@Slf4j
@Component
public class CanalClient implements ApplicationRunner {

    /** Only changes of this table are written to Redis. */
    private static final String TABLE_NAME = "storage";
    /** Column whose value forms the Redis key suffix. */
    private static final String PRIMARY_KEY = "c_id";
    /** Separator between the table name and the primary-key value in the Redis key. */
    private static final String SEPARATOR = ":";
    /** Maximum number of entries requested from Canal per poll. */
    private static final int BATCH_SIZE = 1000;
    /** Pause in milliseconds before polling again when Canal returned no data. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    // NOTE(review): injected by field name; which serializers this template uses depends on
    // the bean that is resolved, so confirm keys/values are stored as plain strings.
    @Resource
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private CanalConnector canalConnector;

    /**
     * Starts the synchronisation loop once the application has started.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        this.initCanal();
    }

    /**
     * Connects to Canal, subscribes to {@code storage.storage} and processes batches until
     * the current thread is interrupted.
     *
     * <p>Every fetched batch is acknowledged after it has been processed. If processing
     * throws, the exception propagates and the batch stays unacknowledged.
     */
    public void initCanal() {
        log.info("启动 canal 数据同步...");
        canalConnector.connect();
        canalConnector.subscribe("storage.storage");
        // Go back to the last acknowledged position so unacknowledged batches are re-delivered.
        canalConnector.rollback();
        while (true) {
            Message message = canalConnector.getWithoutAck(BATCH_SIZE);
            long batchId = message.getId();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (batchId == -1 || entries.isEmpty()) {
                try {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of swallowing the interrupt; nothing
                    // was processed for this batch, so leaving it unacknowledged loses no data.
                    Thread.currentThread().interrupt();
                    log.warn("canal 数据同步线程被中断,停止同步", e);
                    return;
                }
            } else {
                syncEntry(entries);
            }
            canalConnector.ack(batchId); // confirm the batch
        }
    }

    /**
     * Applies every row change contained in the given entries to Redis.
     *
     * @param entrys entries of one Canal batch
     * @throws RuntimeException if an entry's payload cannot be parsed as a row change
     */
    private void syncEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            // Only row-data entries carry a RowChange payload; skip transaction markers
            // and any other entry type.
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            log.info("================> binlog[{}:{}] , name[{},{}] , eventType : {}",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType);
            String tableName = entry.getHeader().getTableName();
            if (!TABLE_NAME.equalsIgnoreCase(tableName)) {
                continue;
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    redisInsert(tableName, rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    printColumn(rowData.getAfterColumnsList());
                    redisUpdate(tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    redisDelete(tableName, rowData.getBeforeColumnsList());
                }
            }
        }
    }

    /** Stores a newly inserted row in Redis. */
    private void redisInsert(String tableName, List<CanalEntry.Column> columns) {
        redisSave(tableName, columns, "新增");
    }

    /**
     * Stores the updated row in Redis; if the update changed the primary key, the entry
     * under the old key is removed so no stale copy is left behind.
     */
    private void redisUpdate(String tableName, List<CanalEntry.Column> beforeColumns,
                             List<CanalEntry.Column> afterColumns) {
        String previousKey = buildKey(tableName, beforeColumns);
        String currentKey = buildKey(tableName, afterColumns);
        if (previousKey != null && currentKey != null && !previousKey.equals(currentKey)) {
            redisTemplate.delete(previousKey);
            log.info("redis数据同步删除,key:{}", previousKey);
        }
        redisSave(tableName, afterColumns, "更新");
    }

    /** Removes the Redis entry that belongs to a deleted row. */
    private void redisDelete(String tableName, List<CanalEntry.Column> columns) {
        String key = buildKey(tableName, columns);
        if (key == null) {
            log.warn("redis数据同步跳过,未找到主键列:{}", PRIMARY_KEY);
            return;
        }
        redisTemplate.delete(key);
        log.info("redis数据同步删除,key:{}", key);
    }

    /**
     * Serialises all columns of a row to JSON and writes it under the row's Redis key.
     *
     * @param action word inserted into the log message ("新增" or "更新")
     */
    private void redisSave(String tableName, List<CanalEntry.Column> columns, String action) {
        String key = buildKey(tableName, columns);
        if (key == null) {
            log.warn("redis数据同步跳过,未找到主键列:{}", PRIMARY_KEY);
            return;
        }
        JSONObject json = new JSONObject();
        for (CanalEntry.Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        redisTemplate.opsForValue().set(key, json.toJSONString());
        log.info("redis数据同步{},key:{}", action, key);
    }

    /**
     * Builds the Redis key {@code <tableName>:<primary key value>} for a row.
     *
     * @return the key, or {@code null} when the columns do not contain the primary-key column
     */
    private static String buildKey(String tableName, List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
                return tableName + SEPARATOR + column.getValue();
            }
        }
        return null;
    }

    /** Logs name, value and updated flag of every column. */
    private void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            log.info("{} : {} update={}", column.getName(), column.getValue(), column.getUpdated());
        }
    }
}
4. MySQL建表storage.storage
建模式与建表
create database storage;
CREATE TABLE storage.storage (
`c_id` bigint NOT NULL AUTO_INCREMENT,
`c_commodity_name` varchar(32) NOT NULL,
`n_count` bigint NOT NULL,
PRIMARY KEY (`c_id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
5. 启动客户端与验证
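启动 Spring Boot 项目后,对 storage.storage 表做增、删、改操作,然后查看客户端日志:新增或更新的行会以 storage:<c_id> 为 key、整行 JSON 为 value 写入 Redis(示例配置使用 database 1),被删除的行对应的 key 会被移除。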
参考
- Canal 1.1.5 启动报错:caching_sha2_password Auth failed
- GITHUB官网