【源码解析】canal核心功能源码解析

news2025/1/11 3:54:58

1. 项目地址

https://github.com/alibaba/canal.git

2. 模块介绍

canal核心模块的功能:

  • deployer模块:独立部署模块,用于canal-server的独立启动,包括本地配置解析、拉取远程配置、启动canal-server。
  • server模块:canal-server的实现逻辑,一个canal-server一般是一个jvm进程。重点关注两种canal-server的实现方式,内嵌型的canalServerEmbed和独立使用的canalServerWithNetty。新版本中新增了直接对接mq的canal-server实现。
  • instance模块:具体实时订阅任务是由一个个instance组成的,每个canal-server中可以同时运行多个instance。instance由parser、sink、store三个重点模块组成。
  • parser模块:数据源接入,模拟slave协议和master进行交互,协议解析。parser模块依赖于dbsync、driver模块。
  • sink模块:将parser抓取到的数据,进行过滤,加工,然后发送到store模块进行存储。核心接口为CanalEventSink。
  • store模块:数据存储模块,类似内存模式到消息队列,本质上是一个RingBuffer。核心接口为CanalEventStore。
  • meta模块:增量订阅&消费信息管理器,核心接口为CanalMetaManager,主要用于记录canal消费到的mysql binlog的位置
  • client模块:项目最早的消费客户端,通过将client模块引入自己的项目中,然后直接消费canal-server获取的数据。
  • client-adapter模块:1.1.x后新出的模块,可以独立部署为canal-server的消费服务端,是一个springboot项目。通过SPI机制,能够加载不同plugins,将消费信息投递到ES\hbase\rdb等下游。
  • admin模块:1.1.x新出的模块,可以独立部署为canal-server的控制台,配置canal-server、instance相关配置,非常好用。

3. deployer源码解析

1. 入口类CanalLauncher

  1. 加载canal.properties的配置内容
  2. 根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了。
  3. 如果是admin控制,使用PlainCanalConfigClient获取远程配置,新开一个线程池,每隔五秒用http请求去admin上拉配置,进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server。
  4. 核心是用canalStarter.start()启动。
  5. 使用CountDownLatch保持主线程存活到canalStarter启动完成
    public static void main(String[] args) {
        try {
            logger.info("## set default uncaught exception handler");
            setGlobalUncaughtExceptionHandler();

            logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }

            final CanalStarter canalStater = new CanalStarter(properties);
            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
            if (StringUtils.isNotEmpty(managerAddress)) {
                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
                String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                if (StringUtils.isEmpty(registerIp)) {
                    registerIp = AddressUtils.getHostIp();
                }
                final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                    user,
                    passwd,
                    registerIp,
                    Integer.parseInt(adminPort),
                    autoRegister,
                    autoCluster,
                    name);
                PlainCanal canalConfig = configClient.findServer(null);
                if (canalConfig == null) {
                    throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                       + " can't not found config for [" + registerIp + ":" + adminPort
                                                       + "]");
                }
                Properties managerProperties = canalConfig.getProperties();
                // merge local
                managerProperties.putAll(properties);
                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));
                executor.scheduleWithFixedDelay(new Runnable() {

                    private PlainCanal lastCanalConfig;

                    public void run() {
                        try {
                            if (lastCanalConfig == null) {
                                lastCanalConfig = configClient.findServer(null);
                            } else {
                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                                if (newCanalConfig != null) {
                                    // 远程配置canal.properties修改重新加载整个应用
                                    canalStater.stop();
                                    Properties managerProperties = newCanalConfig.getProperties();
                                    // merge local
                                    managerProperties.putAll(properties);
                                    canalStater.setProperties(managerProperties);
                                    canalStater.start();

                                    lastCanalConfig = newCanalConfig;
                                }
                            }

                        } catch (Throwable e) {
                            logger.error("scan failed", e);
                        }
                    }

                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
                canalStater.setProperties(managerProperties);
            } else {
                canalStater.setProperties(properties);
            }

            canalStater.start();
            runningLatch.await();
            executor.shutdownNow();
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }

PlainCanalConfigClient

  • PlainCanalConfigClient#findServer,发现服务器。给canal-admin发送请求

