MySQL 如何从 Binlog 找出变更记录并回滚

news2025/1/10 17:26:29

文章目录

    • 前言
    • 1. 案例模拟
      • 1.1 确认信息
      • 1.2 下载 Binlog
      • 1.3 准备环境
      • 1.4 注册 Binlog
      • 1.5 准备结构信息
      • 1.6 Python 订阅
      • 1.7 输出结果展示
    • 2. 原理解析
      • 2.1 程序设计
      • 2.2 模块版本
    • 总结

前言

最近有研发同学问我:有一个问题,想查一个 ID 为 xxxx 的 sku 什么时候被更新了吗?更新前的数据是什么?一般这么讲,可能是由于手动执行或者是代码 Bug 导致出现数据丢失或者数据误更新,需要确认订正,一般需要通过分析 Binlog 来解决,本篇文章将通过该案例介绍此类问题的处理思路。

1. 案例模拟

1.1 确认信息

当有需求需要从 Binlog 中查询变更记录或者需要闪回数据的时候,需要和研发确认 时间范围、涉及到的 环境信息、库名、表名 最好是可以提供 SQL 语句。在上述的案例中,研发提供的是 sku 的 ID 时间范围是 2024-02-22 18:01:42 ~ 18:03:42。

1.2 下载 Binlog

阿里云、腾讯云、华为云 的数据库服务 Binlog 都是支持直接下载的,按照研发提供的时间区间下载对应的 Binlog 日志。
在这里插入图片描述
如果是本地自建的 MySQL 数据库,是没用日志开始时间和日志结束时间的,需要先确认下时间。可参考下方文档。

推荐阅读:MySQL 查询 Binlog 生成时间

1.3 准备环境

Binlog 下载完成后,那我们想要的数据记录也在里面,接下来将介绍如何获得我们想要的记录,首先需要准备一台测试数据库(作为数据库管理人员,随身带一套 MySQL 测试环境不过分吧?)测试数据库的大版本需要和生产环境的版本大版本一致。

以下是我环境信息:

  • 生产环境 MySQL 5.7.18
  • 测试环境 MySQL 5.7.33 (单实例)

1.4 注册 Binlog

该步骤,需要把从云上下载的 Binlog 注册到我们的测试环境中,首先需要先清空测试环境中的 Binlog 日志。

reset master;

查询 Binlog 索引文件的位置:

show variables like 'log_bin_index';

将我们从生产环境下载的 Binlog 拷贝到测试环境 Binlog 目录,然后再按照 mysql-bin.index 文件中的格式,将 Binlog 写进去。

/data/mysql_57/logs/mysql-bin.000001
/data/mysql_57/logs/mysql-bin.000002
/data/mysql_57/logs/mysql-bin.000003
/data/mysql_57/logs/mysql-bin.000004
/data/mysql_57/logs/mysql-bin.000005

上面,是注册完成的 Binlog 索引文件信息,生产环境下载了 5 个 Binlog 他们分别是 008213、008214、008215、008216、008217,拷贝到测试环境后,我们将原来 Binlog 名字修改为从 000001 开始,并且是顺序的。注意给拷贝来的 Binlog 设置用户属组。

chown -R mysql:mysql mysql-bin.*

设置完成后,重启测试环境的数据库,注册阶段完成。

1.5 准备结构信息

该步骤,需要把生产环境的表结构 copy 一份到测试环境。不用全部 copy 只 copy 需要查询记录的表。例如上面的 case 我们要查 product 库下的 sku 表。就在测试环境创建一个 product 库,然后将生产环境 sku 的表结构 copy 到测试环境。

create database product;
use product;

-- 不在此展示完成结构了,与生产环境保持一致就行
create table sku(.........)

create table sku_price(.........)

需要查询到记录涉及到多少张表,那么就 copy 多少张表就行。这次案例涉及到 2 张表。

1.6 Python 订阅

该步骤,要从 5 个 Binlog 文件中搜索到我们想要的记录,一个 Binlog 中可能有几十万个事务,这里我们通过编写 Python 脚本简化操作。我们要搜索的是 product 库下 sku、sku_price 表 sku_id = 810827 的变更记录,只需要按照下方代码注释修改即可。

