MySQL(十二):阿里巴巴 MySQL binlog 增量订阅消费Canal组件

news2024/12/27 12:04:54

https://github.com/alibaba/canal

  • 使用 Binlog 实时更新Redis缓存
    • Mysql 服务器准备
    • Canal 服务器准备
    • Canal Client
    • 测试
  • 基于 Binlog实现跨系统实时数据同步
    • 更换数据库
    • 实现比对和补偿程序
  • 安全地实现数据备份和恢复

使用 Binlog 实时更新Redis缓存

Mysql 服务器准备

  1. 查看当前数据是否开启主从同步
show variables like '%log_bin%'; #log_bin=ON
show variables like '%binlog_format%'; #binlog_format=ROW
show variables like '%server_id%'; #server_id=1
  1. 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
    /etc/mysql/my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  1. 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
  1. 查看canal用户
select * from mysql.user;
  1. 查看当前数据库位置
show master status;

在这里插入图片描述

Canal 服务器准备

  1. docker 部署
version: '3.8'
services:
  canal:
    image: canal/canal-server:v1.1.6
    container_name: canal
    restart: always
    ports:
      - 11111:11111
    depends_on:
      - mysql
    volumes:
      - /data/canal/conf:/home/admin/canal-server/conf

docker cp canal:/home/admin/canal-server/conf /data/canal/conf

  1. canal 配置
  • 端口
  • 监控哪一些业务数据 (canal.destinations = promotion,seckill)
# tcp bind ip
canal.ip = 127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112

....
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
....
....
#################################################
#########               destinations            #############
#################################################
canal.destinations = promotion,seckill
  1. canal 配置业务配置目录
mkdir /data/canal/conf/promotion
➜  mkdir /data/canal/conf/seckill
  1. 进入promotion目录,修改instance.properties,如果目录中没有这个文件,可以从example目录拷贝一个过来