http://${admin.host}:8089/api/v1/config/server_polling?ip=${local.host}&port=11110&md5=&register=0&cluster=null&name=null

2. 启动类CanalStarter

核心对象:

  • CanalController:是canalServer真正的启动控制器
  • canalMQStarter:用来启动mqProducer。如果serverMode选择了mq,那么会用canalMQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq
  • CanalAdminWithNetty:该类是对本server启动一个netty服务,让admin控制台通过请求获取当前server的信息,比如运行状态、正在本server上运行的instance信息等
    /**
     * 启动方法
     *
     * @throws Throwable
     */
    public synchronized void start() throws Throwable {
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        if (!"tcp".equalsIgnoreCase(serverMode)) {
            ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
            canalMQProducer = loader
                .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
            if (canalMQProducer != null) {
                ClassLoader cl = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
                canalMQProducer.init(properties);
                Thread.currentThread().setContextClassLoader(cl);
            }
        }

        if (canalMQProducer != null) {
            MQProperties mqProperties = canalMQProducer.getMqProperties();
            // disable netty
            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
            if (mqProperties.isFlatMessage()) {
                // 设置为raw避免ByteString->Entry的二次解析
                System.setProperty("canal.instance.memory.rawEntry", "false");
            }
        }

        logger.info("## start the canal server.");
        controller = new CanalController(properties);
        controller.start();
        logger.info("## the canal server is running now ......");
        shutdownThread = new Thread(() -> {
            try {
                logger.info("## stop the canal server");
                controller.stop();
                CanalLauncher.runningLatch.countDown();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal Server:", e);
            } finally {
                logger.info("## canal server is down.");
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownThread);

        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
            canalMQStarter.start(destinations);
            controller.setCanalMQStarter(canalMQStarter);
        }

        // start canalAdmin
        String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
        if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
            String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
            String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
            CanalAdminController canalAdmin = new CanalAdminController(this);
            canalAdmin.setUser(user);
            canalAdmin.setPasswd(passwd);

            String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);

            logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
                port,
                user,
                passwd,
                ip);

            CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
            canalAdminWithNetty.setCanalAdmin(canalAdmin);
            canalAdminWithNetty.setPort(Integer.parseInt(port));
            canalAdminWithNetty.setIp(ip);
            canalAdminWithNetty.start();
            this.canalAdmin = canalAdminWithNetty;
        }

        running = true;
    }

CanalAdminWithNetty

CanalAdminWithNetty#start,启动netty,接收admin发送的请求。

    public void start() {
        super.start();

        this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));
        /*
         * enable keep-alive mechanism, handle abnormal network connection
         * scenarios on OS level. the threshold parameters are depended on OS.
         * e.g. On Linux: net.ipv4.tcp_keepalive_time = 300
         * net.ipv4.tcp_keepalive_probes = 2 net.ipv4.tcp_keepalive_intvl = 30
         */
        bootstrap.setOption("child.keepAlive", true);
        /*
         * optional parameter.
         */
        bootstrap.setOption("child.tcpNoDelay", true);

        // 构造对应的pipeline
        bootstrap.setPipelineFactory(() -> {
            ChannelPipeline pipelines = Channels.pipeline();
            pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
            // support to maintain child socket channel.
            pipelines.addLast(HandshakeInitializationHandler.class.getName(),
                new HandshakeInitializationHandler(childGroups));
            pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                new ClientAuthenticationHandler(canalAdmin));

            SessionHandler sessionHandler = new SessionHandler(canalAdmin);
            pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
            return pipelines;
        });

        // 启动
        if (StringUtils.isNotEmpty(ip)) {
            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
        } else {
            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
        }
    }

