canal 环境搭建和配置

news2024/11/15 9:25:55

canal 环境搭建和配置

安装依赖环境

安装canal服务端

canal客户端配置

安装依赖环境

  • 下载Linux版jdk 链接:百度网盘 请输入提取码 提取码:5r2e --来自百度网盘超级会员V5的分享
  • 上传到 /soft/java目录下,并解压-执行如下命令 tar -zxvf jdk-8u211-linux-x64.tar.gz
  • 配置环境变量 vi /etc/profile 添加以下内容
    export JAVA_HOME=/soft/java/jdk1.8.0_211
    export PATH=$JAVA_HOME/bin:$PATH
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar`

  • 环境变量生效 source /etc/profile Java -version 验证出现版本号即安装完成
  • 安装mysql

    参见 : Linux 环境安装MySQL5.7

安装canal服务端

参照“https://blog.csdn.net/imVainiycos/article/details/122077960”

  1. 下载地址 Release v1.1.5 · alibaba/canal · GitHub

     

  2. 下载developyer版本,并上传到Linux环境

  3. 解压到opt目录下

    tar -xzvf canal.deployer-1.1.5.tar.gz -C /opt/mycanal/

  4. 复制example 库

    cp -r ./conf/example ./conf/mycanal

  5. 修改mycanal库只监听 test表

    vi ./conf/mycanal/instance.properties

    canal.instance.filter.regex=mycanal.test

  6. 在bin目录下启动

    ./bin/start.sh

  7. 查看canal运行日志 cat logs/canal/canal.log

    2023-05-05 17:02:03.989 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.91.13(192.168.91.13):11111] 2023-05-05 17:02:05.507 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ...... 2023-05-05 17:02:05.949 [canal-instance-scan-0] INFO com.alibaba.otter.canal.deployer.CanalController - auto notify start mycanal successful.

  8. the canal server is running now 表示服务端已经启动成功

  9. 查看instance实例日志

    cat logs/example/example.log cat logs/mycanal/mycanal.log

     

  10. 执行以下内容,根据报错不同

    [在数据库中创建一个root@127.0.0.1且使用mysql_native_password加密方式的用户,后面显示canal@localhost没有权限,同样创建一个;]  

    # 创建一个root用户名在127.0.0.1下的账号,密码设置为root
    CREATE USER 'root'@'127.0.0.1' IDENTIFIED BY 'root';
    CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
    # 调整密码策略
    ALTER USER 'root'@'127.0.0.1' IDENTIFIED WITH mysql_native_password BY 'root';
    ALTER USER 'canal'@'localhost' IDENTIFIED WITH mysql_native_password BY 'canal';
    # 授权用户
    GRANT ALL PRIVILEGES ON *.* to 'root'@'127.0.0.1';
    GRANT ALL PRIVILEGES ON *.* to 'canal'@'localhost';
    # 刷新系统权限表
    flush privileges;
    ​
  11. 重启服务,执行第9步,没有报错,跳转到example和mycanal库--出现图中两个文件即服务端安装成功

     

  12. 服务端完成

canal客户端配置

首先安装一个本地meaven

  1. 检查JAVA_HOME环境变量。Maven是使用Java开发的,所以必须知道当前系统环境中JDK的安装目录。

    echo %JAVA_HOME%

    C:\ZA\zjf\hana\jdk1.8.0_311\

  2. 解压Maven的核心程序。(路径不含中文)

    C:\ZA\peizhi\maven\apache-maven-3.9.1

  3. 配置环境变量

    m2_home:C:\ZA\peizhi\maven\apache-maven-3.9.1

    path %m2_home%/bin

  4. 查看Maven版本信息验证安装是否正确

    mvn -v

     

    配置maven仓库地址,保证仓库的jar包的更新

    在Idea中配置Maven

    1. 设置maven的安装目录及本地仓库

      文件->设置。。。如下图配置

       

       

    2. 创建一个meavn程序

      新建-新模块-选mmeavn如下图,命名按照需求命名,

      目录选择默认本地

      高级设置为具体坐标

    3. 修改pom文件

      添加properties 标签内容

      <properties>
        <fastjson.version>1.2.79</fastjson.version>
        <java.version>1.8</java.version>
        <spring-cloud.version>2020.0.1</spring-cloud.version>
        <spring-cloud-alibaba-version>2021.0.1.0</spring-cloud-alibaba-version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

      添加dependency标签内容 在<dependencies></dependencies>内填写(不加其他标签,若加了之后会报错)

       

      <!-- canal -->
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58.sec06</version>
      </dependency>
      <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
      </dependency>
      <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.5</version>
      </dependency>
      <!-- lombok -->
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
      </dependency>
    4. 编写程序

      在main 路径下创建java类

       

    5. 输入如下内容

      package org.example;
      ​
      import com.alibaba.fastjson.JSONObject;
      import com.alibaba.otter.canal.client.CanalConnector;
      import com.alibaba.otter.canal.client.CanalConnectors;
      import com.alibaba.otter.canal.protocol.CanalEntry;
      import com.alibaba.otter.canal.protocol.Message;
      import com.google.protobuf.ByteString;
      ​
      import java.net.InetSocketAddress;
      import java.util.List;
      /**
       * @Description canal客户端
       * @author xiao tang
       * @version 1.0.0
       * @createTime 2022年09月17日
       */
      public class myCanal {
      ​
          /**
           * 连接ip
           */
          private static final String IP = "192.168.91.13";
      ​
          /**
           * 连接端口号
           */
          private static final Integer PORT = 11111;
      ​
          /**
           * 连接canal通道
           */
          private static final String DESTINATION = "mycanal";
      ​
          /**
           * 批次最大数量
           */
          private final static int BATCH_SIZE = 1000;
      ​
          public static void main(String[] args) throws Exception {
              // 获取canal服务的连接
              CanalConnector canalConnector =
                      CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.91.13", 11111), "mycanal", "", "");
              // 尝试读取服务端是否有新数据
              while (true) {
                  // 连接
                  canalConnector.connect();
                  // 订阅数据库,监控数据库 trcanal所有表
                  canalConnector.subscribe("trcanal.*");
                  // 获取数据,每次获取100条
                  Message message = canalConnector.get(100);
                  // 获取 Entry 集合
                  List<CanalEntry.Entry> entries = message.getEntries();
                  // 判断集合是否为空,如果为空,则等待继续拉取
                  if (entries == null || entries.isEmpty()) {
                      System.out.println("没有数据,休息3s");
                      Thread.sleep(5000);
                      continue;
                  }
                  // 遍历 entries 单条解析
                  for (CanalEntry.Entry entry : entries) {
                      // 获取表名
                      String tableName = entry.getHeader().getTableName();
                      // 获取类型
                      CanalEntry.EntryType entryType = entry.getEntryType();
                      // 获取序列化后的数据
                      ByteString storeValue = entry.getStoreValue();
                      // 判断当前entryType类型是是否为 RowData 类型
                      if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                          // 反序列化数据
                          CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                          // 获取当前事件的操作类型
                          CanalEntry.EventType eventType = rowChange.getEventType();
                          // 获取数据集
                          List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                          // 遍历并打印数据集
                          for (CanalEntry.RowData rowData : rowDatasList) {
                              // 获取修改前的数据
                              JSONObject beforeData = new JSONObject();
                              List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                              for (CanalEntry.Column column : beforeColumnsList) {
                                  beforeData.put(column.getName(), column.getValue());
                              }
                              // 获取修改后的数据
                              JSONObject afterData = new JSONObject();
                              List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                              for (CanalEntry.Column column : afterColumnsList) {
                                  afterData.put(column.getName(), column.getValue());
                              }
                              // 打印
                              System.out.println("table = " + tableName + ", eventType=" + eventType + " before= " + beforeData + "after " + afterData);
                          }
                      }
                  }
              }
          }
      }
    6. 有其他的功能需求可在这上面添加

    7. 重启服务端canal即可正常运行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/642937.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Java在线医疗服务系统设计与实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a; ✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精…

Media3:Android下一代媒体框架

无论您是在构建音乐播放器、视频流应用程序还是其他需要播放媒体内容的 Android 应用程序&#xff0c;拥有可靠的媒体播放库都是必不可少的。 这就是 Media3 发挥作用的地方。 什么是 Media3&#xff1f; Media3 是由 Google 作为 AndroidX 的一部分推出的强大媒体播放库。它…

从零开始 Spring Boot 38:Lombok 与依赖注入

从零开始 Spring Boot 38&#xff1a;Lombok 与依赖注入 图源&#xff1a;简书 (jianshu.com) 在之前的文章中&#xff0c;我详细介绍了 Lombok 的用法&#xff0c;考虑到在 Spring 中使用依赖注入&#xff08;DI&#xff09;是如此的频繁&#xff0c;因此有必要讨论使用 Lomb…

精通postman教程(五)postman请求参数化

作为一名测试工程师&#xff0c;那么Postman绝对是大伙必备的工具之一。 在这个系列教程中&#xff0c;我将为大伙详细讲解如何使用Postman进行API测试。 今天我带大伙实战一番postman如何请求参数化 &#xff0c;让你们快速上手这款工具。 请求参数化 数据参数化是Postman…

Kivy系列(一)—— Kivy buildozer的Docker镜像制作

接触Kivy是奔着使用python便捷又是跨平台工具去的&#xff0c;如此一套代码可以发布为各类平台的成果。但是由于网络环境限制&#xff0c;以及kivy工具链上各类工具的频繁迭代&#xff0c;即使按照github上的kivy buildozer官方文档&#xff0c;也很难打包成功&#xff0c;kivy…

调试笔记-stm32的OTA/IAP 通过485升级固件

背景&#xff1a;最近需要在stm32上实现通过rs485升级固件功能。经过几天搜索和调试&#xff0c;实现了功能。 目标&#xff1a;使用cubeIDE实现stm32F407VGT6&#xff0c;通过RS485升级固件 调试记录&#xff1a; 步骤1. 在keil环境下的rs485升级固件(含源码)&#xff1a;S…

react 18.2 官网学习笔记(1)

useMemo const cachedValue useMemo(calculateValue, dependencies)&#xff1b;参数一&#xff1a;计算要缓存的值的函数。它应该是纯的&#xff0c;不应该接受任何参数&#xff0c;并且应该返回任何类型的值。React会在初始渲染时调用你的函数。在下一次渲染时&#xff0c;…

从搭建hadoop开始学习大数据中分而治之的MapReduce(伪集群模式)

环境准备 首先需要将如下四个必要的文件下载到计算机&#xff08;已经附上了下载地址&#xff0c;点击即可下载&#xff09;。 Vmware Workstation 17.x 【官方的下载地址】 CentOS-7-x86_64-Minimal-2009【阿里云镜像站下载地址】 openjdk-8u41-b04-linux-x64-14_jan_2020【开…

入栏需看——全国硕士研究生入学统一考试管理类专业学位联考

本栏意在收集关于全国硕士研究生入学统一考试管理类专业学位联考&#xff0c;简称管理类联考的知识点&#xff0c;考点&#xff0c;希望大家一起沟通&#xff0c;一起进步&#xff0c;管它贵不贵&#xff0c;考过了再说咯 英语 知识篇 阅读 完型填空 作文 技巧篇 第二章…

rolling的用法实例

在数据分析的过程中&#xff0c;经常用到对计算移动均值&#xff0c;使用rolling可以轻松实现这个功能~ rolling函数是一个用于时间序列分析的函数&#xff1b; 一、参数解析 首先&#xff0c;让我们来了解一下rolling的各个参数吧 DataFrame.rolling(window, min_periodsN…

Echarts—X轴鼠标滑动或者缩放/多列柱状图中某一列数据为0时不占位

这里写目录标题 需求背景图表展示X轴鼠标滑动或者缩放设置多列柱状图中某一列数据为0时不占位图表代码展示 需求背景 用柱状图展示12个月的项目对应的供应商数据&#xff1b;每个月有多个项目不确定&#xff0c;1-50之间&#xff0c;也就是说&#xff0c;12个月&#xff0c;每…

1.数据库的基本操作

SQL句子中语法格式提示&#xff1a; 1.中括号&#xff08;[]&#xff09;中的内容为可选项&#xff1b; 2.[&#xff0c;...]表示&#xff0c;前面的内容可重复&#xff1b; 3.大括号&#xff08;{}&#xff09;和竖线&#xff08;|&#xff09;表示选择项&#xff0c;在选择…

魏可伟受邀参加 2023 开放原子全球开源峰会

6月11日-13日&#xff0c;2023 开放原子全球开源峰会在京举行。作为开源行业年度盛事&#xff0c;本次峰会以“开源赋能&#xff0c;普惠未来”为主题&#xff0c;聚集政、产、学、研等各领域优势&#xff0c;汇聚顶尖大咖&#xff0c;共话开源未来。 KaiwuDB CTO 魏可伟受邀出…

Rancher的安装(k8s)

1、 Rancher概述 rancher官方文档 Rancher 是一个 Kubernetes 管理工具&#xff0c;让你能在任何地方和任何提供商上部署和运行集群。 Rancher 可以创建来自 Kubernetes 托管服务提供商的集群&#xff0c;创建节点并安装 Kubernetes&#xff0c;或者导入在任何地方运行的现有…

【总结笔记】Spring

1 Spring容器加载配置文件进行初始化。 Spring容器加载配置文件进行初始化主要有两种形式&#xff1a; 加载配置文件进行初始化&#xff1a; ClassPathXmlApplicationContext ctx new ClassPathXmlApplicationContext(“ApplicationContext.xml”); 加载配置类进行初始化&…

测试人如何打造简历化思维?三年经验软件测试简历分析

测试人如何打造简历化思维&#xff1f;如题&#xff0c;不是写如何打造简历&#xff0c;而是简历化思维&#xff0c;如何理解简历化思维&#xff1f; 很多人跟我说很累&#xff0c;不想干了&#xff0c;每天忙忙碌碌在给老板打工&#xff0c;年底老板又可以换法拉利了。 玩笑…

【探索 Kubernetes|容器基础进阶篇 系列 4】理解现代云原生时代的引擎

文章目录 系列文章目录&#x1f479; 关于作者一、前言|回顾二、静态和动态视图三、爆火的容器编排工具 Kubernetes 的诞生四、Kubernetes 要解决的问题是什么&#xff1f;五、理解 Kubernetes 全局架构图Master&#xff08;控制节点&#xff09;Node&#xff08;计算节点&…

Go语言小细节

Go语言小细节 结构体 结构体中允许存在匿名字段&#xff0c;即只有类型没有具体的变量名&#xff0c;但是一个结构体内只允许有一个相同类型的结构体中字段大写开头表示可公开访问&#xff0c;小写表示私有&#xff08;仅在当前结构体的包中可访问&#xff09;在编写结构体的…

Linux之进程信号(下)

文章目录 前言信号的相关概念一、信号的保存——位图1.内核中的表示2.信号集——sigset_t3.信号集操作函数 二、信号的捕捉过程1.内核态和用户态用户代码和内核代码如何分辨是用户态还是内核态一个进程如何跑到OS中执行方法 2.信号捕捉的过程 三、核心转储1.数组越界并不一定会…

CTFHub | 远程包含

0x00 前言 CTFHub 专注网络安全、信息安全、白帽子技术的在线学习&#xff0c;实训平台。提供优质的赛事及学习服务&#xff0c;拥有完善的题目环境及配套 writeup &#xff0c;降低 CTF 学习入门门槛&#xff0c;快速帮助选手成长&#xff0c;跟随主流比赛潮流。 0x01 题目描述…