➜  conf cp example/* promotion
  1. vim instance.properties
...
# position info
canal.instance.master.address=mysql.localhost.com:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=4791814
canal.instance.master.timestamp=
canal.instance.master.gtid=

....
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...


# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=tl_mall_promotion.sms_home_advertise,tl_mall_promotion.sms_home_brand,tl_mall_promotion.sms_home_new_product,tl_mall_promotion.sms_home_recommend_product,tl_mall_promotion.sms_home_recommend_subject

...
  1. 启动
  • docker-compose -f docker-compose-base.yml up -d
  • docker start canal
  1. log

docker logs canal

在这里插入图片描述

Canal Client

在这里插入图片描述

  1. 依赖
 <!-- 引入canal -->
 <dependency>
     <groupId>com.alibaba.otter</groupId>
     <artifactId>canal.client</artifactId>
     <version>1.1.4</version>
     <exclusions>
         <exclusion>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
         </exclusion>
     </exclusions>
 </dependency>
  1. 配置
canal:
  server:
    ip: canal.localhost.com
    port: 11111
#  product:
#    destination: product
#    indexName: product_db
#    batchSize: 1000
  promotion:
    destination: promotion
    batchSize: 1000
  seckill:
    destination: seckill
    batchSize: 1000
  1. canal 客户端
@Configuration
@EnableScheduling
@EnableAsync
public class CanalPromotionConfig {

    @Value("${canal.server.ip}")
    private String canalServerIp;

    @Value("${canal.server.port}")
    private int canalServerPort;

    @Value("${canal.server.username:blank}")
    private String userName;

    @Value("${canal.server.password:blank}")
    private String password;

    @Value("${canal.promotion.destination}")
    private String destination;

    @Bean("promotionConnector")
    public CanalConnector newSingleConnector(){
        String userNameStr = "blank".equals(userName) ? "" : userName;
        String passwordStr = "blank".equals(password) ? "" : password;
        return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,
                canalServerPort), destination, userNameStr, passwordStr);
    }

}
  1. 同步数据 — Redis

@Slf4j
@Service
public class PromotionData implements IProcessCanalData {

    private final static String SMS_HOME_ADVERTISE = "sms_home_advertise";
    private final static String SMS_HOME_BRAND = "sms_home_brand";
    private final static String SMS_HOME_NEW_PRODUCT = "sms_home_new_product";
    private final static String SMS_HOME_RECOMMEND_PRODUCT = "sms_home_recommend_product";
    /*存储从表名到Redis缓存的键*/
    private Map<String,String> tableMapKey = new HashMap<>();

    @Autowired
    @Qualifier("promotionConnector")
    private CanalConnector connector;

    @Autowired
    private PromotionRedisKey promotionRedisKey;

    @Autowired
    private RedisClusterUtil redisOpsExtUtil;

    @Value("${canal.promotion.subscribe:server}")
    private String subscribe;

    @Value("${canal.promotion.batchSize}")
    private int batchSize;

    @PostConstruct
    @Override
    public void connect() {
        tableMapKey.put(SMS_HOME_ADVERTISE,promotionRedisKey.getHomeAdvertiseKey());
        tableMapKey.put(SMS_HOME_BRAND,promotionRedisKey.getBrandKey());
        tableMapKey.put(SMS_HOME_NEW_PRODUCT,promotionRedisKey.getNewProductKey());
        tableMapKey.put(SMS_HOME_RECOMMEND_PRODUCT,promotionRedisKey.getRecProductKey());
        connector.connect();
        if("server".equals(subscribe))
            connector.subscribe(null);
        else
            connector.subscribe(subscribe);
        connector.rollback();
    }

    @PreDestroy
    @Override
    public void disConnect() {
        connector.disconnect();
    }

    @Async
    @Scheduled(initialDelayString="${canal.promotion.initialDelay:5000}",fixedDelayString = "${canal.promotion.fixedDelay:5000}")
    @Override
    public void processData() {
        try {
            if(!connector.checkValid()){
                log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");
                this.connect();
            }else{
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    log.info("本次[{}]没有检测到促销数据更新。",batchId);
                }else{
                    log.info("本次[{}]促销数据本次共有[{}]次更新需要处理",batchId,size);
                    /*一个表在一次周期内可能会被修改多次,而对Redis缓存的处理只需要处理一次即可*/
                    Set<String> factKeys = new HashSet<>();
                    for(CanalEntry.Entry entry : message.getEntries()){
                        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                            continue;
                        }
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        String tableName = entry.getHeader().getTableName();
                        if(log.isDebugEnabled()){
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            log.debug("数据变更详情:来自binglog[{}.{}],数据源{}.{},变更类型{}",
                                    entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),
                                    entry.getHeader().getSchemaName(),tableName,eventType);
                        }
                        factKeys.add(tableMapKey.get(tableName));
                    }
                    for(String key : factKeys){
                        if(StringUtils.isNotEmpty(key))  redisOpsExtUtil.delete(key);
                    }
                    connector.ack(batchId); // 提交确认
                    log.info("本次[{}]处理促销Canal同步数据完成",batchId);
                }
            }
        } catch (Exception e) {
            log.error("处理促销Canal同步数据失效,请检查:",e);
        }

    }
}
  1. 同步数据 — ES
@Service
@Slf4j
public class ProductESData implements IProcessCanalData {

    private final static String T_ID = "id";
    private final static String T_NAME = "name";
    private final static String T_KEYWORDS = "keywords";
    private final static String T_SUB_TITLE = "sub_title";
    private final static String T_PRICE = "price";
    private final static String T_PROMOTION_PRICE = "promotion_price";
    private final static String T_ORIGINAL_PRICE = "original_price";
    private final static String T_PIC = "pic";
    private final static String T_SALE = "sale";
    private final static String T_BRAND_ID = "brand_id";
    private final static String T_BRAND_NAME = "brand_name";
    private final static String T_PRODUCT_CATEGORY_ID = "product_category_id";
    private final static String T_PRODUCT_CATEGORY_NAME = "product_category_name";

    @Value("${canal.product.indexName}")
    private String indexName;

    @Autowired
    @Qualifier("productConnector")
    private CanalConnector connector;

    @Value("${canal.product.subscribe:server}")
    private String subscribe;

    @Value("${canal.product.batchSize}")
    private int batchSize;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @PostConstruct
    @Override
    public void connect() {
        connector.connect();
        if("server".equals(subscribe))
            connector.subscribe(null);
        else
            connector.subscribe(subscribe);
        connector.rollback();
    }

    @PreDestroy
    @Override
    public void disConnect() {
        connector.disconnect();
    }