SessionHandler#messageReceived,用来处理admin发送过来的请求。

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        logger.info("message receives in session handler...");
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        try {
            String action = null;
            String message = null;
            String destination = null;
            switch (packet.getType()) {
                case SERVER:
                    ServerAdmin serverAdmin = ServerAdmin.parseFrom(packet.getBody());
                    action = serverAdmin.getAction();
                    switch (action) {
                        case "check":
                            message = canalAdmin.check() ? "1" : "0";
                            break;
                        case "start":
                            message = canalAdmin.start() ? "1" : "0";
                            break;
                        case "stop":
                            message = canalAdmin.stop() ? "1" : "0";
                            break;
                        case "restart":
                            message = canalAdmin.restart() ? "1" : "0";
                            break;
                        case "list":
                            message = canalAdmin.getRunningInstances();
                            break;
                        default:
                            byte[] errorBytes = AdminNettyUtils.errorPacket(301,
                                MessageFormatter.format("ServerAdmin action={} is unknown", action).getMessage());
                            AdminNettyUtils.write(ctx.getChannel(), errorBytes);
                            break;
                    }
                    AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
                    break;
                case INSTANCE:
                    InstanceAdmin instanceAdmin = InstanceAdmin.parseFrom(packet.getBody());
                    destination = instanceAdmin.getDestination();
                    action = instanceAdmin.getAction();
                    switch (action) {
                        case "check":
                            message = canalAdmin.checkInstance(destination) ? "1" : "0";
                            break;
                        case "start":
                            message = canalAdmin.startInstance(destination) ? "1" : "0";
                            break;
                        case "stop":
                            message = canalAdmin.stopInstance(destination) ? "1" : "0";
                            break;
                        case "release":
                            message = canalAdmin.releaseInstance(destination) ? "1" : "0";
                            break;
                        case "restart":
                            message = canalAdmin.restartInstance(destination) ? "1" : "0";
                            break;
                        default:
                            byte[] errorBytes = AdminNettyUtils.errorPacket(301,
                                MessageFormatter.format("InstanceAdmin action={} is unknown", action).getMessage());
                            AdminNettyUtils.write(ctx.getChannel(), errorBytes);
                            break;
                    }
                    AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
                    break;
                case LOG:
                    LogAdmin logAdmin = LogAdmin.parseFrom(packet.getBody());
                    action = logAdmin.getAction();
                    destination = logAdmin.getDestination();
                    String type = logAdmin.getType();
                    String file = logAdmin.getFile();
                    int count = logAdmin.getCount();
                    switch (type) {
                        case "server":
                            if ("list".equalsIgnoreCase(action)) {
                                message = canalAdmin.listCanalLog();
                            } else {
                                message = canalAdmin.canalLog(count);
                            }
                            break;
                        case "instance":
                            if ("list".equalsIgnoreCase(action)) {
                                message = canalAdmin.listInstanceLog(destination);
                            } else {
                                message = canalAdmin.instanceLog(destination, file, count);
                            }
                            break;
                        default:
                            byte[] errorBytes = AdminNettyUtils.errorPacket(301,
                                MessageFormatter.format("LogAdmin type={} is unknown", type).getMessage());
                            AdminNettyUtils.write(ctx.getChannel(), errorBytes);
                            break;
                    }
                    AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
                    break;
                default:
                    byte[] errorBytes = AdminNettyUtils.errorPacket(300,
                        MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
                    AdminNettyUtils.write(ctx.getChannel(), errorBytes);
                    break;
            }
        } catch (Throwable exception) {
            byte[] errorBytes = AdminNettyUtils.errorPacket(400,
                MessageFormatter.format("something goes wrong with channel:{}, exception={}",
                    ctx.getChannel(),
                    ExceptionUtils.getStackTrace(exception)).getMessage());
            AdminNettyUtils.write(ctx.getChannel(), errorBytes);
        }
    }

CanalMQStarter

CanalMQStarter#start,核心就是初始化canalServer,CanalServerWithEmbedded.instance()和执行CanalMQRunnable,而该类的核心就是执行CanalMQStarter#worker