在准备结构信息的步骤中,我们只在注册服务器中创建了需要的表,就起到了过滤表的作用,所以代码中不需要指定表名。

# -*- coding: utf-8 -*-
import sys
from datetime import datetime
from decimal import Decimal
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent
)
from pymysqlreplication.event import XidEvent, QueryEvent

# 填写注册 MySQL 连接信息
mysql_settings = {
    'host': '172.16.104.56',
    'port': 3306,
    'user': 'bing',
    'password': 'abc123'
}

stream = BinLogStreamReader(
    connection_settings=mysql_settings,
    server_id=8023,
    log_file='mysql-bin.000001',  # 从哪个 Binlog 开始扫描
    log_pos=4,
    only_schemas='product',  # 数据库名称
    only_events=[
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent,
    ]
)


def simple_data_type(data_info: dict):
    """
    直接打印结果会包含一些对象信息,在这里简化处理
    """
    tem_data = {}
    for key, value in data_info.items():
        if isinstance(value, Decimal):
            tem_data[key] = float(value)
        elif isinstance(value, datetime):
            tem_data[key] = value.strftime('%Y-%m-%d %H:%M:%S')
        else:
            tem_data[key] = value
    return tem_data


search_file_name = None

for binlog_event in stream:
    if search_file_name == stream.log_file:
        pass
    elif search_file_name != stream.log_file:
        search_file_name = stream.log_file
        print('正在扫描:', search_file_name)

    for row in binlog_event.rows:

        try:
            event_time = datetime.fromtimestamp(binlog_event.timestamp)
        except OSError:
            event_time = datetime(1980, 1, 1, 0, 0)

        if isinstance(binlog_event, DeleteRowsEvent):
            df = row["values"]
            # 这里条件,需要自己改
            if int(df['sku_id']) == 810887:
                print('-' * 160)
                print('操作类型: DELETE')
                print('时间: ', event_time)
                print('日志文件: ', stream.log_file)
                print('数据库名:', binlog_event.schema)
                print('表名:', binlog_event.table)
                print('Position: ', binlog_event.packet.log_pos)
                print(simple_data_type(df))
                print('-' * 160)

        elif isinstance(binlog_event, UpdateRowsEvent):
            df = row["before_values"]
            # 这里条件,需要自己改
            if int(df['sku_id']) == 810827:
                print('-' * 160)
                print('操作类型: UPDATE')
                print('时间: ', event_time)
                print('日志文件: ', stream.log_file)
                print('数据库名:', binlog_event.schema)
                print('表名:', binlog_event.table)
                print('Position: ', binlog_event.packet.log_pos)
                print('before_values: ', simple_data_type(row["before_values"]))
                print('after_values: ', simple_data_type(row["after_values"]))

        elif isinstance(binlog_event, WriteRowsEvent):
            df = row["values"]
            # 这里条件,需要自己改
            if int(df['sku_id']) == 810827:
                print('-' * 160)
                print('操作类型: INSERT')
                print('时间: ', event_time)
                print('日志文件: ', stream.log_file)
                print('数据库名:', binlog_event.schema)
                print('表名:', binlog_event.table)
                print('Position: ', binlog_event.packet.log_pos)
                print(simple_data_type(df))

1.7 输出结果展示

结果已脱敏,可以看出 boutique_price 从原来的 1058.46 被修改为 1614.0,需要注意的是 Binlog 中的 Event 只能精确到秒。

操作类型: UPDATE
时间:  2024-02-22 18:02:42
日志文件:  mysql-bin.000003
数据库名: product
表名: sku
Position:  65716973
before_values:  {'sku_id': 810887, 'product_id': 26492, 'sku_code': '000', 'name': '', 'coverpic': '', 'introduction': '', 'in_price': 132.31, 'price': 361.1, 'created_at': '2022-11-18 13:37:48', 'updated_at': '2024-02-21 04:10:41', 'enabled': '1', 'retail_price': None, 'im_price': 150.0, 'last_check': '2022-11-18 13:37:48', 'size': 'UNI', 'boutique_price': 1058.46}
after_values:  {'sku_id': 810887, 'product_id': 26492, 'sku_code': '000', 'name': '', 'coverpic': '', 'introduction': '', 'in_price': 132.31, 'price': 361.1, 'created_at': '2022-11-18 13:37:48', 'updated_at': '2024-02-22 18:02:42', 'enabled': '1', 'retail_price': None, 'im_price': 150.0, 'last_check': '2022-11-18 13:37:48', 'size': 'UNI', 'boutique_price': 1614.0}