    @Async
    @Scheduled(initialDelayString = "${canal.product.initialDelay:5000}", fixedDelayString = "${canal.product.fixedDelay:1000}")
    @Override
    public void processData() {
        try {
            if (!connector.checkValid()) {
                log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");
                this.connect();
            } else {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    log.info("本次没有检测到商品数据更新。");
                } else {
                    log.info("商品数据本次共有[{}]次更新需要处理", size);
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                            continue;
                        }
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        String tableName = entry.getHeader().getTableName();
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        log.debug("数据变更详情:来自binglog[{}.{}],数据源{}.{},变更类型{}",
                                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                entry.getHeader().getSchemaName(), tableName, eventType);
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            if (eventType == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : columns) {
                                    if(column.getName().equals("id")) {
                                        deleteDoc(column.getValue());
                                        break;
                                    }
                                }
                            } else if (eventType == CanalEntry.EventType.INSERT) {
                                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                                ProductESVo productESVo = new ProductESVo();
                                String docId = makeVo(columns,productESVo);
                                insertDoc(docId,productESVo);
                            } else {
                                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                                ProductESVo productESVo = new ProductESVo();
                                String docId = makeVo(columns,productESVo);
                                if(null != docId){
                                    if(null == productESVo){
                                        log.info("商品的删除状态字段update为已删除,从ES中移除");
                                        deleteDoc(docId);
                                    }else  updateDoc(docId,productESVo);
                                }
                            }
                        }
                    }
                    connector.ack(batchId); // 提交确认
                }
            }
        } catch (Exception e) {
            log.error("处理商品Canal同步数据失效,请检查:", e);
        }
    }

    private String makeVo(List<CanalEntry.Column> columns,ProductESVo productESVo){
        String docId = null;
        for (CanalEntry.Column column : columns) {
            String colName = column.getName();
            String colValue = column.getValue();
            if(colName.equals(T_ID)) {
                docId = colValue;
            }else if(colName.equals(T_NAME)) {
                productESVo.setName(colValue);
            } if(colName.equals(T_KEYWORDS)) {
                productESVo.setKeywords(colValue);
            } if(colName.equals(T_SUB_TITLE)) {
                productESVo.setSubTitle(colValue);
            } if(colName.equals(T_PRICE)) {
                productESVo.setPrice(new BigDecimal(colValue));
            } if(colName.equals(T_PROMOTION_PRICE)) {
                productESVo.setPromotionPrice(new BigDecimal(colValue));
            } if(colName.equals(T_ORIGINAL_PRICE)) {
                productESVo.setOriginalPrice(new BigDecimal(colValue));
            } if(colName.equals(T_PIC)) {
                productESVo.setPic(colValue);
            } if(colName.equals(T_SALE)) {
                productESVo.setSaleCount(Integer.valueOf(colValue));
            } if(colName.equals(T_BRAND_ID)) {
                productESVo.setBrandId(Long.valueOf(colValue));
            } if(colName.equals(T_BRAND_NAME)) {
                productESVo.setBrandName(colValue);
            } if(colName.equals(T_PRODUCT_CATEGORY_ID)) {
                productESVo.setCategoryId(Long.valueOf(colValue));
            } if(colName.equals(T_PRODUCT_CATEGORY_NAME)) {
                productESVo.setCategoryName(colValue);
            } if(colName.equals("delete_status")) {
                if(1 == Integer.valueOf(colValue)){
                    productESVo = null;
                }
            }
        }
        return docId;
    }

    private void deleteDoc(String docId) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(indexName, docId);
        DeleteResponse deleteResponse =
                restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        String indexDoc = indexName+"/"+docId;
        if(deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND){
            log.warn("删除不存在的文档 {}", indexDoc);
        }else{
            log.info("删除文档 {} 成功",indexDoc);
        }
    }

    private void updateDoc(String docId,ProductESVo productESVo) throws IOException {
        String productJson = JSONObject.toJSONString(productESVo);
        /*XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
        xContentBuilder.startObject();
        for (Map.Entry<String, String> entry : updateField.entrySet()) {
            xContentBuilder.field(entry.getKey(), entry.getValue());
        }
        xContentBuilder.endObject();*/
        UpdateRequest request =
                new UpdateRequest(indexName, docId).doc(productJson,XContentType.JSON);
        request.docAsUpsert(true);
        UpdateResponse updateResponse =
                restHighLevelClient.update(request, RequestOptions.DEFAULT);
        String indexDoc = indexName+"/"+docId;
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            log.info("文档 {} 不存在,更新变更为创建成功",indexDoc);
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            log.info("文档 {} 不存在,更新成功",indexDoc);
        } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
            log.warn("更新操作里文档 {} 被删除,请检查",indexDoc);
        } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
            log.warn("文档 {} 未做任何更新操作,请检查",indexDoc);
        }
    }

    private void insertDoc(String docId, ProductESVo productESVo) throws IOException {
        IndexRequest indexRequest = new IndexRequest(indexName);
        indexRequest.id(docId);
        String productJson = JSONObject.toJSONString(productESVo);
        indexRequest.source(productJson, XContentType.JSON);
        IndexResponse indexResponse =
                restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        if(indexResponse!=null){
            String id = indexResponse.getId();
            if(indexResponse.getResult() == DocWriteResponse.Result.CREATED){
                log.info("新增文档成功,id = {}",id);
            }else if(indexResponse.getResult() == DocWriteResponse.Result.UPDATED){
                log.warn("新增转为覆盖文档成功,id = {}",id);
            }
        }
    }

}
  1. Ack
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
connector.ack(batchId); // 提交确认