CanalMQStarter#worker,就是执行canalServer.getWithoutAck,从canal-server中获取消息,用canalMQProducer将消息发送出去。

    private void worker(String destination, AtomicBoolean destinationRunning) {
        while (!running || !destinationRunning.get()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // ignore
            }
        }

        logger.info("## start the MQ producer: {}.", destination);
        MDC.put("destination", destination);
        final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
        while (running && destinationRunning.get()) {
            try {
                CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
                if (canalInstance == null) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    continue;
                }
                MQDestination canalDestination = new MQDestination();
                canalDestination.setCanalDestination(destination);
                CanalMQConfig mqConfig = canalInstance.getMqConfig();
                canalDestination.setTopic(mqConfig.getTopic());
                canalDestination.setPartition(mqConfig.getPartition());
                canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                canalDestination.setPartitionHash(mqConfig.getPartitionHash());
                canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());
                canalDestination.setEnableDynamicQueuePartition(mqConfig.getEnableDynamicQueuePartition());

                canalServer.subscribe(clientIdentity);
                logger.info("## the MQ producer: {} is running now ......", destination);

                Integer getTimeout = mqProperties.getFetchTimeout();
                Integer getBatchSize = mqProperties.getBatchSize();
                while (running && destinationRunning.get()) {
                    Message message;
                    if (getTimeout != null && getTimeout > 0) {
                        message = canalServer.getWithoutAck(clientIdentity,
                            getBatchSize,
                            getTimeout.longValue(),
                            TimeUnit.MILLISECONDS);
                    } else {
                        message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                    }

                    final long batchId = message.getId();
                    try {
                        int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                        if (batchId != -1 && size != 0) {
                            canalMQProducer.send(canalDestination, message, new Callback() {

                                @Override
                                public void commit() {
                                    canalServer.ack(clientIdentity, batchId); // 提交确认
                                }

                                @Override
                                public void rollback() {
                                    canalServer.rollback(clientIdentity, batchId);
                                }
                            }); // 发送message到topic
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        }

                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            }
        }
    }

MigrateMap

MigrateMap内部使用了LoadingCache,该类是guaua的工具类,当获取不到指定的key的时候,调用CacheLoader里面的实现方法。MigrateMap做了一层简单的封装。

public class MigrateMap {

    public static <K, V> ConcurrentMap<K, V> makeComputingMap(CacheBuilder<Object, Object> builder,
                                                              Function<? super K, ? extends V> computingFunction) {
        final Function<? super K, ? extends V> function = computingFunction;
        LoadingCache<K, V> computingCache = builder.build(new CacheLoader<K, V>() {

            @Override
            public V load(K key) throws Exception {
                return function.apply(key);
            }
        });

        return new MigrateConcurrentMap<>(computingCache);
    }

    public static <K, V> ConcurrentMap<K, V> makeComputingMap(Function<? super K, ? extends V> computingFunction) {
        return makeComputingMap(CacheBuilder.newBuilder(), computingFunction);
    }

    final static class MigrateConcurrentMap<K, V> implements ConcurrentMap<K, V> {

        private final LoadingCache<K, V>  computingCache;

        private final ConcurrentMap<K, V> cacheView;

        MigrateConcurrentMap(LoadingCache<K, V> computingCache){
            this.computingCache = computingCache;
            this.cacheView = computingCache.asMap();
        }

        @Override
        public int size() {
            return cacheView.size();
        }
    }
}

3. CanalController

  • 端口11111,是用来tcp通信的;端口11112,是用来提供metrics数据的。端口11110是用来和canal-admin通信的。
  • 如果zk配置了的话,在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
  • 启动embededCanalServer
  • 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance。
  • 如果是admin模式的话,使用ManagerInstanceConfigMonitor进行instance的启动

CanalController#start

    public void start() throws Throwable {
        logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
        // 创建整个canal的工作节点
        final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
        initCid(path);
        if (zkclientx != null) {
            this.zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception {

                }

                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
        // 优先启动embeded服务
        embededCanalServer.start();
        // 尝试启动一下非lazy状态的通道
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            if (!embededCanalServer.isStart(destination)) {
                // HA机制启动
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }

            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }

        if (autoScan) {
            instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
            for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
                if (!monitor.isStart()) {
                    monitor.start();
                }
            }
        }

        // 启动网络接口
        if (canalServer != null) {
            canalServer.start();
        }
    }

CanalServerWithEmbedded

CanalServerWithEmbedded#loadCanalMetrics,根据javaSPI获取CanalMetricsProvider,执行PrometheusProvider#getService的方法。

    private void loadCanalMetrics() {
        ServiceLoader<CanalMetricsProvider> providers = ServiceLoader.load(CanalMetricsProvider.class);
        List<CanalMetricsProvider> list = new ArrayList<>();
        for (CanalMetricsProvider provider : providers) {
            list.add(provider);
        }

        if (list.isEmpty()) {
            return;
        }

        // only allow ONE provider
        if (list.size() > 1) {
            logger.warn("Found more than one CanalMetricsProvider, use the first one.");
            // 报告冲突
            for (CanalMetricsProvider p : list) {
                logger.warn("Found CanalMetricsProvider: {}.", p.getClass().getName());
            }
        }

        CanalMetricsProvider provider = list.get(0);
        this.metrics = provider.getService();
    }

