引言
在数据库管理领域,跟踪和记录对数据库所做的更改是至关重要的。这些更改包括插入、更新或删除操作,它们可以影响应用程序的行为、数据一致性和安全性。MySQL 提供了多种机制来追踪这些变化,其中最强大且灵活的一种就是使用二进制日志(Binary Log),简称 binlog。
Binlog概述
从 MySQL 8.0 开始,默认情况下启用了二进制日志功能。而在早期版本如 MySQL 5.* 中,则需要手动开启。Binlog 是一种事务安全的日志文件,它记录了所有对数据库结构和内容进行修改的操作。通过这个日志,我们可以恢复到之前的某个状态,这对于灾难恢复、复制以及审计都是非常有用的工具。
监控数据表变化的意义
数据恢复与备份:当意外发生时,比如误删了一张重要表格,binlog 可以帮助我们回滚到事故发生前的状态。
主从复制:MySQL 的主从架构依赖于 binlog 来同步主服务器上的变更到从服务器。
审计和合规性:企业环境中,了解谁做了什么改动对于满足法规要求非常重要。
性能优化:分析频繁变更的数据可以帮助识别热点数据区域,从而优化索引或调整应用逻辑。
实时数据分析:一些系统利用 binlog 实现实时的数据流处理,例如将变更事件发送给消息队列进行进一步处理。
如何启用Binlog
对于 MySQL 5.* 版本,如果想要开启 binlog,你需要编辑配置文件 my.cnf 或者 my.ini,并在 [mysqld] 部分添加如下配置:
[mysqld]
log-bin=mysql-bin
server-id=1
然后重启 MySQL 服务使设置生效。
示例代码:读取Binlog
以下是一个简单的 java 代码片段,演示了如何使用 mysql-replication 库读取 MySQL 的 binlog 文件。
依赖
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.30.1</version>
</dependency>
代码示例
import cn.com.yeexun.core.exception.BizException;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.sql.*;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
public class BinlogListenerService {
private static final String HOST = "真实ip";
private static final int PORT = 3306;
private static final String USERNAME = "root";
private static final String PASSWORD = "密码";
// 记录表 ID 和表名的映射关系
private final Map<Long, String> tableIdToNameMap = new HashMap<>();
// 监控数据库
private static final String TARGET_DATABASE = "dn_name";
// 监控目标表
private static final String[] TARGET_TABLES = {"table_name1", "table_name2"};
@PostConstruct
public void startListening() {
new Thread(() -> {
try {
BinaryLogClient client = new BinaryLogClient(HOST, PORT, USERNAME, PASSWORD);
client.registerEventListener(this::handleEvent);
client.connect();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private void handleEvent(Event event) {
EventData data = event.getData();
if (data instanceof TableMapEventData) {
// 捕获表信息
TableMapEventData tableMapEventData = (TableMapEventData) data;
String tableName = tableMapEventData.getTable();
String databaseName = tableMapEventData.getDatabase();
// 记录表 ID 和表名映射
tableIdToNameMap.put(tableMapEventData.getTableId(), databaseName + "." + tableName);
} else if (data instanceof WriteRowsEventData) {
WriteRowsEventData eventData = (WriteRowsEventData) data;
processEvent(eventData.getTableId(), "INSERT", eventData);
} else if (data instanceof UpdateRowsEventData) {
UpdateRowsEventData eventData = (UpdateRowsEventData) data;
processEvent(eventData.getTableId(), "UPDATE", eventData);
} else if (data instanceof DeleteRowsEventData) {
DeleteRowsEventData eventData = (DeleteRowsEventData) data;
processEvent(eventData.getTableId(), "DELETE", eventData);
}
}
private void processEvent(long tableId, String eventType, Object eventData) {
// 根据 tableId 获取表名
String fullTableName = tableIdToNameMap.get(tableId);
if (fullTableName == null) {
return; // 未记录的表,忽略
}
String[] parts = fullTableName.split("\\.");
String databaseName = parts[0];
String tableName = parts[1];
// 检查是否是目标库和表
if (TARGET_DATABASE.equals(databaseName) && isTargetTable(tableName)) {
if (eventType.contains("UPDATE")) {
System.out.println("捕获到 " + eventType + " 事件 - 数据库: " + databaseName + ", 表: " + tableName);
System.out.println("事件数据: " + eventData);
String ss = eventData.toString();
System.out.println("表名称" + tableName);
Pattern rowsPattern = Pattern.compile("before=\\[(.*?)\\], after=\\[(.*?)\\]");
Matcher rowsMatcher = rowsPattern.matcher(ss);
while (rowsMatcher.find()) {
String beforeData = rowsMatcher.group(1);
String afterData = rowsMatcher.group(2);
System.out.println("变更前: " + beforeData);
System.out.println("变更后: " + afterData);
List<String> beforeList = Arrays.asList(beforeData.split(",\\s*"));
List<String> afterList = Arrays.asList(afterData.split(",\\s*"));
compareDifferences(beforeList, afterList, tableName);
}
} else {
// // 新增和删除的数据
String ss = eventData.toString();
System.out.println(eventType.contains("INSERT") ? "新增" : "删除");
Pattern rowsPattern = Pattern.compile("rows=\\[(.*?)\\]\\}", Pattern.DOTALL); // 捕获 rows 部分
Matcher rowsMatcher = rowsPattern.matcher(ss);
if (rowsMatcher.find()) {
String rowsContent = rowsMatcher.group(1).trim(); // 提取 rows 的内容
//System.out.println("数据行:\n" + rowsContent);
// 匹配每一行数据
Pattern rowPattern = Pattern.compile("\\[([^\\]]+)]"); // 捕获 [] 中的内容
Matcher rowMatcher = rowPattern.matcher(rowsContent);
while (rowMatcher.find()) {
String rowData = rowMatcher.group(1).trim(); // 提取每一行数据
System.out.println("数据行: " + rowData);
}
} else {
System.out.println("未匹配到 rows 数据!");
}
}
}
}
public static void compareDifferences(List<String> before, List<String> after, String tableName) {
List<String> columnNamesFromDatabase = getColumnNamesFromDatabase(tableName);
for (int i = 0; i < before.size(); i++) {
String beforeValue = before.get(i);
String afterValue = after.get(i);
if (!Objects.equals(beforeValue, afterValue)) {
System.out.printf("列名 %s: Before = %s, After = %s%n", columnNamesFromDatabase.get(i), beforeValue, afterValue);
}
}
}
private boolean isTargetTable(String tableName) {
for (String targetTable : TARGET_TABLES) {
if (targetTable.equals(tableName)) {
return true;
}
}
return false;
}
public static List<String> getColumnNamesFromDatabase(String tableName) {
List<String> columnNames = new ArrayList<>();
String url = "jdbc:mysql://" + HOST + ":" + PORT + "/" + TARGET_DATABASE;
String username = USERNAME;
String password = PASSWORD;
try (Connection connection = DriverManager.getConnection(url, username, password)) {
DatabaseMetaData metaData = connection.getMetaData();
ResultSet columns = metaData.getColumns(null, null, tableName, null);
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
columnNames.add(columnName);
}
} catch (SQLException e) {
e.printStackTrace();
throw new BizException("获取数据表元数据信息出错");
}
return columnNames;
}
}
这段代码会持续监听 MySQL 的 binlog,并打印出所有的删除、更新和插入事件。请注意,这只是一个基础的例子,在生产环境中使用时,可能还需要考虑更多因素,如错误处理、多线程支持等。