测试

修改DB对应表,查看日志

在这里插入图片描述

基于 Binlog实现跨系统实时数据同步

在这里插入图片描述

更换数据库

在设计迁移方案的时候,一定要保证每一步都是可逆的。也就是必须保证,每执行完一个步骤,一旦出现任何问题,都能快速回滚到上一个步骤:

  1. 首先要做的一点是,把旧库的数据全部复制到新库中。因为旧库还在服务线上业务,所以不断会有订单数据写入旧库,我们不仅要向新库复制数据,还要保证新旧两个库的数据是实时同步的。所以,需要用一个同步程序来实现新旧两个数据库的实时同步。

    可以使用Binlog实现两个异构数据库之间数据的实时同步。这一步不需要回滚,因为这里只增加了一个新库和一个同步程序,对系统的旧库和程序没有任何改变。即使新上线的同步程序影响到了旧库,停掉同步程序也就可以了。

在这里插入图片描述

  1. 然后需要改造一下订单服务,业务逻辑部分不需要变动,数据访问的DAO层需要进行如下改造:
  • 1)支持双写新旧两个库,并且预留热切换开关,能通过开关控制三种写状态:只写旧库、只写新库和同步双写。
  • 2)支持读取新旧两个库,同样预留热切换开关,控制读取旧库还是新库。
  • 3)然后上线新版的订单服务,这个时候订单服务仍然是只读写旧库,不读写新库。让这个新版的订单服务稳定运行至少一到两周的时间,其间我们不仅要验证新版订单服务的稳定性,还要验证新旧两个订单库中的数据是否保持一致。这个过程中,如果新版订单服务出现任何问题,都要立即下线新版订单服务,回滚到旧版本的订单服务。

在这里插入图片描述

  1. 稳定一段时间之后,就可以开启订单服务的双写开关了。开启双写开关的同时,需要停掉同步程序。这里有一个需要特别注意的问题是,这里双写的业务逻辑,一定是先写旧库,再写新库,并且以旧库的结果为准。

    如果旧库写成功,新库写失败,则返回成功,但这个时候要记录日志,后续我们会根据这个日志来验证新库是否还有问题。如果旧库写失败,则直接返回失败,同时也不再写新库了。这么做的原因是不能让新库影响到现有业务的可用性和数据准确性。上面这个过程如果出现任何问题都要关闭双写,回滚到只读写旧库的状态。

    切换到双写之后,新库与旧库的数据可能会出现不一致的问题。
    原因有两点:

    • 一是停止同步程序和开启双写,这两个过程很难做到无缝衔接;
    • 二是双写的第略也不能保证新旧库的强一致性。对于这个问题,我们需要上线一个比对和补偿的程序,用于比对旧库最近的数据变更,然后检查新库中的数据是否一致,如果不一致,则需要进行补偿。

    开启双写之后,还需要稳定运行至少几周的时间,并且在这期间我们需要不断地检查,以确保不能有旧库写成功、新库写失败的问题。如果在几周之后比对程序发现新旧两个库的数据没有不一致的情况,那就可以认为新旧两个库的数据一直都是保持同步的。