ServerRunningMonitors

ServerRunningMonitors设置监听器。

        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
            ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
            runningMonitor.setDestination(destination);
            runningMonitor.setListener(new ServerRunningListener() {

                public void processActiveEnter() {
                    try {
                        MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                        embededCanalServer.start(destination);
                        if (canalMQStarter != null) {
                            canalMQStarter.startDestination(destination);
                        }
                    } finally {
                        MDC.remove(CanalConstants.MDC_DESTINATION);
                    }
                }
            }

ServerRunningMonitor#start,启动服务。

    public synchronized void start() {
        super.start();
        try {
            processStart();
            if (zkClient != null) {
                // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
                zkClient.subscribeDataChanges(path, dataListener);

                initRunning();
            } else {
                processActiveEnter();// 没有zk,直接启动
            }
        } catch (Exception e) {
            logger.error("start failed", e);
            // 没有正常启动,重置一下状态,避免干扰下一次start
            stop();
        }

    }

    private void processActiveEnter() {
        if (listener != null) {
            listener.processActiveEnter();
        }
    }

CanalServerWithEmbedded#start(),启动CanalInstance

    public void start(final String destination) {
        final CanalInstance canalInstance = canalInstances.get(destination);
        if (!canalInstance.isStart()) {
            try {
                MDC.put("destination", destination);
                if (metrics.isRunning()) {
                    metrics.register(canalInstance);
                }
                canalInstance.start();
                logger.info("start CanalInstances[{}] successfully", destination);
            } finally {
                MDC.remove("destination");
            }
        }
    }

ManagerInstanceConfigMonitor

ManagerInstanceConfigMonitor#start,采用定时器使用PlainCanalConfigClient来获取canal-admin的实例,通过defaultAction进行本地的启动。

    public void start() {
        super.start();
        executor.scheduleWithFixedDelay(() -> {
            try {
                scan();
                if (isFirst) {
                    isFirst = false;
                }
            } catch (Throwable e) {
                logger.error("scan failed", e);
            }
        }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
    }

    private void scan() {
        String instances = configClient.findInstances(null);
        if (instances == null) {
            return;
        }

        final List<String> is = Lists.newArrayList(StringUtils.split(instances, ','));
        List<String> start = new ArrayList<>();
        List<String> stop = new ArrayList<>();
        List<String> restart = new ArrayList<>();
        for (String instance : is) {
            if (!configs.containsKey(instance)) {
                PlainCanal newPlainCanal = configClient.findInstance(instance, null);
                if (newPlainCanal != null) {
                    configs.put(instance, newPlainCanal);
                    start.add(instance);
                }
            } else {
                PlainCanal plainCanal = configs.get(instance);
                PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());
                if (newPlainCanal != null) {
                    // 配置有变化
                    restart.add(instance);
                    configs.put(instance, newPlainCanal);
                }
            }
        }

        configs.forEach((instance, plainCanal) -> {
            if (!is.contains(instance)) {
                stop.add(instance);
            }
        });

        stop.forEach(instance -> {
            notifyStop(instance);
        });

        restart.forEach(instance -> {
            notifyReload(instance);
        });

        start.forEach(instance -> {
            notifyStart(instance);
        });

    }

    private void notifyStart(String destination) {
        try {
            defaultAction.start(destination);
            actions.put(destination, defaultAction);
            // 启动成功后记录配置文件信息
        } catch (Throwable e) {
            logger.error(String.format("scan add found[%s] but start failed", destination), e);
        }
    }

InstanceAction#start主要是调用ServerRunningMonitor来启动服务。

            defaultAction = new InstanceAction() {

                public void start(String destination) {
                    InstanceConfig config = instanceConfigs.get(destination);
                    if (config == null) {
                        // 重新读取一下instance config
                        config = parseInstanceConfig(properties, destination);
                        instanceConfigs.put(destination, config);
                    }

                    if (!embededCanalServer.isStart(destination)) {
                        // HA机制启动
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                        if (!config.getLazy() && !runningMonitor.isStart()) {
                            runningMonitor.start();
                        }
                    }

                    logger.info("auto notify start {} successful.", destination);
                }
            }

CanalInstanceWithSpring

CanalInstanceWithSpring#start

public void start() {
    logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });
    super.start();
}

