canal server初始化源码分析

news2024/11/6 7:21:01
CanalLauncher类是canal server端启动的入口类,跟随代码进行深入。

在开始之前,我们可以先了解下,

canal 配置方式
  1. ManagerCanalInstanceGenerator: 基于manager管理的配置方式,实时感知配置并进行server重启
  2. SpringCanalInstanceGenerator:基于本地spring xml的配置方式,对于多instance的时候,不便于扩展,一个instance一个xml配置
canal 配置文件
  • canal.properties  (系统根配置文件)
  • instance.properties  (instance级别的配置文件,每个instance一份)
入口代码
   public static void main(String[] args) {
        try {
            logger.info("## set default uncaught exception handler");
            // 设置全局的异常捕获
            setGlobalUncaughtExceptionHandler();

            // 支持rocketmq client 配置日志路径
            System.setProperty("rocketmq.client.logUseSlf4j","true");

            // 加载canal 配置
            logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
             // 初始化启动类
            final CanalStarter canalStater = new CanalStarter(properties);
            // canal.admin.manager 配置地址
            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
            if (StringUtils.isNotEmpty(managerAddress)) {
                // 获取admin 用户名和密码
                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
                if (StringUtils.isEmpty(passwd)) {
                    throw new IllegalArgumentException(
                        "canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");
                }
                // 设置默认端口号
                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                // 自否自动注册
                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                //集群地址
                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
                //注册名称
                String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
                if (StringUtils.isEmpty(name)) {
                    // 以本地机器为默认名字
                    name = AddressUtils.getHostName();
                }
                //注册到zk 或者admin中server IP 不配置就是本地IP
                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                if (StringUtils.isEmpty(registerIp)) {
                    registerIp = AddressUtils.getHostIp();
                }
                // 初始化配置客户端
                final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                    user,
                    passwd,
                    registerIp,
                    Integer.parseInt(adminPort),
                    autoRegister,
                    autoCluster,
                    name);
                // 通过http方式加载远程配置 通过admin配置的信息
                PlainCanal canalConfig = configClient.findServer(null);
                if (canalConfig == null) {
                    throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                       + " can't not found config for [" + registerIp + ":" + adminPort
                                                       + "]");
                }
                Properties managerProperties = canalConfig.getProperties();
                // merge local  本地配置优先级更高
                managerProperties.putAll(properties);
                // auto.scan.interval  instance自动扫描的间隔时间,单位秒
                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));
                executor.scheduleWithFixedDelay(new Runnable() {

                    private PlainCanal lastCanalConfig;
                    // 定时异步线程去加载远程配置
                    public void run() {
                        try {
                            if (lastCanalConfig == null) {
                                lastCanalConfig = configClient.findServer(null);
                            } else {
                                //通过md5的方式验证重新验证配置是否有变更,有变更返回最新数据,并重新加载
                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                                if (newCanalConfig != null) {
                                    // 远程配置canal.properties修改重新加载整个应用
                                    canalStater.stop();
                                    Properties managerProperties = newCanalConfig.getProperties();
                                    // merge local
                                    managerProperties.putAll(properties);
                                    canalStater.setProperties(managerProperties);
                                    canalStater.start();

                                    lastCanalConfig = newCanalConfig;
                                }
                            }

                        } catch (Throwable e) {
                            logger.error("scan failed", e);
                        }
                    }

                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
                canalStater.setProperties(managerProperties);
            } else {
                canalStater.setProperties(properties);
            }

            canalStater.start();
            runningLatch.await();
            executor.shutdownNow();
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }

上面代码中,对于配置的加载有两部分,当配置了canal.admin.manager 后台管理地址,会先去验证配置的用户名和密码,去获取相应的server配置(通过http的方式进行获取)

    public PlainCanal findServer(String md5) {
        if (StringUtils.isEmpty(md5)) {
            md5 = "";
        }
        String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
                     + "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster) + "&name=" + StringUtils.stripToEmpty(name);
        return queryConfig(url);
    }

当通过admin的方式进行加载配置,会每隔默认5s通过http的方式获取admin端的配置,会对整体配置进行md5值进行比较,有变更会对server进行重启。

跟随CanalStarter.start() 方法进入到启动方法,在CanalLauncher中已加载到server相关的配置。

   1.  根据配置canal.serverMode获取服务端模式,有tcp、相关的mq( kafka, rocketMQ, rabbitMQ, pulsarMQ)方式,除了tcp模式,需要初始化mq相关配置

        // 服务端模式 可通过配置进行变更 canal.serverMode
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        if (!"tcp".equalsIgnoreCase(serverMode)) {
            ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
            // /plugin 和 /canal/plugin"
            canalMQProducer = loader
                .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
            if (canalMQProducer != null) {
                canalMQProducer =  new ProxyCanalMQProducer(canalMQProducer);
                // 初始化mq信息 由canal.serverMode 决定使用哪种mq
                canalMQProducer.init(properties);
            }
        }

        if (canalMQProducer != null) {
            MQProperties mqProperties = canalMQProducer.getMqProperties();
            // disable netty
            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
            if (mqProperties.isFlatMessage()) {
                // 设置为raw避免ByteString->Entry的二次解析
                System.setProperty("canal.instance.memory.rawEntry", "false");
            }
        }