在这里插入图片描述

  1. 接下来就可以用类似灰度发布的方式把读请求逐步切换到新库上。同样,运行期间如果出现任何问题,都要再切回到旧库。

在这里插入图片描述

  1. 将全部读请求都切换到新库上之后,其实读写请求已经全部切换到新库上了,虽然实际的切换已经完成,但后续还有需要收尾的步骤。

    再稳定一段时间之后,就可以停掉比对程序,把订单服务的写状态改为只写新库。至此,旧库就可以下线了。注意,在整个迁移过程中,只有这个步骤是不可逆的。由于这一步的主要操作就是摘掉已经不再使用的旧库,因此对于正在使用的新库并不会有什么影响,实际出问题的可能性已经非常小了。

在这里插入图片描述

至此们完成了在线更换数据库的全部流程。双写版本的订单服务也完成了它的历史使命,可以在下一次升级订单服务版木的时候下线双写功能。

数据表的变更,如果只是新增表,这个很简单,一般直接回退到旧版本程序即可;但如果牵涉到表字段的变化就麻烦些,但是也可以采用类似的思路,双写新旧表并设计热切换开关。

实现比对和补偿程序

两个随时都在变化的数据厍中的数据:

  • 没有类似复制状态机这样理论上严谨、实际操作还很简单的方法来实现比对和补偿
  • 根据业务数据的实际情况,有针对性地实现比对和补偿,经过一段时间之后,把新旧两个数据库的差异逐渐收敛到一致

时效性比较强的数据(订单):

  • 比较容易进行比对和补偿
  • 比对和补偿程序就可以根据表完成时间,每次只比对这个时间窗口内完成的数据
  • 补偿的逻辑也很简单,发现不一致的情况后,直接用旧库的表数据覆盖新库的表数据
  • 后续在双写的时候只要新库不是频繁写入失败,就可以保证两个库的数据完全一致

时效性比较弱的数据(商品):

  • 数据上带有更新时间,那么比对程序就可以利用这个更新时间
    1. 每次从旧库中读取一个更新时间窗口内的数据,到新库中查找具有相同主键的数据进行比对,如果发现数据不一致,则还要比对一下更新时间
    2. 如果新库数据的更新时间晚于旧库数据,那么很可能是比对期间数据发生了变化,这种情况暂时不要补偿,放到下个时间窗口继续进行比对即可。
    3. 时间窗口的结束时间不要选取当前时间,而是要比当前时间早一点,比如1分钟之前,这样就可以避免比对正在写入的数据了
  • 数据没带时间戳信息,那就只能从旧库中读取Binlog,获取数据变化信息后到新库中查找对应的数据进行比对和补偿

安全地实现数据备份和恢复

  • 数据库宕机
  • 磁盘损坏
  • 机房着火
  • 删库跑路

存储系统导致的比较严重的损失主要有:

  1. 数据丢失造成的直接财产损失。比如订单数据丢失造成了大量的坏账。为了避免这种损失,系统需要保证数据的高可靠性
  2. 存储系统的损坏,造成整个业务系统停止服务而带来的损失。比如,电商系统停服期间造成的收人损失。为了避免这种损失,系统需要保证存储服务的高可用性

保证数据安全: (定期备份数据)

我们最常用的MySQL如何更安全地实现数据的备份和恢复呢?

  • 全量备份,备份的时候把所有的数据复制一份,存放到文件中,恢复的时候再把文件中的数据复制回去
    $ mysqldump -uroot -p test > test.sql

    缺点:(不能频繁地对数据库执行全量备份):

    1. 包含了数据库中的所有数据,占用的磁盘空间大
    2. 每次备份操作都要拷贝大量的数据,备份过程中会占用数据库服务器大量的CPU和磁盘IO资源
    3. 为了保证数据一致性,备份过程中很有可能会锁表
    4. 全量备份的代价比较高不能频繁地执行备份操作,所以全量备份不能做到完全无损的恢复
  • 增量备份,MySQL自带的 Binlog就是一种实时的增量备份工具。Binlog所记录的就是MySQL 数据变更的操作日志。开启 Binlog之后,MySQL中数据的每次更新操作,都会记录到Binlog 中。Binlog是可以回放的,回放Binlog,就相当于是把之前对数据库中所有数据的更新操作,都按顺序重新执行一遍,回放完成之后数据自然就恢复了。这就是Binlog增量备份的基本原理。

