文章目录
- 前言
- 1. 案例模拟
- 1.1 确认信息
- 1.2 下载 Binlog
- 1.3 准备环境
- 1.4 注册 Binlog
- 1.5 准备结构信息
- 1.6 Python 订阅
- 1.7 输出结果展示
- 2. 原理解析
- 2.1 程序设计
- 2.2 模块版本
- 总结
前言
最近有研发同学问我:有一个问题,想查一个 ID 为 xxxx 的 sku 什么时候被更新了吗?更新前的数据是什么?一般这么讲,可能是由于手动执行或者是代码 Bug 导致出现数据丢失或者数据误更新,需要确认订正,一般需要通过分析 Binlog 来解决,本篇文章将通过该案例介绍此类问题的处理思路。
1. 案例模拟
1.1 确认信息
当有需求需要从 Binlog 中查询变更记录或者需要闪回数据的时候,需要和研发确认 时间范围、涉及到的 环境信息、库名、表名 最好是可以提供 SQL 语句。在上述的案例中,研发提供的是 sku 的 ID 时间范围是 2024-02-22 18:01:42 ~ 18:03:42。
1.2 下载 Binlog
阿里云、腾讯云、华为云 的数据库服务 Binlog 都是支持直接下载的,按照研发提供的时间区间下载对应的 Binlog 日志。
如果是本地自建的 MySQL 数据库,是没用日志开始时间和日志结束时间的,需要先确认下时间。可参考下方文档。
推荐阅读:MySQL 查询 Binlog 生成时间
1.3 准备环境
Binlog 下载完成后,那我们想要的数据记录也在里面,接下来将介绍如何获得我们想要的记录,首先需要准备一台测试数据库(作为数据库管理人员,随身带一套 MySQL 测试环境不过分吧?)测试数据库的大版本需要和生产环境的版本大版本一致。
以下是我环境信息:
- 生产环境 MySQL 5.7.18
- 测试环境 MySQL 5.7.33 (单实例)
1.4 注册 Binlog
该步骤,需要把从云上下载的 Binlog 注册到我们的测试环境中,首先需要先清空测试环境中的 Binlog 日志。
reset master;
查询 Binlog 索引文件的位置:
show variables like 'log_bin_index';
将我们从生产环境下载的 Binlog 拷贝到测试环境 Binlog 目录,然后再按照 mysql-bin.index 文件中的格式,将 Binlog 写进去。
/data/mysql_57/logs/mysql-bin.000001
/data/mysql_57/logs/mysql-bin.000002
/data/mysql_57/logs/mysql-bin.000003
/data/mysql_57/logs/mysql-bin.000004
/data/mysql_57/logs/mysql-bin.000005
上面,是注册完成的 Binlog 索引文件信息,生产环境下载了 5 个 Binlog 他们分别是 008213、008214、008215、008216、008217,拷贝到测试环境后,我们将原来 Binlog 名字修改为从 000001 开始,并且是顺序的。注意给拷贝来的 Binlog 设置用户属组。
chown -R mysql:mysql mysql-bin.*
设置完成后,重启测试环境的数据库,注册阶段完成。
1.5 准备结构信息
该步骤,需要把生产环境的表结构 copy 一份到测试环境。不用全部 copy 只 copy 需要查询记录的表。例如上面的 case 我们要查 product 库下的 sku 表。就在测试环境创建一个 product 库,然后将生产环境 sku 的表结构 copy 到测试环境。
create database product;
use product;
-- 不在此展示完成结构了,与生产环境保持一致就行
create table sku(.........)
create table sku_price(.........)
需要查询到记录涉及到多少张表,那么就 copy 多少张表就行。这次案例涉及到 2 张表。
1.6 Python 订阅
该步骤,要从 5 个 Binlog 文件中搜索到我们想要的记录,一个 Binlog 中可能有几十万个事务,这里我们通过编写 Python 脚本简化操作。我们要搜索的是 product 库下 sku、sku_price 表 sku_id = 810827 的变更记录,只需要按照下方代码注释修改即可。
在准备结构信息的步骤中,我们只在注册服务器中创建了需要的表,就起到了过滤表的作用,所以代码中不需要指定表名。
# -*- coding: utf-8 -*-
import sys
from datetime import datetime
from decimal import Decimal
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
)
from pymysqlreplication.event import XidEvent, QueryEvent
# 填写注册 MySQL 连接信息
mysql_settings = {
'host': '172.16.104.56',
'port': 3306,
'user': 'bing',
'password': 'abc123'
}
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=8023,
log_file='mysql-bin.000001', # 从哪个 Binlog 开始扫描
log_pos=4,
only_schemas='product', # 数据库名称
only_events=[
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
]
)
def simple_data_type(data_info: dict):
"""
直接打印结果会包含一些对象信息,在这里简化处理
"""
tem_data = {}
for key, value in data_info.items():
if isinstance(value, Decimal):
tem_data[key] = float(value)
elif isinstance(value, datetime):
tem_data[key] = value.strftime('%Y-%m-%d %H:%M:%S')
else:
tem_data[key] = value
return tem_data
search_file_name = None
for binlog_event in stream:
if search_file_name == stream.log_file:
pass
elif search_file_name != stream.log_file:
search_file_name = stream.log_file
print('正在扫描:', search_file_name)
for row in binlog_event.rows:
try:
event_time = datetime.fromtimestamp(binlog_event.timestamp)
except OSError:
event_time = datetime(1980, 1, 1, 0, 0)
if isinstance(binlog_event, DeleteRowsEvent):
df = row["values"]
# 这里条件,需要自己改
if int(df['sku_id']) == 810887:
print('-' * 160)
print('操作类型: DELETE')
print('时间: ', event_time)
print('日志文件: ', stream.log_file)
print('数据库名:', binlog_event.schema)
print('表名:', binlog_event.table)
print('Position: ', binlog_event.packet.log_pos)
print(simple_data_type(df))
print('-' * 160)
elif isinstance(binlog_event, UpdateRowsEvent):
df = row["before_values"]
# 这里条件,需要自己改
if int(df['sku_id']) == 810827:
print('-' * 160)
print('操作类型: UPDATE')
print('时间: ', event_time)
print('日志文件: ', stream.log_file)
print('数据库名:', binlog_event.schema)
print('表名:', binlog_event.table)
print('Position: ', binlog_event.packet.log_pos)
print('before_values: ', simple_data_type(row["before_values"]))
print('after_values: ', simple_data_type(row["after_values"]))
elif isinstance(binlog_event, WriteRowsEvent):
df = row["values"]
# 这里条件,需要自己改
if int(df['sku_id']) == 810827:
print('-' * 160)
print('操作类型: INSERT')
print('时间: ', event_time)
print('日志文件: ', stream.log_file)
print('数据库名:', binlog_event.schema)
print('表名:', binlog_event.table)
print('Position: ', binlog_event.packet.log_pos)
print(simple_data_type(df))
1.7 输出结果展示
结果已脱敏,可以看出 boutique_price 从原来的 1058.46 被修改为 1614.0,需要注意的是 Binlog 中的 Event 只能精确到秒。
操作类型: UPDATE
时间: 2024-02-22 18:02:42
日志文件: mysql-bin.000003
数据库名: product
表名: sku
Position: 65716973
before_values: {'sku_id': 810887, 'product_id': 26492, 'sku_code': '000', 'name': '', 'coverpic': '', 'introduction': '', 'in_price': 132.31, 'price': 361.1, 'created_at': '2022-11-18 13:37:48', 'updated_at': '2024-02-21 04:10:41', 'enabled': '1', 'retail_price': None, 'im_price': 150.0, 'last_check': '2022-11-18 13:37:48', 'size': 'UNI', 'boutique_price': 1058.46}
after_values: {'sku_id': 810887, 'product_id': 26492, 'sku_code': '000', 'name': '', 'coverpic': '', 'introduction': '', 'in_price': 132.31, 'price': 361.1, 'created_at': '2022-11-18 13:37:48', 'updated_at': '2024-02-22 18:02:42', 'enabled': '1', 'retail_price': None, 'im_price': 150.0, 'last_check': '2022-11-18 13:37:48', 'size': 'UNI', 'boutique_price': 1614.0}
将结果交给研发,任务就算完成了。
2. 原理解析
2.1 程序设计
这里用到了一个模块 pymysqlreplication 它可以伪装成一个 IO 复制线程,从 MySQL 服务器中拉取 Binlog Event 并支持解析。
为什么直接解析 Binlog?因为 Binlog 中没用表字段名信息,直接解析比较难做一些过滤操作。先将表结构和 Binlog 注册到一台测试 MySQL 服务器,然后通过伪装 IO 复制线程拉取 Event 过滤找到我们想要的记录。
2.2 模块版本
模块代码库:python-mysql-replication
# 本次实验使用的版本
mysql-replication==0.13
安装方法:
pip3 install mysql-replication
总结
本篇文章介绍了如何从 Binlog 中定位记录,需要有一点 Python 基础,但注册 Binlog 思路可应用多个场景,例如使用它恢复增量日志等。得到记录结果后,如果要回滚,那么可以依靠上面的字典中的信息,翻译成 SQL 语句即可,目前程序还没有实现。