2. 初始化canal调度控制器CanalController,并进行start,CanalController主要是初始化各个instance配置

3.  非tcp的方式,进行启动mq。

        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            // 当前server上部署的instance列表
            String destinations = CanalController.getDestinations(properties);
            // 启动mq
            canalMQStarter.start(destinations);
            controller.setCanalMQStarter(canalMQStarter);
        }

4. 初始化CanalAdminController:提供canal admin的管理操作

        // start canalAdmin
        String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
        if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
            String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
            String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
            CanalAdminController canalAdmin = new CanalAdminController(this);
            canalAdmin.setUser(user);
            canalAdmin.setPasswd(passwd);
            String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);

            logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
                port,
                user,
                passwd,
                ip);

            CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
            canalAdminWithNetty.setCanalAdmin(canalAdmin);
            canalAdminWithNetty.setPort(Integer.parseInt(port));
            canalAdminWithNetty.setIp(ip);
            canalAdminWithNetty.start();
            this.canalAdmin = canalAdminWithNetty;
        }

5. jvm停止时,shutdown监听admin配置线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1395446.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java-初识正则表达式 以及 练习

目录 什么是正则表达式&#xff1f; 1. 正则表达式---字符类&#xff08;一个大括号匹配一个字符&#xff09;&#xff1a; 2. 正则表达式---预字符类&#xff08;也是匹配一个字符&#xff09;&#xff1a; 正则表达式---数量词 &#xff08;可以匹配多个字符&#xff09;…

android studio 连接 夜神模拟器方法

android studio 连接 夜神模拟器方法 1、打开cmd 2、输入夜神模拟器的安装地址&#xff0c;至bin目录下 3、输入连接指令&#xff1a;nox_adb.exe connect 127.0.0.1:62001 4、连接成功会提示successfully 5、打开Android Studio&#xff0c;会就会出现连接的模拟器。

C#,入门教程(07)——软件项目的源文件与目录结构

上一篇&#xff1a; C#&#xff0c;入门教程(06)——解决方案资源管理器&#xff0c;代码文件与文件夹的管理工具https://blog.csdn.net/beijinghorn/article/details/124895033 创建新的 C# 项目后&#xff0c; Visual Studio 会自动创建一系列的目录与文件。 程序员后面的工…

Opencv小项目——手势数字刷TIKTOK

​ 写在前面&#xff1a; 很久没更新了&#xff0c;之前的实习的记录也算是烂尾了&#xff0c;但是好在自己的实习记录还是有的&#xff0c;最近也忙碌了很多&#xff0c;终于放假了&#xff0c;今天下午正好没事&#xff0c;闲来无事就随便做个小玩意吧。 思来想去&#xff…

python算法与数据结构(搜索算法和拓扑排序算法)---深度优先搜索

课程目标 了解树/图的深度遍历&#xff0c;宽度遍历基本原理&#xff1b;会使用python语言编写深度遍历&#xff0c;广度遍历代码&#xff1b;掌握拓扑排序算法 搜索算法的意义和作用 搜索引擎 提到搜索两个子&#xff0c;大家都应该会想到搜索引擎&#xff0c;搜索引擎的基…

android 和 opencv 开发环境搭建

本文详细说明给android项目添加opencv库的详细步骤&#xff0c;并通过实现图片灰度化来查看配置是否成功。 下载OPENCV ANDROID SDK 到官网下载 打开 https://opencv.org/releases/ 选择android&#xff0c;下载完成后解压出下面的文件&#xff1a; 安装android sdk 和 ndk …

Tide Quencher 7.2 CPG 500A ,TQ7.2 CPG 500A,可以提高荧光标记物的淬灭效果

您好&#xff0c;欢迎来到新研之家 文章关键词&#xff1a;荧光淬灭剂Tide Quencher 7.2 CPG 500A&#xff0c;Tide Quencher 7.2 CPG 500A &#xff0c;TQ7.2 CPG 500A 一、基本信息 产品简介&#xff1a;The fluorescence quencher Tide Quencher 7.2 CPG 500A can quench…

Android Dialog setCanceledOnTouchOutside失效,点击dialog外面不消失

前言&#xff1a;有一个需求需要点击dialog外面要消失&#xff0c;本来以为很简单结果设置了一直未生效 setCanceledOnTouchOutside(true); 问了半天chat-gpt4结果给的答案都不明显 查看代码发现设置了style&#xff0c;于是尝试去除这个style&#xff0c;结果点击setCancele…

用Go plan9汇编实现斐波那契数列计算