在执行备份和恢复的时候,大家需要特别注意:

  1. 也是最重要的“不要把所有的鸡蛋放在同一个篮子中”,无论是全量备份还是Binlog,都不要与数据库存放在同一个服务器上。最好能存放到不同的机房,甚至不同城市离得越远越好。这样即使出现机房着火、光缆被挖断甚至地震也不怕数据丢失。
  2. 在回放 Binlog的时候,指定的起始时间可以比全量备份的时间稍微提前一点儿,这样可以确保全量备份之后的所有操作都在恢复的Binlog 范围内,从而保证数据恢复的完整性。
    注意:为了确保回放的幂等性,需要将Binlog的格式设置为ROW格式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/100328.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

毫米波电路的PCB设计和加工(第一部分)

毫米波应用要点——相位精度受许多变量影响 从自动驾驶车辆上使用的防碰雷达系统到第五代&#xff08;5G&#xff09;高数据速率新无线&#xff08;NR&#xff09;网络技术&#xff0c;毫米波&#xff08;mmWave&#xff09;电路的应用领域正在快速增长。许多应用正在促进工作…

锐浪报表 Grid++Report 导出其它格式文件

锐浪报表 GridReport 导出其它格式文件 GridReport控件设计的报表&#xff0c;不仅可以打印&#xff0c;还可以导出8种格式的报表文件。 在GridReport的打印浏览中&#xff0c;有指定导出文件的对话框&#xff1a; 但是&#xff0c;软件的设计中&#xff0c;往往需要设计出&am…

黑*头条_第5章_延迟任务精准发布文章(新版)

黑*头条_第5章_延迟任务精准发布文章(新版) 文章目录黑*头条_第5章_延迟任务精准发布文章(新版)1)文章定时发布2)延迟任务概述2.1)什么是延迟任务2.2)技术对比2.2.1)DelayQueue2.2.2)RabbitMQ实现延迟任务2.2.3)redis实现3)redis实现延迟任务4)延迟任务服务实现4.1)搭建heima-l…

༺ཌ༈学编程到底学那种语言呢?༈ད༻

说到底&#xff0c;编程语言只是工具&#xff0c;就像螺丝刀一样。在需要使用圆头螺丝刀的时候&#xff0c;你就不能一意孤行使用一字螺丝刀。你需要根据实际的情况做决定。没有任何一种编程语言能够取代一切&#xff0c;成为终极编程语言。你需要根据当前岗位的要求&#xff0…

分布式数据库中间件Mycat介绍

从Cobar到Mycat&#xff0c;从闭源到开源&#xff0c;作为一个开源的分布式数据库中间件&#xff0c;Mycat已经被众多开源项目使用。本文简要介绍下Mycat的特性、基本架构以及分库分表和读写分离的配置。 1、Mycat基本介绍 Mycat是一个开源的分布式数据库中间件&#xff0c;前…

nodejs+vue044高校学生信息管理系统

大学生信息综合管理系统分三个身份登录&#xff0c;一个学生&#xff0c;一个管理员。学生只能修改密码&#xff0c;而管理员可以修改任何信息。老师可以查看自己的课表和校园活动. 管理员模块主要有老师管理&#xff0c;添加老师&#xff0c;班级管理&#xff0c;班级添加&…

[附源码]Python计算机毕业设计Django文具商城购物系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

Educational Codeforces Round 140

C. Count Binary Strings 大意&#xff1a; 要求满足条件的01串的个数 要求如下 给定一个上三角矩阵&#xff0c;对于矩阵的元素i,j,a[i][j]有一下三个取值&#xff1a; 1&#xff1a;i-j之间只能有一种元素 2&#xff1a;i-j之间只能有两种元素 0: i-j之间无所谓 思路…

(Week 7)动态规划(C++)

目录[NOIP2005 普及组] 采药&#xff08;C&#xff0c;动态规划&#xff09;题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1提示解题思路&#xff1a;最长上升子序列&#xff08;C&#xff0c;Dilworth&#xff0c;贪心&#xff09;题目描述输入格式输出格式样例 #1样…

一文盘点Zebec生态的收益模型

