https://github.com/alibaba/canal
- 使用 Binlog 实时更新Redis缓存
- Mysql 服务器准备
- Canal 服务器准备
- Canal Client
- 测试
- 基于 Binlog实现跨系统实时数据同步
- 更换数据库
- 实现比对和补偿程序
- 安全地实现数据备份和恢复
使用 Binlog 实时更新Redis缓存
Mysql 服务器准备
- 查看当前数据是否开启主从同步
show variables like '%log_bin%'; #log_bin=ON
show variables like '%binlog_format%'; #binlog_format=ROW
show variables like '%server_id%'; #server_id=1
- 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
/etc/mysql/my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
- 查看canal用户
select * from mysql.user;
- 查看当前数据库位置
show master status;
Canal 服务器准备
- docker 部署
version: '3.8'
services:
canal:
image: canal/canal-server:v1.1.6
container_name: canal
restart: always
ports:
- 11111:11111
depends_on:
- mysql
volumes:
- /data/canal/conf:/home/admin/canal-server/conf
docker cp canal:/home/admin/canal-server/conf /data/canal/conf
- canal 配置
- 端口
- 监控哪一些业务数据 (canal.destinations = promotion,seckill)
# tcp bind ip
canal.ip = 127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
....
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
....
....
#################################################
######### destinations #############
#################################################
canal.destinations = promotion,seckill
- canal 配置业务配置目录
➜ mkdir /data/canal/conf/promotion
➜ mkdir /data/canal/conf/seckill
- 进入promotion目录,修改instance.properties,如果目录中没有这个文件,可以从example目录拷贝一个过来
➜ conf cp example/* promotion
- vim instance.properties
...
# position info
canal.instance.master.address=mysql.localhost.com:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=4791814
canal.instance.master.timestamp=
canal.instance.master.gtid=
....
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=tl_mall_promotion.sms_home_advertise,tl_mall_promotion.sms_home_brand,tl_mall_promotion.sms_home_new_product,tl_mall_promotion.sms_home_recommend_product,tl_mall_promotion.sms_home_recommend_subject
...
- 启动
- docker-compose -f docker-compose-base.yml up -d
- docker start canal
- log
docker logs canal
Canal Client
- 依赖
<!-- 引入canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
- 配置
canal:
server:
ip: canal.localhost.com
port: 11111
# product:
# destination: product
# indexName: product_db
# batchSize: 1000
promotion:
destination: promotion
batchSize: 1000
seckill:
destination: seckill
batchSize: 1000
- canal 客户端
@Configuration
@EnableScheduling
@EnableAsync
public class CanalPromotionConfig {
@Value("${canal.server.ip}")
private String canalServerIp;
@Value("${canal.server.port}")
private int canalServerPort;
@Value("${canal.server.username:blank}")
private String userName;
@Value("${canal.server.password:blank}")
private String password;
@Value("${canal.promotion.destination}")
private String destination;
@Bean("promotionConnector")
public CanalConnector newSingleConnector(){
String userNameStr = "blank".equals(userName) ? "" : userName;
String passwordStr = "blank".equals(password) ? "" : password;
return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,
canalServerPort), destination, userNameStr, passwordStr);
}
}
- 同步数据 — Redis
@Slf4j
@Service
public class PromotionData implements IProcessCanalData {
private final static String SMS_HOME_ADVERTISE = "sms_home_advertise";
private final static String SMS_HOME_BRAND = "sms_home_brand";
private final static String SMS_HOME_NEW_PRODUCT = "sms_home_new_product";
private final static String SMS_HOME_RECOMMEND_PRODUCT = "sms_home_recommend_product";
/*存储从表名到Redis缓存的键*/
private Map<String,String> tableMapKey = new HashMap<>();
@Autowired
@Qualifier("promotionConnector")
private CanalConnector connector;
@Autowired
private PromotionRedisKey promotionRedisKey;
@Autowired
private RedisClusterUtil redisOpsExtUtil;
@Value("${canal.promotion.subscribe:server}")
private String subscribe;
@Value("${canal.promotion.batchSize}")
private int batchSize;
@PostConstruct
@Override
public void connect() {
tableMapKey.put(SMS_HOME_ADVERTISE,promotionRedisKey.getHomeAdvertiseKey());
tableMapKey.put(SMS_HOME_BRAND,promotionRedisKey.getBrandKey());
tableMapKey.put(SMS_HOME_NEW_PRODUCT,promotionRedisKey.getNewProductKey());
tableMapKey.put(SMS_HOME_RECOMMEND_PRODUCT,promotionRedisKey.getRecProductKey());
connector.connect();
if("server".equals(subscribe))
connector.subscribe(null);
else
connector.subscribe(subscribe);
connector.rollback();
}
@PreDestroy
@Override
public void disConnect() {
connector.disconnect();
}
@Async
@Scheduled(initialDelayString="${canal.promotion.initialDelay:5000}",fixedDelayString = "${canal.promotion.fixedDelay:5000}")
@Override
public void processData() {
try {
if(!connector.checkValid()){
log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");
this.connect();
}else{
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
log.info("本次[{}]没有检测到促销数据更新。",batchId);
}else{
log.info("本次[{}]促销数据本次共有[{}]次更新需要处理",batchId,size);
/*一个表在一次周期内可能会被修改多次,而对Redis缓存的处理只需要处理一次即可*/
Set<String> factKeys = new HashSet<>();
for(CanalEntry.Entry entry : message.getEntries()){
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
if(log.isDebugEnabled()){
CanalEntry.EventType eventType = rowChange.getEventType();
log.debug("数据变更详情:来自binglog[{}.{}],数据源{}.{},变更类型{}",
entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),tableName,eventType);
}
factKeys.add(tableMapKey.get(tableName));
}
for(String key : factKeys){
if(StringUtils.isNotEmpty(key)) redisOpsExtUtil.delete(key);
}
connector.ack(batchId); // 提交确认
log.info("本次[{}]处理促销Canal同步数据完成",batchId);
}
}
} catch (Exception e) {
log.error("处理促销Canal同步数据失效,请检查:",e);
}
}
}
- 同步数据 — ES
@Service
@Slf4j
public class ProductESData implements IProcessCanalData {
private final static String T_ID = "id";
private final static String T_NAME = "name";
private final static String T_KEYWORDS = "keywords";
private final static String T_SUB_TITLE = "sub_title";
private final static String T_PRICE = "price";
private final static String T_PROMOTION_PRICE = "promotion_price";
private final static String T_ORIGINAL_PRICE = "original_price";
private final static String T_PIC = "pic";
private final static String T_SALE = "sale";
private final static String T_BRAND_ID = "brand_id";
private final static String T_BRAND_NAME = "brand_name";
private final static String T_PRODUCT_CATEGORY_ID = "product_category_id";
private final static String T_PRODUCT_CATEGORY_NAME = "product_category_name";
@Value("${canal.product.indexName}")
private String indexName;
@Autowired
@Qualifier("productConnector")
private CanalConnector connector;
@Value("${canal.product.subscribe:server}")
private String subscribe;
@Value("${canal.product.batchSize}")
private int batchSize;
@Autowired
private RestHighLevelClient restHighLevelClient;
@PostConstruct
@Override
public void connect() {
connector.connect();
if("server".equals(subscribe))
connector.subscribe(null);
else
connector.subscribe(subscribe);
connector.rollback();
}
@PreDestroy
@Override
public void disConnect() {
connector.disconnect();
}
@Async
@Scheduled(initialDelayString = "${canal.product.initialDelay:5000}", fixedDelayString = "${canal.product.fixedDelay:1000}")
@Override
public void processData() {
try {
if (!connector.checkValid()) {
log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");
this.connect();
} else {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
log.info("本次没有检测到商品数据更新。");
} else {
log.info("商品数据本次共有[{}]次更新需要处理", size);
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
log.debug("数据变更详情:来自binglog[{}.{}],数据源{}.{},变更类型{}",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), tableName, eventType);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : columns) {
if(column.getName().equals("id")) {
deleteDoc(column.getValue());
break;
}
}
} else if (eventType == CanalEntry.EventType.INSERT) {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
ProductESVo productESVo = new ProductESVo();
String docId = makeVo(columns,productESVo);
insertDoc(docId,productESVo);
} else {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
ProductESVo productESVo = new ProductESVo();
String docId = makeVo(columns,productESVo);
if(null != docId){
if(null == productESVo){
log.info("商品的删除状态字段update为已删除,从ES中移除");
deleteDoc(docId);
}else updateDoc(docId,productESVo);
}
}
}
}
connector.ack(batchId); // 提交确认
}
}
} catch (Exception e) {
log.error("处理商品Canal同步数据失效,请检查:", e);
}
}
private String makeVo(List<CanalEntry.Column> columns,ProductESVo productESVo){
String docId = null;
for (CanalEntry.Column column : columns) {
String colName = column.getName();
String colValue = column.getValue();
if(colName.equals(T_ID)) {
docId = colValue;
}else if(colName.equals(T_NAME)) {
productESVo.setName(colValue);
} if(colName.equals(T_KEYWORDS)) {
productESVo.setKeywords(colValue);
} if(colName.equals(T_SUB_TITLE)) {
productESVo.setSubTitle(colValue);
} if(colName.equals(T_PRICE)) {
productESVo.setPrice(new BigDecimal(colValue));
} if(colName.equals(T_PROMOTION_PRICE)) {
productESVo.setPromotionPrice(new BigDecimal(colValue));
} if(colName.equals(T_ORIGINAL_PRICE)) {
productESVo.setOriginalPrice(new BigDecimal(colValue));
} if(colName.equals(T_PIC)) {
productESVo.setPic(colValue);
} if(colName.equals(T_SALE)) {
productESVo.setSaleCount(Integer.valueOf(colValue));
} if(colName.equals(T_BRAND_ID)) {
productESVo.setBrandId(Long.valueOf(colValue));
} if(colName.equals(T_BRAND_NAME)) {
productESVo.setBrandName(colValue);
} if(colName.equals(T_PRODUCT_CATEGORY_ID)) {
productESVo.setCategoryId(Long.valueOf(colValue));
} if(colName.equals(T_PRODUCT_CATEGORY_NAME)) {
productESVo.setCategoryName(colValue);
} if(colName.equals("delete_status")) {
if(1 == Integer.valueOf(colValue)){
productESVo = null;
}
}
}
return docId;
}
private void deleteDoc(String docId) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest(indexName, docId);
DeleteResponse deleteResponse =
restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
String indexDoc = indexName+"/"+docId;
if(deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND){
log.warn("删除不存在的文档 {}", indexDoc);
}else{
log.info("删除文档 {} 成功",indexDoc);
}
}
private void updateDoc(String docId,ProductESVo productESVo) throws IOException {
String productJson = JSONObject.toJSONString(productESVo);
/*XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
xContentBuilder.startObject();
for (Map.Entry<String, String> entry : updateField.entrySet()) {
xContentBuilder.field(entry.getKey(), entry.getValue());
}
xContentBuilder.endObject();*/
UpdateRequest request =
new UpdateRequest(indexName, docId).doc(productJson,XContentType.JSON);
request.docAsUpsert(true);
UpdateResponse updateResponse =
restHighLevelClient.update(request, RequestOptions.DEFAULT);
String indexDoc = indexName+"/"+docId;
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
log.info("文档 {} 不存在,更新变更为创建成功",indexDoc);
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
log.info("文档 {} 不存在,更新成功",indexDoc);
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
log.warn("更新操作里文档 {} 被删除,请检查",indexDoc);
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
log.warn("文档 {} 未做任何更新操作,请检查",indexDoc);
}
}
private void insertDoc(String docId, ProductESVo productESVo) throws IOException {
IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest.id(docId);
String productJson = JSONObject.toJSONString(productESVo);
indexRequest.source(productJson, XContentType.JSON);
IndexResponse indexResponse =
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
if(indexResponse!=null){
String id = indexResponse.getId();
if(indexResponse.getResult() == DocWriteResponse.Result.CREATED){
log.info("新增文档成功,id = {}",id);
}else if(indexResponse.getResult() == DocWriteResponse.Result.UPDATED){
log.warn("新增转为覆盖文档成功,id = {}",id);
}
}
}
}
- Ack
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
connector.ack(batchId); // 提交确认
测试
修改DB对应表,查看日志
基于 Binlog实现跨系统实时数据同步
更换数据库
在设计迁移方案的时候,一定要保证每一步都是可逆的。也就是必须保证,每执行完一个步骤,一旦出现任何问题,都能快速回滚到上一个步骤:
-
首先要做的一点是,把旧库的数据全部复制到新库中。因为旧库还在服务线上业务,所以不断会有订单数据写入旧库,我们不仅要向新库复制数据,还要保证新旧两个库的数据是实时同步的。所以,需要用一个同步程序来实现新旧两个数据库的实时同步。
可以使用Binlog实现两个异构数据库之间数据的实时同步。这一步不需要回滚,因为这里只增加了一个新库和一个同步程序,对系统的旧库和程序没有任何改变。即使新上线的同步程序影响到了旧库,停掉同步程序也就可以了。
- 然后需要改造一下订单服务,业务逻辑部分不需要变动,数据访问的DAO层需要进行如下改造:
- 1)支持
双写
新旧两个库,并且预留热切换开关,能通过开关控制三种写状态:只写旧库、只写新库和同步双写。 - 2)支持读取新旧两个库,同样预留热切换开关,控制读取旧库还是新库。
- 3)然后上线新版的订单服务,这个时候订单服务仍然是
只读写旧库,不读写新库
。让这个新版的订单服务稳定运行至少一到两周的时间,其间我们不仅要验证新版订单服务的稳定性,还要验证新旧两个订单库中的数据是否保持一致。这个过程中,如果新版订单服务出现任何问题,都要立即下线新版订单服务,回滚到旧版本的订单服务。
-
稳定一段时间之后,就可以开启订单服务的双写开关了。开启双写开关的同时,需要停掉同步程序。这里有一个需要特别注意的问题是,这里双写的业务逻辑,一定是先写旧库,再写新库,并且以旧库的结果为准。
如果旧库写成功,新库写失败,则返回成功,但这个时候要记录日志,后续我们会根据这个日志来验证新库是否还有问题。如果旧库写失败,则直接返回失败,同时也不再写新库了。这么做的原因是不能让新库影响到现有业务的可用性和数据准确性。上面这个过程如果出现任何问题都要关闭双写,回滚到只读写旧库的状态。
切换到双写之后,新库与旧库的数据可能会出现不一致的问题。
原因有两点:- 一是停止同步程序和开启双写,这两个过程很难做到无缝衔接;
- 二是双写的第略也不能保证新旧库的强一致性。对于这个问题,我们需要上线一个比对和补偿的程序,用于比对旧库最近的数据变更,然后检查新库中的数据是否一致,如果不一致,则需要进行补偿。
开启双写之后,还需要稳定运行至少几周的时间,并且在这期间我们需要不断地检查,以确保不能有旧库写成功、新库写失败的问题。如果在几周之后比对程序发现新旧两个库的数据没有不一致的情况,那就可以认为新旧两个库的数据一直都是保持同步的。
- 接下来就可以用类似灰度发布的方式把读请求逐步切换到新库上。同样,运行期间如果出现任何问题,都要再切回到旧库。
-
将全部读请求都切换到新库上之后,其实读写请求已经全部切换到新库上了,虽然实际的切换已经完成,但后续还有需要收尾的步骤。
再稳定一段时间之后,就可以停掉比对程序,把订单服务的写状态改为只写新库。至此,旧库就可以下线了。注意,在整个迁移过程中,只有这个步骤是不可逆的。由于这一步的主要操作就是摘掉已经不再使用的旧库,因此对于正在使用的新库并不会有什么影响,实际出问题的可能性已经非常小了。
至此们完成了在线更换数据库的全部流程。双写版本的订单服务也完成了它的历史使命,可以在下一次升级订单服务版木的时候下线双写功能。
数据表的变更,如果只是新增表,这个很简单,一般直接回退到旧版本程序即可;但如果牵涉到表字段的变化就麻烦些,但是也可以采用类似的思路,双写新旧表并设计热切换开关。
实现比对和补偿程序
两个随时都在变化的数据厍中的数据:
- 没有类似复制状态机这样理论上严谨、实际操作还很简单的方法来实现比对和补偿
- 根据业务数据的实际情况,有针对性地实现比对和补偿,经过一段时间之后,把新旧两个数据库的差异逐渐收敛到一致
时效性比较强的数据(订单):
- 比较容易进行比对和补偿
- 比对和补偿程序就可以根据表完成时间,每次只比对这个时间窗口内完成的数据
- 补偿的逻辑也很简单,发现不一致的情况后,直接用旧库的表数据覆盖新库的表数据
- 后续在双写的时候只要新库不是频繁写入失败,就可以保证两个库的数据完全一致
时效性比较弱的数据(商品):
- 数据上带有更新时间,那么比对程序就可以利用这个更新时间
- 每次从旧库中读取一个更新时间窗口内的数据,到新库中查找具有相同主键的数据进行比对,如果发现数据不一致,则还要比对一下更新时间
- 如果新库数据的更新时间晚于旧库数据,那么很可能是比对期间数据发生了变化,这种情况暂时不要补偿,放到下个时间窗口继续进行比对即可。
- 时间窗口的结束时间不要选取当前时间,而是要比当前时间早一点,比如1分钟之前,这样就可以避免比对正在写入的数据了
- 数据没带时间戳信息,那就只能从旧库中读取Binlog,获取数据变化信息后到新库中查找对应的数据进行比对和补偿
安全地实现数据备份和恢复
- 数据库宕机
- 磁盘损坏
- 机房着火
- 删库跑路
存储系统导致的比较严重的损失主要有:
- 数据丢失造成的直接财产损失。比如订单数据丢失造成了大量的坏账。为了避免这种损失,系统需要保证
数据的高可靠性
- 存储系统的损坏,造成整个业务系统停止服务而带来的损失。比如,电商系统停服期间造成的收人损失。为了避免这种损失,系统需要保证
存储服务的高可用性
保证数据安全: (定期备份数据)
我们最常用的MySQL如何更安全地实现数据的备份和恢复呢?
-
全量备份,备份的时候把所有的数据复制一份,存放到文件中,恢复的时候再把文件中的数据复制回去
$ mysqldump -uroot -p test > test.sql
缺点:(不能频繁地对数据库执行全量备份):
- 包含了数据库中的所有数据,占用的磁盘空间大
- 每次备份操作都要拷贝大量的数据,备份过程中会占用数据库服务器大量的CPU和磁盘IO资源
- 为了保证数据一致性,备份过程中很有可能会锁表
- 全量备份的代价比较高不能频繁地执行备份操作,所以全量备份不能做到完全无损的恢复
-
增量备份,MySQL自带的 Binlog就是一种实时的增量备份工具。Binlog所记录的就是MySQL 数据变更的操作日志。开启 Binlog之后,MySQL中数据的每次更新操作,都会记录到Binlog 中。Binlog是可以回放的,回放Binlog,就相当于是把之前对数据库中所有数据的更新操作,都按顺序重新执行一遍,回放完成之后数据自然就恢复了。这就是Binlog增量备份的基本原理。
在执行备份和恢复的时候,大家需要特别注意:
- 也是最重要的“不要把所有的鸡蛋放在同一个篮子中”,无论是全量备份还是Binlog,都不要与数据库存放在同一个服务器上。最好能存放到不同的机房,甚至不同城市离得越远越好。这样即使出现机房着火、光缆被挖断甚至地震也不怕数据丢失。
- 在回放 Binlog的时候,指定的起始时间可以比全量备份的时间稍微提前一点儿,这样可以确保全量备份之后的所有操作都在恢复的Binlog 范围内,从而保证数据恢复的完整性。
注意:为了确保回放的幂等性,需要将Binlog的格式设置为ROW格式