斐波那契数列是一个满足递推关系的数列&#xff0c;如&#xff1a;1 1 2 3 5 8 ... 其前两项为1&#xff0c;第3项开始&#xff0c;每一项都是其前两项之和。 用Go实现一个简单的斐波那契计算逻辑 func fib(n int) int {if n 1 || n 2 {return 1}return fib(n-1) fib(n-2) …

【leetcode】消失的数字

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家刷题&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 1.暴力求解法2.采用异或的方法&#xff08;同单身狗问题&#xff09;3.先求和再减去数组元素 点击查看…

【机器学习300问】12、为什么要进行特征归一化?

当线性回归模型的特征量变多之后&#xff0c;会出现不同的特征量&#xff0c;然而对于那些同是数值型的特征量为什么要做归一化处理呢&#xff1f; 一、为了消除数据特征之间的量纲影响 使得不同指标之间具有可比性。例如&#xff0c;分析一个人的身高和体重对健康的影响&…

【物以类聚】给el-image预览多张图片增加提示文字,让每张图片有所分类

【物以类聚】给el-image预览多张图片增加提示文字&#xff0c;让每张图片有所分类 一、需求二、el-image三、实施步骤3.1 导包3.2 改造3.3 引入 三、效果 一、需求 点击地图上的一张图片&#xff0c;弹出所有相关的图片资源&#xff0c;图片资源上显示每个图片的所属类型。 二…

22k+star一款自托管的开源的的好用的碎片化笔记软件 Memos超级详细部署教程

目录 1.拉取镜像 2.启动 3.体验 4.源码地址 1.拉取镜像 docker pull neosmemo/memos:stable 2.启动 创建目录 mkdir -p /opt/memos/ 启动 docker run -d --name memos -p 10006:5230 -v /opt/memos/:/var/opt/memos neosmemo/memos:stable 3.体验 浏览器输入下面地址…

2023年全球软件架构师峰会(ArchSummit深圳站):核心内容与学习收获(附大会核心PPT下载)

本次峰会是一次重要的技术盛会&#xff0c;旨在为全球软件架构师提供一个交流和学习的平台。本次峰会聚焦于软件架构的最新趋势、最佳实践和技术创新&#xff0c;吸引了来自世界各地的软件架构师、技术专家和企业领袖。 在峰会中&#xff0c;与会者可以了解到数字化、AIGC、To…

彩超框架EchoSight开发日志记录

EchoSight开发记录 蒋志强 我会不定期的更新 开发进展。最近更新进展于2024年1月15日 1.背景 由于某些不可抗逆的原因&#xff0c;离开了以前的彩超大厂&#xff0c;竞业在家&#xff0c;难得有空闲的时间。我计划利用这段时间 自己独立 从零开始 搭建一套 彩超系统的软件工…

GLM-4多模态重磅更新!摸着OpenAI过河!

智谱CEO张鹏说&#xff1a;OpenAI摸着石头过河&#xff0c;我们摸着OpenAI过河。 摸来摸去摸了一年&#xff0c;以每3-4个月升级一次基座模型的速度&#xff0c;智谱摸着OpenAI过河的最新成绩到底怎么样&#xff1f;真如所说吗&#xff1f; 听到GLM-4发布的当天&#xff0c;我就…

FPGA物理引脚,原理(Pacakge and pinout)-认知3

画FPGA芯片引脚封装图&#xff08;原理&#xff09;&#xff0c;第一是参考开发板(根据一下描述了解总览&#xff09;&#xff0c;第二是研究Datasheet. ASCII Pinout File Zynq-7000 All Programmable SoC Packaging and Pinout(UG585) 1. Pacakge overview 1.1&#xff0…

小封装高稳定性振荡器 Sg2520egn / sg2520vgn, sg2520ehn / sg2520vhn

描述 随着物联网和ADAS等5G应用的实施&#xff0c;数据流量不断增长&#xff0c;网络基础设施变得比以往任何时候都更加重要。IT供应商一直在快速建设数据中心&#xff0c;并且对安装在数据中心内部/内部的光模块有很大的需求。此应用需要具有“小”&#xff0c;“低抖动”和“…

npm run dev 启动vue的时候指定端口

使用的是 Vue CLI 来创建和管理 Vue 项目&#xff0c; 可以通过设置 --port 参数来指定启动的端口号。以下是具体的步骤&#xff1a; 打开命令行终端 进入您的 Vue 项目目录 运行以下命令&#xff0c;通过 --port 参数指定端口号&#xff08;例如&#xff0c;这里设置端口号…

PBR材质纹理下载

03:10 按照视频里的顺序 我们从第6个网站开始倒数 点击本行文字或下方链接 进入查看 6大网站地址 网址查看链接&#xff1a; http://www.uzing.net/community_show-1962-48-48-35.html 06 Tectures Wood Fence 001 | 3D TEXTURES 简介&#xff1a;最大的纹理网站之一&#x…