随着加密市场逐渐陷入低谷&#xff0c;曾经火热的NFT、GameFi等赛道都陷入了沉寂。投资者目前很难在加密市场中获得可观的收益&#xff0c;而在整体加密市场发展局势不明朗的情况下&#xff0c;行业目前缺乏发展动力。 目前&#xff0c;以流支付为主要定位的Zebec生态&#xff…

Java 基础语法

一个 Java 程序可以认为是一系列对象的集合&#xff0c;而这些对象通过调用彼此的方法来协同工作。下面简要介绍下类、对象、方法和实例变量的概念。 对象&#xff1a;对象是类的一个实例&#xff0c;有状态和行为。例如&#xff0c;一条狗是一个对象&#xff0c;它的状态有&a…

Tomcat 9.0 安装及配置教程(win10系统)

一、前言 Tomcat 服务器是一个开源的轻量级Web应用服务器&#xff0c;在中小型系统和并发量小的场合下被普遍使用&#xff0c;是开发和调试Servlet、JSP 程序的首选。 二、安装前准备 1.确保安装过jdk&#xff0c;安装过可跳过。 如果没有安装可以先安装一下win10下JDK 官方中…

品优购首页布局-头部

10. 品优购首页布局 效果图&#xff1a; 项目结构如下&#xff1a; 命名集合&#xff1a; 名称说明快捷导航栏shortcut头部header标志logo购物车shopcar搜索search热点词hotwrods导航nav导航左侧dorpdown 包含 .dd .dt导航右侧navitems 1). shortcut 制作 通栏的盒子 命名为sho…

网络协议与攻击模拟 | APR_TCP | 系统性学习 | 无知的我费曼笔记

文章目录网络协议与攻击模拟-APR协议网络协议与攻击模拟-实施ARP攻击与欺骗实施ARP攻击实施ARP欺骗网络协议与攻击模拟-TCP三次握手网络协议与攻击模拟-APR协议 APR协议的作用 解析IP地址为MAC地址。从而进行二层数据交互 ARP工作流程 ARP请求 ARP响应 APR的报文格式 在W…

vue——将原生事件和属性绑定到组件

再vue官方文档 将原生事件绑定到组件 中有如下代码描述&#xff1a; 1. v-bind“$attrs” 以html原生属性的形式批量绑定父组件传过来的属性到任意位置&#xff0c; 例如&#xff0c;一些ui组件或者原生input标签需要一些属性来完成某些功能&#xff0c;例如 <input typ…

Linux安装Jenkins(Java11最新版)

文章目录♈️查看java版本♉️1.下载♊️2.上传到服务器♋️3.启动♌️4.记住密码♍️5.解锁Jenkins♎️6.修改插件安装地址♏️7.安装插件♐️8.登录♐️9.修改密码注意&#xff0c;这里是需要java环境的&#xff0c;如果没有java环境请参考 Linux安装Java环境 ♈️查看java版…

「集合底层」Vector底层结构及源码剖析

「集合源码」Vector底层结构及源码剖析 文章目录「集合源码」Vector底层结构及源码剖析一、基本介绍二、类继承关系三、Vector特性四、底层源码分析1、四个构造器2. 添加一个元素的过程以及扩容机制五、Vector与ArrayList共同点区别一、基本介绍 Vector 是一个矢量队列&#x…

【开发指南】AR Foundation 扫描

开发平台&#xff1a;Unity 2020 版本以上 编程平台&#xff1a;Visual Studio 2022 面向平台&#xff1a;IOS 设备   一、本文聚焦问题点 使用哪种 API 完成相机权限的获取如何进行画面跟踪对象的捕获。 对深入了解AR的开发者尤为重要。但只是从应用目的上&#xff0c;只需要…

C++PrimerPlus 第八章 函数探幽-8.5 函数模板

目录 8.5 函数模板 8.5.1 重载的模板 8.5.2 模板的局限性 8.5.3 显式具体化 8.5.3.1 第三代具体化&#xff08;ISO/ANSI C标准&#xff09; 8.5.3.2 显式具体化示例 8.5.4 实例化和具体化 8.5.5 编译器选择使用哪个函数版本 8.5.5.1 完全匹配和最佳匹配 8.5.5.2 部分…

计算机毕设Python+Vue学生风采网(程序+LW+部署)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…