将结果交给研发,任务就算完成了。

2. 原理解析

2.1 程序设计

这里用到了一个模块 pymysqlreplication 它可以伪装成一个 IO 复制线程,从 MySQL 服务器中拉取 Binlog Event 并支持解析。

为什么直接解析 Binlog?因为 Binlog 中没用表字段名信息,直接解析比较难做一些过滤操作。先将表结构和 Binlog 注册到一台测试 MySQL 服务器,然后通过伪装 IO 复制线程拉取 Event 过滤找到我们想要的记录。

2.2 模块版本

模块代码库:python-mysql-replication

# 本次实验使用的版本
mysql-replication==0.13

安装方法:

pip3 install mysql-replication

总结

本篇文章介绍了如何从 Binlog 中定位记录,需要有一点 Python 基础,但注册 Binlog 思路可应用多个场景,例如使用它恢复增量日志等。得到记录结果后,如果要回滚,那么可以依靠上面的字典中的信息,翻译成 SQL 语句即可,目前程序还没有实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1486604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux(CentOS为例)环境下 Git提交代码加速,使用FastGithub,运行报错解决

当你的在服务器上使用Git进行推送时,时常会出现超时错误。这里使用FastGithub 首先下载FastGithub 这个软件作者不是为什么删除了GithUb的仓库,这个链接还有。下载Linux版本的 FastGithub Linux,Windows版本 下载完毕后解压 ./fastgithu…

Linux课程四课---Linux开发环境的使用(gcc/g++编译器的相关)

作者前言 🎂 ✨✨✨✨✨✨🍧🍧🍧🍧🍧🍧🍧🎂 ​🎂 作者介绍: 🎂🎂 🎂 🎉🎉&#x1f389…

[VSCode插件] 轻量级静态博客 - MDBlog

MDBlog VSCode插件,基于Markdown的轻量级静态博客系统,同时支持导出为可以部署的静态博客。 仓库 MDBlog 1. Features 博客基础功能:分类管理、文章管理、自动生成索引快捷指令:快捷输入表格、mermaid、wavedrom、代码块发布&a…

Photoshop 2023:重塑创意,引领数字艺术新纪元

在数字艺术的浩瀚星空中,Adobe Photoshop 2023(简称PS 2023)如同一颗璀璨的新星,为Mac和Windows用户带来了前所未有的创意体验。这款强大的图像处理软件不仅继承了前作的精髓,更在细节上进行了诸多创新,让每…

前端el-date-picker传递的日期格式不是自己想要的格式

解决方法: 添加format和value-format属性进行解决。 format“YYYY-MM-DD” value-format“YYYY-MM-DD” 注意:日期格式要用大写!!!!用小写会出现错误,不能回填选择的日期,会直接传入…

【 buuctf-swp】

wget 是什么东西呢? 那就直接过滤 http 流,并全部导出,点击 save all在导出来的一大堆里发现个 zip,需要输入密码,macOS 一大特点就是,如果是伪加密,随便输入一个密码就可以解压缩,…

Redis7 实现持久化的三种方式

1、概述 1.1、Redis持久化的重要性 数据恢复:Redis是一个内存数据库,如果系统或服务宕机,内存中的数据将会丢失。Redis的持久化机制可以把数据保存到磁盘上,以便在系统重启后恢复数据。这是Redis持久化最基本也是最重要的功能。…

基于阿里云OSS上传图片实战案例

一、案例描述 基于Springboot框架实现一个上传图片到阿里云服务端保存的小案例。 二、准备工作 基于Springboot免费搭载轻量级阿里云OSS数据存储库(将本地文本、照片、视频、音频等上传云服务保存)-CSDN博客 三、代码 新建这两个类:一个…

