目前市面上有许多的 CDC(Change Data Capture) 框架用于监听数据库的数据变动,例如:canal、Debezium、Maxwell等都是用来解析 binlog 日志实现事件的监听。但是有一个情况就是如果公司对 binlog 日志文件的权限管控的很严格,那么这些用于监听的工具就可能因为权限的问题无法使用。这里我尝试使用 mysql 的 UDF + 触发器 的方式来实现数据库层面的数据监听,然后调用本地的 java程序
1. 扩展安装
要想调用本地的java代码需要用到mysql的扩展程序 lib_mysqludf_sys
官网地址 https://github.com/mysqludf/lib_mysqludf_sys
linux平台需要将 .so 的文件复制到mysql的插件路径下面 /usr/lib/mysql/plugin/
windows平台需要将 .dll 路径安装到 mysql安装路径\lib\plugin下面,由于官网没有提供 .dll 文件,这里可以用我生成的
链接:https://pan.baidu.com/s/13RJK6wfVeHcCJkuC-BigAw?pwd=1234
提取码:1234
2. 链接程序
上面安装完成之后,需要将函数链接到mysql中
CREATE FUNCTION lib_mysqludf_sys_info RETURNS string SONAME 'lib_mysqludf_sys.dll';
CREATE FUNCTION sys_get RETURNS string SONAME 'lib_mysqludf_sys.dll';
CREATE FUNCTION sys_set RETURNS int SONAME 'lib_mysqludf_sys.dll';
CREATE FUNCTION sys_exec RETURNS int SONAME 'lib_mysqludf_sys.dll';
CREATE FUNCTION sys_eval RETURNS string SONAME 'lib_mysqludf_sys.dll';
3. 创建例子程序
这里写一个例子程序,将mysql中监听到的数据追加到文件当中,maven打成jar包
public class Application {
public static void main(String[] args) {
for (String arg : args) {
System.out.println(hello(arg));
}
}
public static String hello(String args) {
try {
File file = new File("E:\\data.txt");
FileWriter writer = new FileWriter(file, true);
writer.write(args);
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
return args;
}
}
4. 创建存储过程
DELIMITER //
CREATE PROCEDURE execute_jar(IN data JSON)
BEGIN
DECLARE cmd VARCHAR(255);
SET cmd = CONCAT('java -jar E:/my-study-project/redis/target/redis-1.0.0-SNAPSHOT.jar "' , data , '"');
SELECT sys_eval(cmd) into @select;
END //
DELIMITER ;
5. 创建触发器
CREATE TRIGGER t_order_trigger AFTER INSERT ON t_order
FOR EACH ROW
BEGIN
-- 获取新插入的行数据并整合为一个 JSON 字符串
DECLARE jsonData JSON;
SET jsonData = JSON_OBJECT(
'id', NEW.id,
'userName', NEW.user_name,
'orderId', NEW.order_id
);
-- 调用存储过程并传递整合的 JSON 字符串
CALL execute_jar(jsonData);
END;
6. 测试表
create table t_order
(
id bigint auto_increment comment '主键id'
primary key,
user_name varchar(64) null comment '用户名称',
order_id bigint(64) null comment '订单id'
)
comment '订单表' collate = utf8mb4_general_ci;
7. 测试数据
insert into t_order(user_name, order_id) values ('zs', 1), ('ls', 2);
可以看到数据直接追加到文件中了,利用这个特性我们就可以来监听mysql的数据变动,然后投递到kafka或者rocketmq中去了