canal 环境搭建和配置
安装依赖环境
安装canal服务端
canal客户端配置
安装依赖环境
- 下载Linux版jdk 链接:百度网盘 请输入提取码 提取码:5r2e --来自百度网盘超级会员V5的分享
- 上传到 /soft/java目录下,并解压-执行如下命令
tar -zxvf jdk-8u211-linux-x64.tar.gz
- 配置环境变量 vi /etc/profile 添加以下内容
export JAVA_HOME=/soft/java/jdk1.8.0_211 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar`- 环境变量生效
source /etc/profile
Java -version
验证出现版本号即安装完成安装mysql
参见 : Linux 环境安装MySQL5.7
安装canal服务端
参照“https://blog.csdn.net/imVainiycos/article/details/122077960”
-
下载地址 Release v1.1.5 · alibaba/canal · GitHub
-
下载developyer版本,并上传到Linux环境
-
解压到opt目录下
tar -xzvf canal.deployer-1.1.5.tar.gz -C /opt/mycanal/
-
复制example 库
cp -r ./conf/example ./conf/mycanal
-
修改mycanal库只监听 test表
vi ./conf/mycanal/instance.properties
canal.instance.filter.regex=mycanal.test
-
在bin目录下启动
./bin/start.sh
-
查看canal运行日志
cat logs/canal/canal.log
2023-05-05 17:02:03.989 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.91.13(192.168.91.13):11111] 2023-05-05 17:02:05.507 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ...... 2023-05-05 17:02:05.949 [canal-instance-scan-0] INFO com.alibaba.otter.canal.deployer.CanalController - auto notify start mycanal successful.
-
the canal server is running now
表示服务端已经启动成功 -
查看instance实例日志
cat logs/example/example.log cat logs/mycanal/mycanal.log
-
执行以下内容,根据报错不同
[在数据库中创建一个root@127.0.0.1且使用mysql_native_password加密方式的用户,后面显示canal@localhost没有权限,同样创建一个;]
# 创建一个root用户名在127.0.0.1下的账号,密码设置为root CREATE USER 'root'@'127.0.0.1' IDENTIFIED BY 'root'; CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal'; # 调整密码策略 ALTER USER 'root'@'127.0.0.1' IDENTIFIED WITH mysql_native_password BY 'root'; ALTER USER 'canal'@'localhost' IDENTIFIED WITH mysql_native_password BY 'canal'; # 授权用户 GRANT ALL PRIVILEGES ON *.* to 'root'@'127.0.0.1'; GRANT ALL PRIVILEGES ON *.* to 'canal'@'localhost'; # 刷新系统权限表 flush privileges;
-
重启服务,执行第9步,没有报错,跳转到example和mycanal库--出现图中两个文件即服务端安装成功
-
服务端完成
canal客户端配置
首先安装一个本地meaven
-
检查JAVA_HOME环境变量。Maven是使用Java开发的,所以必须知道当前系统环境中JDK的安装目录。
echo %JAVA_HOME%
C:\ZA\zjf\hana\jdk1.8.0_311\
-
解压Maven的核心程序。(路径不含中文)
C:\ZA\peizhi\maven\apache-maven-3.9.1
-
配置环境变量
m2_home:C:\ZA\peizhi\maven\apache-maven-3.9.1
path %m2_home%/bin
-
查看Maven版本信息验证安装是否正确
mvn -v
配置maven仓库地址,保证仓库的jar包的更新
在Idea中配置Maven
-
设置maven的安装目录及本地仓库
文件->设置。。。如下图配置
-
创建一个meavn程序
新建-新模块-选mmeavn如下图,命名按照需求命名,
目录选择默认本地
高级设置为具体坐标
-
修改pom文件
添加properties 标签内容
<properties> <fastjson.version>1.2.79</fastjson.version> <java.version>1.8</java.version> <spring-cloud.version>2020.0.1</spring-cloud.version> <spring-cloud-alibaba-version>2021.0.1.0</spring-cloud-alibaba-version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
添加dependency标签内容 在<dependencies></dependencies>内填写(不加其他标签,若加了之后会报错)
<!-- canal --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58.sec06</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.5</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> </dependency>
-
编写程序
在main 路径下创建java类
-
输入如下内容
package org.example; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import java.net.InetSocketAddress; import java.util.List; /** * @Description canal客户端 * @author xiao tang * @version 1.0.0 * @createTime 2022年09月17日 */ public class myCanal { /** * 连接ip */ private static final String IP = "192.168.91.13"; /** * 连接端口号 */ private static final Integer PORT = 11111; /** * 连接canal通道 */ private static final String DESTINATION = "mycanal"; /** * 批次最大数量 */ private final static int BATCH_SIZE = 1000; public static void main(String[] args) throws Exception { // 获取canal服务的连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.91.13", 11111), "mycanal", "", ""); // 尝试读取服务端是否有新数据 while (true) { // 连接 canalConnector.connect(); // 订阅数据库,监控数据库 trcanal所有表 canalConnector.subscribe("trcanal.*"); // 获取数据,每次获取100条 Message message = canalConnector.get(100); // 获取 Entry 集合 List<CanalEntry.Entry> entries = message.getEntries(); // 判断集合是否为空,如果为空,则等待继续拉取 if (entries == null || entries.isEmpty()) { System.out.println("没有数据,休息3s"); Thread.sleep(5000); continue; } // 遍历 entries 单条解析 for (CanalEntry.Entry entry : entries) { // 获取表名 String tableName = entry.getHeader().getTableName(); // 获取类型 CanalEntry.EntryType entryType = entry.getEntryType(); // 获取序列化后的数据 ByteString storeValue = entry.getStoreValue(); // 判断当前entryType类型是是否为 RowData 类型 if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // 反序列化数据 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 获取当前事件的操作类型 CanalEntry.EventType eventType = rowChange.getEventType(); // 获取数据集 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // 遍历并打印数据集 for (CanalEntry.RowData rowData : rowDatasList) { // 获取修改前的数据 JSONObject beforeData = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } // 获取修改后的数据 JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } // 打印 System.out.println("table = " + tableName + ", eventType=" + eventType + " before= " + beforeData + "after " + afterData); } } } } } }
-
有其他的功能需求可在这上面添加
-
重启服务端canal即可正常运行
-