AbstractCanalInstance#start,instance实例启动会开启metaManageralarmHandlereventStoreeventSinkeventParser

    @Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventSink.isStart()) {
            eventSink.start();
        }

        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

4. admin模块源码解析

admin的管理台需要手工录入NodeServer的信息,配置已经搭建好的canal-deployer的节点的ip和端口号,保持与canal-deployer的信息流通。canal-deployer的信息就不断的拉取canal-admin上的配置信息,来达到配置的可视化。

该项目admin-web是一个springBoot项目。启动入口在CanalAdminApplication

1. 查询NodeServer

NodeServerController#nodeServers

    @GetMapping(value = "/nodeServers")
    public BaseModel<Pager<NodeServer>> nodeServers(NodeServer nodeServer, Pager<NodeServer> pager,
                                                    @PathVariable String env) {
        return BaseModel.getInstance(nodeServerService.findList(nodeServer, pager));
    }

NodeServerServiceImpl#findList,获取server列表。查询数据库表canal_node_server。调用SimpleAdminConnector#doServerAdmin来判断是否启动成功。

    public Pager<NodeServer> findList(NodeServer nodeServer, Pager<NodeServer> pager) {

        Query<NodeServer> query = getBaseQuery(nodeServer);
        Query<NodeServer> queryCnt = query.copy();

        int count = queryCnt.findCount();
        pager.setCount((long) count);

        List<NodeServer> nodeServers = query.order()
            .asc("id")
            .setFirstRow(pager.getOffset().intValue())
            .setMaxRows(pager.getSize())
            .findList();
        pager.setItems(nodeServers);

        if (nodeServers.isEmpty()) {
            return pager;
        }

        List<Future<Boolean>> futures = new ArrayList<>(nodeServers.size());
        // get all nodes status
        for (NodeServer ns : nodeServers) {
            futures.add(Threads.executorService.submit(() -> {
                boolean status = SimpleAdminConnectors.execute(ns.getIp(), ns.getAdminPort(), AdminConnector::check);
                ns.setStatus(status ? "1" : "0");
                return !status;
            }));
        }
        for (Future<Boolean> f : futures) {
            try {
                f.get(3, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException e) {
                // ignore
            } catch (TimeoutException e) {
                break;
            }
        }

        return pager;
    }

2. 保存NodeServer

NodeServerController#save,保存NodeServer

    @PostMapping(value = "/nodeServer")
    public BaseModel<String> save(@RequestBody NodeServer nodeServer, @PathVariable String env) {
        nodeServerService.save(nodeServer);
        return BaseModel.getInstance("success");
    }

NodeServerServiceImpl#save,判断是否已经存在相同的节点。

    public void save(NodeServer nodeServer) {
        int cnt = NodeServer.find.query()
            .where()
            .eq("ip", nodeServer.getIp())
            .eq("adminPort", nodeServer.getAdminPort())
            .findCount();
        if (cnt > 0) {
            throw new ServiceException("节点信息已存在");
        }

        nodeServer.save();

        if (nodeServer.getClusterId() == null) { // 单机模式
            CanalConfig canalConfig = new CanalConfig();
            canalConfig.setServerId(nodeServer.getId());
            String configTmp = TemplateConfigLoader.loadCanalConfig();
            canalConfig.setContent(configTmp);
            try {
                String contentMd5 = SecurityUtil.md5String(canalConfig.getContent());
                canalConfig.setContentMd5(contentMd5);
            } catch (NoSuchAlgorithmException e) {
            }
            canalConfig.save();
        }
    }

在这里插入图片描述

参考博文:

  • https://blog.csdn.net/u014730658/article/details/107144812

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/477958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

山东专升本计算机第六章-数据库技术

数据库技术 SQL数据库与NOSQL数据库的区别 数据库管理系统 考点 6 数据库管理系统的组成和功能 组成 • 模式翻译 • 应用程序的翻译 • 交互式查询 • 数据的组织和存取 • 事务运行管理 • 数据库的维护 功能 • 数据定义功能 • 数据存取功能 • 数据库运行管理…

MySQL备份和恢复

文章目录 一、库的备份和恢复1.库的备份2.库的恢复 二、表的备份和恢复1.表的备份2.表的恢复 备份数据&#xff0c;其实就是生成一个 sql 文件&#xff0c;把创建数据库、创建表、插入数据等各种 SQL 语句都装载到这个文件中。恢复数据&#xff0c;其实就是按顺序执行 sql 文件…

操作系统进程概述、通信

进程 进程就是程序的一次执行过程&#xff0c;同一个程序多次执行对应多个进程&#xff1b; 一、进程的组成 1、PCB &#xff08;1&#xff09;进程描述信息&#xff1a;主要是PID等关键信息&#xff1b; &#xff08;2&#xff09;进程控制和管理信息&#xff1b; &#xf…

( 数组和矩阵) 566. 重塑矩阵 ——【Leetcode每日一题】

❓566. 重塑矩阵 难度&#xff1a;简单 在 MATLAB 中&#xff0c;有一个非常有用的函数 reshape &#xff0c;它可以将一个 m x n 矩阵重塑为另一个大小不同&#xff08;r x c&#xff09;的新矩阵&#xff0c;但保留其原始数据。 给你一个由二维数组 mat 表示的 m x n 矩阵…

unity-VRTK-simulator开发学习日记3(射线样式|忽略层|有无效名单)

目录 射线样式 组成 可用状态 材质替换 射线激活设置为常态 忽略层级&#xff08;射线等&#xff09; 自定义忽略层级 &#xff08;射线等&#xff09; 有效名单和无效名单 有效名单 无效名单 创建一个模拟手柄的按钮&#xff08;键盘键入按钮&#xff09; 输入系统…

最新Wordpress网站因重装宝塔而导致数据库崩溃无法访问的终极解决办法

在当今数字化时代&#xff0c;拥有自己的网站已成为越来越多人展示个人或企业信息的重要方式。虽然建立并维护自己网站看起来是一个简单的过程&#xff0c;但如果出现问题&#xff0c;比如数据丢失、网站无法正常运行等情况时&#xff0c;往往会令用户感到十分懊恼和无助。在这…

sql注入(二)盲注,二次注入,宽字节注入

目录 目录 一、布尔盲注 1.判断库名的长度 2.判断数据库名 2.1判断数据库名首字符 2.2 判断数据库名的其余字符 二、时间盲注&#xff1a; 1.判断库名的长度 2.判断库名&#xff1a; 3.判断表名payload&#xff1a; 4.爆出列名 5.爆数据 三、二次注入 1.原理&#…

【某软件网络协议分析】

由于网站无法上传附件&#xff0c;本帖子完整内容请点击此处 首先&#xff0c;从coco.apk提取dex文件&#xff0c;利用dex2jar将dex转化为jar&#xff0c;拖到jd-gui中&#xff0c;发现有如下几个可疑点&#xff1a; com.azus.android.tcplogin.CryptUtil.rsaEncrypt com.azus…

MySQL示例数据库(MySQL Sample Databases) 之 Employees 数据库

文章目录 MySQL示例数据库(MySQL Sample Databases) 之 Employees 数据库官方示例数据介绍Employees 数据库Employees 数据库安装Employees 数据库的结构参考 MySQL示例数据库(MySQL Sample Databases) 之 Employees 数据库 官方示例数据介绍 MySQL 官方提供了多个示例数据库…

二叉树及其遍历

文章目录 二叉树树的定义二叉树的定义遍历先序遍历中序遍历后序遍历层次遍历定义队列层次创建二叉树层次遍历 二叉树 树是一种非线性的数据结构&#xff0c;由若干个节点组成&#xff0c;节点之间存在一种父子关系&#xff0c;具有层次结构。二叉树是一种特殊的树结构&#xff…

火遍全网的ChatGPT究竟是什么?

ChatGPT是什么 ChatGPT是一个由OpenAI开发的大型语言模型&#xff0c;基于GPT-3.5架构。它被训练用于自然语言处理和生成任务&#xff0c;可以回答各种问题&#xff0c;包括一般知识、文化、科学、技术、商业、娱乐等方面的问题。ChatGPT可以进行对话&#xff0c;回答用户的问…

OJ刷题 第十四篇(递归较多)

23204 - 进制转换 时间限制 : 1 秒 内存限制 : 128 MB 将一个10进制数x(1 < x < 100,000,000)转换成m进制数(2< m < 16) 。分别用 ABCDEF表示10以上的数字。 输入 x m (1 < x < 100,000,000, 2< m < 16) 输出 m进制数 样例 输入 31 16 输出 1F 答…

功能齐全的 ESP32 智能手表,具有多个表盘、心率传感器硬件设计

相关设计资料下载ESP32 智能手表带心率、指南针设计资料(包含Arduino源码+原理图+Gerber+3D文件).zip 介绍 我们调查了智能手表项目的不同方面,并学会了集成和测试每个单独的部分。在本文中,我们将使用所学知识,结合使用硬件和软件组件,从头开始创建我们自己的智能手表。在…

存储资源调优技术——SmartDedupe智能数据重删、SmartCompression智能数据压缩技术

目录 SmartDedupe智能数据重删技术 SmartCompression智能数据压缩技术 SmartDedupe智能数据重删技术 基本概念 智能数据重删技术 是一种数据缩减技术&#xff0c;通过删除存储系统中的冗余数据块 减少数据占用的物理存储容量&#xff0c;节省存储空间&#xff08;会降低性能&a…

Java 基础入门篇(三)——— 数组的定义与内存原理

文章目录 一、数组的定义1.1 静态初始化数组1.2 动态初始化数组1.3 数组的访问 二、数组的遍历三、数组的内存图 ★3.1 Java 的内存分配3.2 数组的内存图3.3 两个数组变量指向同一个数组对象 四、数组使用的常见问题补充&#xff1a;随机数 Random 类 一、数组的定义 数组就是…

线程池~~

文章目录 线程池线程池实现API、参数说明线程池处理Runnable任务线程池处理Callable任务Executors工具类实现线程池定时器Timer定时器ScheduledExecutorService定时器 并发和并行线程的生命周期 线程池 线程池实现API、参数说明 线程池处理Runnable任务 线程池处理Callable任务…

Win11的两个实用技巧系列之修改c盘大小方法、功能快捷键大全

Win11 c盘无法更改大小什么原因?Win11修改c盘大小方法 有不少朋友反应Win11 c盘无法更改大小是怎么回事&#xff1f;本文就为大家带来了详细的更改方法&#xff0c;需要的朋友一起看看吧 Win11 c卷无法更改大小什么原因&#xff1f;有用户电脑的系统盘空间太小了&#xff0c;…

CTF权威指南 笔记 -第二章二进制文件-汇编原理

C语言的生命是从 源文件开始 的 每条C语言都必须要给翻译成 一系列的低级语言 最后 按照可执行文件格式打包 并且作为二进制文件保存起来 编译原理 我们需要使用编译器 是通过某种语言 等价输出另一个语言 可以分为前端和后端 前端 和机器无关 把源程序分解为组成要素 …

【Android入门到项目实战-- 7.4】—— 如何播放音频和视频

目录 一、播放音频 MediaPlayer的工作流程 具体代码实现 二、播放视频 具体代码实现 学完本篇文章可以收获如何播放音频和视频。 一、播放音频 播放音频需要使用MediaPlayer类实现&#xff0c;它对各种格式的音频文件提供了全面的控制方法&#xff0c;下面是MediaPlayer类…

Android 9.0 原生SystemUI下拉通知栏UI背景设置为圆角背景的定制(二)

1.前言 在9.0的系统rom定制化开发中,在原生系统SystemUI下拉状态栏的通知栏的背景是默认白色四角的背景,由于在产品设计中,需要把四角背景默认改成圆角背景,所以就需要分析系统原生下拉通知栏的每条通知的默认背景,然后通过熟悉systemui的通知栏流程,设置默认下拉状态栏…