MySQL:开始深入其数据(四)select子查询

select眼熟吧?(都三节了) 又开始学习了 在 MySQL 中,子查询(subquery)是指在一个查询内嵌套另一个完整的 SELECT 语句。子查询可以嵌套在 SELECT、INSERT、UPDATE、DELETE 语句中,用于从内部查询结果中获取数据,进而完…

新加坡大带宽服务器概览

随着全球互联网的迅猛发展,服务器作为支撑网络应用的重要基础设施,扮演着越来越重要的角色。新加坡,作为亚洲四小龙之一,其服务器市场也备受关注。特别是新加坡的大带宽服务器,更是受到了众多企业和个人的青睐。那么&a…

LeetCode -- 79.单词搜索

1. 问题描述 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中,返回 true ;否则,返回 false 。 单词必须按照字母顺序,通过相邻的单元格内的字母构成,其中“相邻”单元格是那些水…

SNAP:如何批量预处理Sentinel2 L2A数据集并输出为TIFF文件?

我的需求 我目前就是希望下载哨兵2号数据,然后在SNAP中进行批量提取真彩色波段并输出为TIFF文件。 数据集下载说明 目前哨兵网站似乎进行了一大波更新,连网站都换了,网址如下: https://dataspace.copernicus.eu/ 打开后界面如…

Linux课程四课---Linux开发环境的使用(自动化构建工具-make/Makefile的相关)

作者前言 🎂 ✨✨✨✨✨✨🍧🍧🍧🍧🍧🍧🍧🎂 ​🎂 作者介绍: 🎂🎂 🎂 🎉🎉&#x1f389…

C++进阶(三) 二叉搜索树

一、二叉搜索树 1.1 二叉搜索树概念 二叉搜索树又称二叉排序树,它或者是一棵空树,或者是具有以下性质的二叉树: 若它的左子树不为空,则左子树上所有节点的值都小于根节点的值若它的右子树不为空,则右子树上所有节点的值都大于根节…

【论文】A Survey of Monte Carlo Tree Search Methods阅读笔记

本文主要是将有关蒙特卡洛树搜索的文献(2011年之前)进行归纳,概述了核心算法的推导,给出了已经提出的许多变化和改进的一些结构,并总结了MCTS方法已经应用于的博弈和其他领域的结果。 蒙特卡洛树搜索是一种通过在决策…

Java 石头剪刀布小游戏

一、任务 编写一个剪刀石头布游戏的程序。程序启动后会随机生成1~3的随机数,分别代表剪刀、石头和布,玩家通过键盘输入剪刀、石头和布与电脑进行5轮的游戏,赢的次数多的一方为赢家。若五局皆为平局,则最终结果判为平局。 二、实…

深入理解与应用工厂方法模式

文章目录 一、模式概述**二、适用场景****三、模式原理与实现****四、采用工厂方法模式的原因****五、优缺点分析****六、与抽象工厂模式的比较**总结 一、模式概述 ​ 工厂方法模式是一种经典的设计模式,它遵循面向对象的设计原则,特别是“开闭原则”&…

一文扫盲:室内导航系统的应用场景和技术实现(入门级)

hello,我是贝格前端工场,之间搞过一些室内导航项目,有2D也有3D的,算是有些经验,这里给大家分享一下室内导航的基本尝试,欢迎老铁们点赞、关注,如有需求可以私信我们。 一、室内导航是什么 室内…

vs报错1168链接错误——关于:LNK1168 无法打开 E:\VS\文件名\x64\Debug\文件名. 进行写入问题的解决方法

关于这个问题我在网上找了一些方法。 有些方法解决了这个问题, 但是有点麻烦, 有些方法可能不能解决问题。 这里我先把我在网上找到的方法写出来: 第一种方法是可能开着一个程序,就是这个终端。有的时候报错1168是因为你没有关这…

Vue中如何实现动态路由?

在前端开发中,Vue.js 是一个极为流行的 JavaScript 框架,提供了灵活性和易用性,使得开发者可以快速构建单页面应用(SPA)。在 Vue 中,我们经常需要处理动态路由的情况,比如根据用户的操作或者权限…