开始
1:引入mysql-binlog-connector-java.jar
<!-- binlog -->
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.27.1</version>
</dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
2:配置文件
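# 连接 MySQL 的用户必须具有读取 binlog 的权限(REPLICATION SLAVE、REPLICATION CLIENT)
binlog:
  # 服务器地址
  host: localhost
  port: 3306
  username: root
  password: 123456
  # 监听的库表,多个之间用英文逗号分隔,格式:库.表,库.表,...
  dbTable: 库.表,库1.表1,库1.表2
  # 监听客户端使用的 server-id,不能与 MySQL 主库及其他从库/监听客户端重复
  serverId: 1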
注:1:MySQL 8.0 之后 binlog 默认是开启的(默认格式为 ROW,本文的监听只支持 ROW 模式)
2:需要一个具有 binlog 读取权限的 MySQL 用户
获取配置文件参数 BinLogConfig
/**
 * Connection and subscription settings for the binlog client, injected from the
 * {@code binlog.*} configuration properties.
 *
 * @Author WangKun
 * @Date 2024/8/8 15:01
 */
@Data
@Component
public class BinLogConfig {

    /** MySQL server host. */
    @Value("${binlog.host}")
    private String host;

    /** MySQL server port. */
    @Value("${binlog.port}")
    private int port;

    /** User name of the account used to read the binlog. */
    @Value("${binlog.username}")
    private String username;

    /** Password of the account used to read the binlog. */
    @Value("${binlog.password}")
    private String password;

    /** Raw comma-separated list of monitored tables, each entry written as {@code db.table}. */
    @Value("${binlog.dbTable}")
    private String dbTable;

    /** Server id passed to the binlog client. */
    @Value("${binlog.serverId}")
    private Integer serverId;

    /** Parsed form of {@link #dbTable}; refreshed by {@link #getTables()}. */
    private List<String> tables;

    /**
     * Splits {@link #dbTable} on commas and returns the individual {@code db.table} entries.
     * Surrounding whitespace of every entry is removed so that {@code "a.b, c.d"} does not
     * produce a table name with a leading space.
     *
     * @return the configured entries; an empty list (never {@code null}) when nothing is configured
     */
    public List<String> getTables() {
        if (StringUtils.hasText(dbTable)) {
            String[] entries = dbTable.split(BinLogUtils.COMMA);
            for (int i = 0; i < entries.length; i++) {
                entries[i] = entries[i].trim();
            }
            tables = Arrays.asList(entries);
        }
        if (tables == null) {
            // Callers iterate over the result; an empty list avoids a NullPointerException there.
            return java.util.Collections.emptyList();
        }
        return tables;
    }
}
BinLog与字段类型实体对象
/**
* @Description One row-level change captured from the binlog, as delivered to a BinLogListener
* @Author WangKun
* @Date 2024/8/8 16:56
* @Version
*/
@Data
public class BinLog implements Serializable {
/**
* Identifier of the table the change belongs to, in the form produced by
* BinLogUtils.getDbTable (database and table joined by "-")
**/
private String dbTable;
/**
* Binlog event type that produced this change
**/
private EventType eventType;
/**
* Column values before the change, keyed by column name.
* BinLogDataUtils fills this for update and delete events; it stays null for inserts.
**/
private Map<String, Serializable> before;
/**
* Column values after the change, keyed by column name.
* BinLogDataUtils fills this for insert and update events; it stays null for deletes.
**/
private Map<String, Serializable> after;
/**
* Column metadata of the table, keyed by column name
**/
private Map<String, Field> fields;
}
/**
* @Description Metadata of a single table column, as read from INFORMATION_SCHEMA.COLUMNS
* @Author WangKun
* @Date 2024/8/8 16:33
* @Version
*/
@AllArgsConstructor
@Data
public class Field implements Serializable {
/**
* Database (schema) name
**/
public String schema;
/**
* Table name
**/
public String table;
/**
* Zero-based column position (ORDINAL_POSITION - 1); used to index into binlog row arrays
**/
public int inx;
/**
* Column name
**/
public String colName;
/**
* Column data type (the DATA_TYPE value)
**/
public String dataType;
}
BinLog事件类型枚举(新增,修改,删除)
/**
* @Description The three row-change categories (insert, update, delete) the demo listener distinguishes
* @Author WangKun
* @Date 2024/8/19 15:23
* @Version
*/
@Getter
@AllArgsConstructor
public enum BinLogEventEnum {
WRITE("WRITE"),UPDATE("UPDATE"),DELETE("DELETE");
/**
* Keyword of the category; BinLogInitListener checks whether the binlog
* EventType name contains this text
**/
private final String key;
}
BinLog工具与BinLog数据操作工具
/**
 * Shared constants and small helpers for the binlog listener.
 *
 * @Author WangKun
 * @Date 2024/8/8 17:09
 */
public class BinLogUtils {

    /** Separator between the {@code db.table} entries of the configuration value. */
    public final static String COMMA = ",";

    /** Separator between database and table inside one configuration entry. */
    public final static String POINT = ".";

    /**
     * A single backslash character (the Java literal {@code "\\"}). It is prepended to
     * {@link #POINT} to build the regular expression {@code \.} that matches a literal dot.
     */
    public final static String D_SLASH = "\\";

    /** Value in milliseconds; passed as the timeout to the binlog client's connect call. */
    public static final long QUEUE_SLEEP = 1000;

    /** Static utility holder; not meant to be instantiated. */
    private BinLogUtils() {
    }

    /**
     * Builds the key under which a table is tracked by the listener.
     *
     * @param db    database name
     * @param table table name
     * @return {@code db} and {@code table} joined by {@code "-"}
     */
    public static String getDbTable(String db, String table) {
        return db + "-" + table;
    }
}
/**
* @Description BinLog数据工具
* @Author WangKun
* @Date 2024/8/12 16:40
* @Version
*/
@Slf4j
public class BinLogDataUtils {
/**
* @param db
* @param table
* @Description 获取columns集合
* @Throws
* @Return java.util.Map<java.lang.String, com.harmonywisdom.binlog.entity.Field>
* @Date 2024-08-12 16:10:08
* @Author WangKun
**/
public static Map<String, Field> getColumnsMap(String db, String table) {
PreparedStatement ps = null;
ResultSet rs = null;
Connection connection = null;
try {
//获取数据源
DataSource dataSource = SpringUtil.getBean(DataSource.class);
connection = dataSource.getConnection();
// 执行sql获取表数据
String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
ps = connection.prepareStatement(preSql);
ps.setString(1, db);
ps.setString(2, table);
rs = ps.executeQuery();
Map<String, Field> map = new HashMap<>(rs.getRow());
while (rs.next()) {
String column = rs.getString("COLUMN_NAME");
int idx = rs.getInt("ORDINAL_POSITION");
if (column != null && idx >= 1) {
// sql的位置从1开始
map.put(column, new Field(rs.getString("TABLE_SCHEMA"), rs.getString("TABLE_NAME"), idx - 1, column, rs.getString("DATA_TYPE")));
}
}
ps.close();
rs.close();
connection.close();
return map;
} catch (SQLException e) {
log.error("加载BinLog监控配置库.表字段错误, db_table={}.{} ", db, table, e);
} finally {
try {
if (ps != null) {
ps.close();
}
if (rs != null) {
rs.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
log.error("加载BinLog监控配置库.表字段错误关闭连接失败, db_table={}.{} ", db, table, e);
}
}
return null;
}
/**
* @param row
* @param dbTable
* @param columMap
* @param eventType
* @Description 新增或删除操作数据格式化
* @Throws
* @Return com.harmonywisdom.binlog.entity.BinLog
* @Date 2024-08-12 16:53:07
* @Author WangKun
**/
private static BinLog insertOrDeletedColum(Serializable[] row, String dbTable, Map<String, Field> columMap, EventType eventType) {
if (null == row || null == columMap || row.length != columMap.size()) {
return null;
}
// 初始化Item
BinLog item = new BinLog();
item.setEventType(eventType);
item.setFields(columMap);
Map<String, Serializable> beOrAf = new HashMap<>();
columMap.forEach((key, colum) -> {
Serializable serializable = row[colum.inx];
if (serializable instanceof byte[]) {
beOrAf.put(key, new String((byte[]) serializable));
} else {
beOrAf.put(key, serializable);
}
});
// 写操作放after,删操作放before
if (isWrite(eventType)) {
item.setAfter(beOrAf);
}
if (isDelete(eventType)) {
item.setBefore(beOrAf);
}
return item;
}
/**
* @param mapEntry
* @param columMap
* @param eventType
* @Description 更新操作数据格式化
* @Throws
* @Return com.harmonywisdom.binlog.entity.BinLog
* @Date 2024-08-12 16:52:46
* @Author WangKun
**/
private static BinLog updateColum(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, Field> columMap, EventType eventType) {
if (null == mapEntry || null == columMap) {
return null;
}
BinLog item = new BinLog();
item.setEventType(eventType);
item.setFields(columMap);
Map<String, Serializable> be = new HashMap<>();
Map<String, Serializable> af = new HashMap<>();
columMap.forEach((key, colum) -> {
Serializable serializableKey = mapEntry.getKey()[colum.inx];
Serializable serializableValue = mapEntry.getValue()[colum.inx];
if (serializableKey instanceof byte[]) {
be.put(key, new String((byte[]) serializableKey));
} else {
be.put(key, serializableKey);
}
if (serializableValue instanceof byte[]) {
af.put(key, new String((byte[]) serializableValue));
} else {
af.put(key, serializableValue);
}
});
item.setBefore(be);
item.setAfter(af);
return item;
}
/**
* @param data
* @param dbTableIdCols
* @param dbTableCols
* @param eventType
* @param queue
* @Description 更新数据
* @Throws
* @Return void
* @Date 2024-08-14 17:35:49
* @Author WangKun
**/
public static void updateData(UpdateRowsEventData data, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, EventType eventType, BlockingQueue<BinLog> queue) {
for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
if (dbTableIdCols.containsKey(data.getTableId())) {
String dbTable = dbTableIdCols.get(data.getTableId());
BinLog item = updateColum(row, dbTableCols.get(dbTable), eventType);
item.setDbTable(dbTable);
try {
queue.put(item);
} catch (InterruptedException e) {
log.error("BinLog 更新数据添加阻塞队列异常:{}", e.getMessage(), e);
}
}
}
}
/**
* @param eventType
* @param rows
* @param tableId
* @param dbTableIdCols
* @param dbTableCols
* @param queue
* @Description 新增与删除数据
* @Throws
* @Return void
* @Date 2024-08-13 17:30:30
* @Author WangKun
**/
public static void insertOrDeletedData(EventType eventType, List<Serializable[]> rows, long tableId, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, BlockingQueue<BinLog> queue) {
for (Serializable[] row : rows) {
if (dbTableIdCols.containsKey(tableId)) {
String dbTable = dbTableIdCols.get(tableId);
BinLog item = insertOrDeletedColum(row, dbTable, dbTableCols.get(dbTable), eventType);
item.setDbTable(dbTable);
try {
queue.put(item);
} catch (InterruptedException e) {
log.error("BinLog 新增或者删除数据添加阻塞队列异常:{}", e.getMessage(), e);
}
}
}
}
}
BinLog监听
/**
* @Description Callback for row changes of a monitored table. MySQLBinLogListener invokes
* the listeners registered for a table with every BinLog item taken from its queue.
* (@FunctionalInterface makes the compiler enforce a single abstract method.)
* @Author WangKun
* @Date 2024/8/8 17:31
* @Version
*/
@FunctionalInterface
public interface BinLogListener {
void onEvent(BinLog binLog);
}
/**
 * Receives events from a {@link BinaryLogClient}, converts row changes of the registered
 * tables into {@link BinLog} items and hands them to the registered {@link BinLogListener}s
 * on worker threads.
 *
 * @Author WangKun
 * @Date 2024/8/8 17:32
 */
@Slf4j
public class MySQLBinLogListener implements BinaryLogClient.EventListener {

    /** Binlog client holding the connection settings. */
    private final BinaryLogClient client;

    /** Bounded queue between the binlog event callback (producer) and the worker threads (consumers). */
    private final BlockingQueue<BinLog> queue;

    /** Executor running the consumer loops. */
    private final ExecutorService executorService;

    /** Listeners per table key; several listeners may be registered for the same table. */
    private final Multimap<String, BinLogListener> listeners;

    /** Column metadata of every monitored table, keyed by table key. */
    private final Map<String, Map<String, Field>> dbTableCols;

    /** Binlog table id to table key, filled from TABLE_MAP events of monitored tables. */
    private final Map<Long, String> dbTableIdCols;

    /**
     * Creates the binlog client and the internal queue, maps and executor. No connection is
     * opened here; see {@link #openThreadConsumeBinLog()}.
     *
     * @param conf connection settings
     */
    public MySQLBinLogListener(BinLogConfig conf) {
        BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword());
        EventDeserializer eventDeserializer = new EventDeserializer();
        // Deliver date/time values as long and char/binary values as byte[].
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        client.setServerId(conf.getServerId());
        this.client = client;
        this.queue = new ArrayBlockingQueue<>(ThreadPoolConfig.queueCapacity);
        this.listeners = ArrayListMultimap.create();
        this.dbTableCols = new ConcurrentHashMap<>();
        this.dbTableIdCols = new ConcurrentHashMap<>();
        this.executorService = ThreadPoolUtils.create().setPrefixName("Binlog-Listener-Thread").setCorePoolSize(6).build();
    }

    /**
     * Handles one binlog event. TABLE_MAP events of monitored tables record the table id;
     * write, update and delete row events are formatted and queued. Only the ROW binlog
     * format is supported.
     *
     * @param event the binlog event
     */
    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            String dbTable = BinLogUtils.getDbTable(tableData.getDatabase(), tableData.getTable());
            if (dbTableCols.containsKey(dbTable)) {
                dbTableIdCols.put(tableData.getTableId(), dbTable);
            }
        }
        if (EventType.isWrite(eventType)) {
            WriteRowsEventData data = event.getData();
            BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);
        } else if (EventType.isUpdate(eventType)) {
            UpdateRowsEventData data = event.getData();
            BinLogDataUtils.updateData(data, dbTableIdCols, dbTableCols, eventType, queue);
        } else if (EventType.isDelete(eventType)) {
            DeleteRowsEventData data = event.getData();
            BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);
        }
    }

    /**
     * Registers a listener for one table and loads that table's column metadata.
     * Call this before {@link #openThreadConsumeBinLog()}.
     *
     * @param db       database name
     * @param table    table name
     * @param listener callback invoked for every row change of the table
     * @throws IllegalStateException when the column metadata of the table cannot be loaded
     */
    public void registerListener(String db, String table, BinLogListener listener) {
        String dbTable = BinLogUtils.getDbTable(db, table);
        Map<String, Field> cols = BinLogDataUtils.getColumnsMap(db, table);
        if (cols == null) {
            // ConcurrentHashMap rejects null values; fail with a message that names the table
            // instead of an anonymous NullPointerException.
            throw new IllegalStateException("BinLog注册监听失败,无法加载库表字段: " + db + "." + table);
        }
        dbTableCols.put(dbTable, cols);
        listeners.put(dbTable, listener);
    }

    /**
     * Starts the consumer loops on the executor and connects the binlog client.
     * <p>
     * NOTE(review): {@code @Async} presumably only takes effect when this method is invoked
     * through a Spring-managed proxy; BinLogInitListener creates this class with {@code new},
     * so the call likely runs on the caller's thread - confirm.
     */
    @Async
    public void openThreadConsumeBinLog() {
        client.registerEventListener(this);
        int consumers = ThreadPoolConfig.corePoolSize * ThreadPoolConfig.CPU_NUMS;
        for (int i = 0; i < consumers; i++) {
            executorService.execute(this::consumeLoop);
        }
        try {
            // connect(timeout) is used instead of connect() so that the calling thread is not
            // occupied by the connection (per the original author's note).
            client.connect(BinLogUtils.QUEUE_SLEEP);
        } catch (Exception e) {
            log.error("BinLog多线程连接消费异常:{}", e.getMessage(), e);
        }
    }

    /**
     * Takes items from the queue until the thread is interrupted. {@code take()} blocks while
     * the queue is empty, so no polling (and no CPU spinning) is needed.
     */
    private void consumeLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                dispatch(queue.take());
            } catch (InterruptedException e) {
                log.error("BinLog多线程消费异常:{}", e.getMessage(), e);
                // Restore the flag so the loop condition ends this worker.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Passes one item to every listener of its table; a failing listener does not stop the worker. */
    private void dispatch(BinLog binLog) {
        for (BinLogListener binLogListener : listeners.get(binLog.getDbTable())) {
            try {
                binLogListener.onEvent(binLog);
            } catch (RuntimeException e) {
                log.error("BinLog监听器处理事件异常, db_table={}", binLog.getDbTable(), e);
            }
        }
    }
}
/**
 * Starts binlog monitoring once the application has started: registers a logging listener
 * for every configured {@code db.table} entry and then starts the consumers.
 *
 * @Author WangKun
 * @Date 2024/8/9 10:36
 */
@Slf4j
@RequiredArgsConstructor
@Component
@Order(value = 1)
public class BinLogInitListener implements CommandLineRunner {

    /** Injected binlog settings. */
    private final BinLogConfig config;

    /**
     * Creates the listener, registers the configured tables and starts consuming. Failures
     * are logged and do not abort application start-up.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        try {
            MySQLBinLogListener mySqlBinLogListener = new MySQLBinLogListener(config);
            this.getListMap().forEach((db, tables) ->
                    tables.forEach(table ->
                            mySqlBinLogListener.registerListener(db, table, info -> logChange(db, table, info))));
            mySqlBinLogListener.openThreadConsumeBinLog();
        } catch (Exception e) {
            log.error("BinLog初始化监听异常:{}", e.getMessage(), e);
        }
    }

    /**
     * Logs one row change. The category is determined by checking whether the event type
     * name contains the keyword of a {@link BinLogEventEnum} constant.
     */
    private void logChange(String db, String table, BinLog info) {
        String eventName = info.getEventType().name();
        String dbTable = db + "." + table;
        if (eventName.contains(BinLogEventEnum.UPDATE.getKey())) {
            log.info("库.表: {}, 修改之前:{}", dbTable, info.getBefore());
            log.info("库.表: {}, 修改之后:{}", dbTable, info.getAfter());
        }
        if (eventName.contains(BinLogEventEnum.WRITE.getKey())) {
            log.info("库.表: {}, 新增: {}", dbTable, info.getAfter());
        }
        if (eventName.contains(BinLogEventEnum.DELETE.getKey())) {
            log.info("库.表: {}, 删除: {}", dbTable, info.getBefore());
        }
    }

    /**
     * Groups the configured {@code db.table} entries by database. An entry that is not of the
     * form {@code db.table} is logged once and skipped; the remaining entries are still used.
     *
     * @return table names per database name; empty when nothing valid is configured
     */
    private Map<String, List<String>> getListMap() {
        Map<String, List<String>> map = new ConcurrentHashMap<>();
        List<String> configured = config.getTables();
        if (configured == null || configured.isEmpty()) {
            log.warn("BinLog配置同步,未配置任何监听库表 [库名.表名]");
            return map;
        }
        for (String key : configured) {
            // D_SLASH + POINT is the regular expression "\." matching a literal dot.
            String[] split = key.split(BinLogUtils.D_SLASH + BinLogUtils.POINT);
            if (split.length != 2 || split[0].trim().isEmpty() || split[1].trim().isEmpty()) {
                log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", key);
                continue;
            }
            map.computeIfAbsent(split[0].trim(), k -> new ArrayList<>()).add(split[1].trim());
        }
        return map;
    }
}
目录结构
在 IDEA 中启动项目,控制台出现以下信息即表示 BinLog 连接成功:
2024-08-19 17:40:47.129 INFO 493984 --- [ blc-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to localhost:3306 at log.000004/7294671 (sid:1, cid:800)
效果: