使用 mysql-binlog-connector-java
1. mysql-binlog-connector-java 官网
2. Java代码中,如何监控Mysql的binlog?
前置条件
1. mysql服务器表结构
CREATE TABLE `student` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`age` int NOT NULL,
`code` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_name` (`name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
2. 开启master的mysql 服务器的log_bin
show variables like 'log_bin';
如果没有,那么设置文件中增加配置
log_bin=mysql-bin
binlog-format=ROW
server-id=1
重启服务
- 在配置文件中加入了log_bin配置项后,表示启用了binlog
- binlog-format是binlog的日志格式,支持三种类型,分别是STATEMENT、ROW、MIXED,我们在这里使用ROW模式
- server-id用于标识一个sql语句是从哪一个server写入的,这里一定要进行设置,否则我们在后面的代码中会无法正常监听到事件
引入maven依赖
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.25.0</version>
</dependency>
代码块实现监听
查看一个简单的case
public static void main(String[] args) {
// 这里的账号必须要有权限访问
BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "root", "root");
// 反序列化配置
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
// EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
// 设置反序列化配置
client.setEventDeserializer(eventDeserializer);
// 设置自己的client作为服务器的id
client.setServerId(3);
// 可选,设置start fileName+position
// client.setBinlogFilename("master-bin.000080");
// client.setBinlogPosition(219);
client.registerEventListener(event -> {
EventData data = event.getData();
String tableName;
if (data instanceof TableMapEventData) {
System.out.println("Table:");
TableMapEventData tableMapEventData = (TableMapEventData) data;
System.out.println(tableMapEventData.getTableId() + ": [" + tableMapEventData.getDatabase() + "." + tableMapEventData.getTable() + "]");
tableName = tableMapEventData.getTable();
// 如果是不处理的表,那么返回
if (!Objects.equals(tableName, "student"))
return;
}
if (data instanceof UpdateRowsEventData) {
// System.out.println("Update:");
// System.out.println(data);
// 获取对应的操作对象的json化数据
UpdateRowsEventData udata = (UpdateRowsEventData) data;
List<Map.Entry<Serializable[], Serializable[]>> rows = udata.getRows();
for (Map.Entry<Serializable[], Serializable[]> row : rows) {
List<Serializable> entries = Arrays.asList(row.getValue());
JSONObject dataObject = getDataObject(entries);
System.out.println(dataObject);
}
} else if (data instanceof WriteRowsEventData) {
WriteRowsEventData wData = new WriteRowsEventData();
wData.getIncludedColumns();
wData.getRows();
System.out.println("Insert:");
System.out.println(data);
} else if (data instanceof DeleteRowsEventData) {
System.out.println("Delete:");
System.out.println(data);
}
});
try {
client.connect();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 根据message获取对象
*/
private static JSONObject getDataObject(List<Serializable> message) {
JSONObject resultObject = new JSONObject();
String format = "{\"id\":\"0\",\"name\":\"1\",\"age\":\"2\",\"code\":\"3\"}";
JSONObject json = JSON.parseObject(format);
for (String key : json.keySet()) {
resultObject.put(key, message.get(json.getInteger(key)));
}
return resultObject;
}
首先,创建一个BinaryLogClient客户端对象,初始化时需要传入mysql的连接信息,创建完成后,给客户端注册一个监听器,来实现它对binlog的监听和解析。在监听器中,我们暂时只对4种类型的事件数据进行了处理,除了WriteRowsEventData、DeleteRowsEventData、UpdateRowsEventData对应增删改操作类型的事件数据外,还有一个TableMapEventData类型的数据,包含了表的对应关系,在后面的例子中再具体说明。
在这里,客户端监听到的是数据库级别的所有事件,并且可以监听到表的DML语句和DDL语句,所以我们只需要处理我们关心的事件数据就行,否则会收到大量的冗余数据。
启动程序,控制台输出:
我们需要知道表的信息或者的行信息,在反序列化的过程中,我们调用UpdateRowsEventData中只有行的index,而没有name,那么这个时候需要我们自己定义每列对应的续好了,然后解析获取到的row的信息。
方法如:
private static JSONObject getDataObject(List<Serializable> message) {
JSONObject resultObject = new JSONObject();
String format = "{\"id\":\"0\",\"name\":\"1\",\"age\":\"2\",\"code\":\"3\"}";
JSONObject json = JSON.parseObject(format);
for (String key : json.keySet()) {
resultObject.put(key, message.get(json.getInteger(key)));
}
return resultObject;
}
表和列信息模板
在getDataObject中,我们需要知道每个cloumnId对应的name,那么,我们需要自定义一个format,但是columnId和index的关系我们可以从information_schema中获取,当然,这就需要处理了,这里另说。
指定binlog的起始位置
By default, BinaryLogClient starts from the current (at the time of connect) master binlog position. If you wish to kick off from a specific filename or position, use client.setBinlogFilename(filename) + client.setBinlogPosition(position).
通常来说,监听从启动开始,但是也可以指定fileName+position
// 可选,设置start fileName+position,后续的话会都会开始执行的,包括后续的binlog file
client.setBinlogFilename("master-bin.000079");
client.setBinlogPosition(4);
查找binLog中的信息
参考:MySQL Binlog二进制日志基于position位置点恢复数据
如果无法确定启动的bin文件和postion,可以查看相关的binlog 文件,
查找binlog的文件名
查看master-server的配置文件,查看目录和文件前缀 mysql.ini
在目录下查找对应的binLog文件
查看binlong文件中的信息
show binlog events in 'master-bin.000080';
总结
1. git代码