1. Project repository
https://github.com/alibaba/canal.git
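2. Module overview
What canal's core modules do:
- deployer module: the standalone deployment module, used to start canal-server on its own. It parses the local configuration, pulls remote configuration, and starts canal-server.
- server module: the implementation of canal-server; one canal-server is normally one JVM process. The two implementations to focus on are the embedded canalServerEmbed and the standalone canalServerWithNetty. Newer versions add a canal-server implementation that feeds an MQ directly.
- instance module: each real-time subscription task is an instance, and one canal-server can run several instances at once. An instance is built from three key modules: parser, sink, and store.
- parser module: connects to the data source. It emulates the slave protocol to talk to the master and parses the protocol. It depends on the dbsync and driver modules.
- sink module: filters and processes the data captured by the parser, then hands it to the store module for storage. The core interface is CanalEventSink.
- store module: the data storage module, comparable to an in-memory message queue; in essence it is a RingBuffer. The core interface is CanalEventStore.
- meta module: the manager for incremental subscription and consumption metadata. The core interface is CanalMetaManager, which mainly records the MySQL binlog position that canal has consumed up to.
- client module: the project's original consumer client. You add the client module to your own project and consume the data captured by canal-server directly.
- client-adapter module: new in 1.1.x. It can be deployed on its own as a consumer service for canal-server and is a Spring Boot project. Through an SPI mechanism it loads different plugins and delivers the consumed data to downstream systems such as ES, HBase, and RDB.
- admin module: new in 1.1.x. It can be deployed on its own as a console for canal-server, used to manage canal-server and instance configuration; very handy.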
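The store module is described above as essentially a RingBuffer. Purely as an illustration, the sketch below shows a ring buffer with separate put, get, and ack cursors, which is one way to support the fetch-then-acknowledge style of consumption that shows up later in this walkthrough. The class RingStoreSketch and all of its methods are hypothetical and far simpler than canal's real store.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified ring buffer with put/get/ack cursors (not canal's store). */
final class RingStoreSketch {
    private final String[] slots;
    private final int mask;
    private long putSeq = 0; // next sequence to write
    private long getSeq = 0; // next sequence to hand to a consumer
    private long ackSeq = 0; // every sequence below this is free for reuse

    RingStoreSketch(int capacity) {
        if (Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.slots = new String[capacity];
        this.mask = capacity - 1;
    }

    /** Returns false when unacknowledged events occupy every slot. */
    synchronized boolean tryPut(String event) {
        if (putSeq - ackSeq >= slots.length) {
            return false;
        }
        slots[(int) (putSeq++ & mask)] = event;
        return true;
    }

    /** Hands out up to batchSize events without freeing their slots. */
    synchronized List<String> get(int batchSize) {
        List<String> batch = new ArrayList<>();
        while (getSeq < putSeq && batch.size() < batchSize) {
            batch.add(slots[(int) (getSeq++ & mask)]);
        }
        return batch;
    }

    /** Frees every slot handed out so far. */
    synchronized void ack() {
        ackSeq = getSeq;
    }

    /** Rewinds the get cursor so unacknowledged events are delivered again. */
    synchronized void rollback() {
        getSeq = ackSeq;
    }
}
```

Keeping the ack cursor separate from the get cursor is what lets a consumer roll back: a slot is only reused after its event has been acknowledged.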
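3. deployer source code walkthrough
1. Entry class CanalLauncher
- Loads the contents of canal.properties.
- Checks whether canal.admin.manager is empty to decide whether the server is managed by admin. If it is not, the server simply runs with the configuration in canal.properties.
- If it is managed by admin, PlainCanalConfigClient fetches the remote configuration. A scheduled executor sends an HTTP request to admin every five seconds (the default scan interval) to pull the configuration and merge it with the local one; local values override remote ones. This relies on the configuration-pulling utilities in the instance module. An md5 is used to detect changes: if the canal-server configuration has been updated, canal-server is restarted.
- The core step is the call to canalStarter.start().
- A CountDownLatch (runningLatch) keeps the main thread alive after startup; the shutdown hook registered in CanalStarter counts it down once the server has been stopped.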
public static void main(String[] args) {
try {
logger.info("## set default uncaught exception handler");
setGlobalUncaughtExceptionHandler();
logger.info("## load canal configurations");
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
properties.load(new FileInputStream(conf));
}
final CanalStarter canalStater = new CanalStarter(properties);
String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
if (StringUtils.isNotEmpty(managerAddress)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
if (StringUtils.isEmpty(registerIp)) {
registerIp = AddressUtils.getHostIp();
}
final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
user,
passwd,
registerIp,
Integer.parseInt(adminPort),
autoRegister,
autoCluster,
name);
PlainCanal canalConfig = configClient.findServer(null);
if (canalConfig == null) {
throw new IllegalArgumentException("managerAddress:" + managerAddress
+ " can't not found config for [" + registerIp + ":" + adminPort
+ "]");
}
Properties managerProperties = canalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
executor.scheduleWithFixedDelay(new Runnable() {
private PlainCanal lastCanalConfig;
public void run() {
try {
if (lastCanalConfig == null) {
lastCanalConfig = configClient.findServer(null);
} else {
PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
if (newCanalConfig != null) {
// the remote canal.properties changed: reload the whole application
canalStater.stop();
Properties managerProperties = newCanalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
canalStater.setProperties(managerProperties);
canalStater.start();
lastCanalConfig = newCanalConfig;
}
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
canalStater.setProperties(managerProperties);
} else {
canalStater.setProperties(properties);
}
canalStater.start();
runningLatch.await();
executor.shutdownNow();
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the canal Server:", e);
}
}
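The polling logic inside the scheduled task can be isolated as follows. This is only a sketch under stated assumptions: ConfigPollSketch, its Config class, and the fetch and restart callbacks are hypothetical stand-ins for configClient.findServer(md5) (which returns null when nothing changed) and for the stop / setProperties / start sequence.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch of one "poll with the last md5, restart on change" round. */
final class ConfigPollSketch {
    /** A fetched configuration together with the md5 reported for it. */
    static final class Config {
        final String md5;
        final String content;

        Config(String md5, String content) {
            this.md5 = md5;
            this.content = content;
        }
    }

    private final Function<String, Config> fetch; // last md5 -> new config, or null if unchanged
    private final Consumer<Config> restart;       // stands in for stop / setProperties / start
    private Config last;

    ConfigPollSketch(Function<String, Config> fetch, Consumer<Config> restart) {
        this.fetch = fetch;
        this.restart = restart;
    }

    /** One polling round; a scheduler would call this with a fixed delay. */
    void pollOnce() {
        try {
            if (last == null) {
                last = fetch.apply(null); // the first round only records the baseline
                return;
            }
            Config changed = fetch.apply(last.md5);
            if (changed != null) {
                restart.accept(changed);
                last = changed;
            }
        } catch (RuntimeException e) {
            // a failed round must not end the schedule; the next round retries
            System.err.println("scan failed: " + e);
        }
    }
}
```

Sending the last known md5 with each request lets the server answer "unchanged" cheaply, so a restart only happens when the content really differs.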
PlainCanalConfigClient
PlainCanalConfigClient#findServer looks up this server's configuration by sending a request to canal-admin:
http://${admin.host}:8089/api/v1/config/server_polling?ip=${local.host}&port=11110&md5=&register=0&cluster=null&name=null
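2. Startup class CanalStarter
Core objects:
- CanalController: the controller that actually starts canalServer.
- canalMQStarter: starts the mqProducer. If serverMode is set to an MQ, canalMQStarter manages the mqProducer, which delivers the real-time changes captured by canalServer straight to the MQ.
- CanalAdminWithNetty: starts a netty service on this server so that the admin console can request information about it, such as its running state and the instances currently running on it.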
/**
* Start method
*
* @throws Throwable
*/
public synchronized void start() throws Throwable {
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (!"tcp".equalsIgnoreCase(serverMode)) {
ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
canalMQProducer = loader
.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
if (canalMQProducer != null) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
canalMQProducer.init(properties);
Thread.currentThread().setContextClassLoader(cl);
}
}
if (canalMQProducer != null) {
MQProperties mqProperties = canalMQProducer.getMqProperties();
// disable netty
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
if (mqProperties.isFlatMessage()) {
// set the raw-entry mode to avoid parsing ByteString -> Entry a second time
System.setProperty("canal.instance.memory.rawEntry", "false");
}
}
logger.info("## start the canal server.");
controller = new CanalController(properties);
controller.start();
logger.info("## the canal server is running now ......");
shutdownThread = new Thread(() -> {
try {
logger.info("## stop the canal server");
controller.stop();
CanalLauncher.runningLatch.countDown();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:", e);
} finally {
logger.info("## canal server is down.");
}
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(destinations);
controller.setCanalMQStarter(canalMQStarter);
}
// start canalAdmin
String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
CanalAdminController canalAdmin = new CanalAdminController(this);
canalAdmin.setUser(user);
canalAdmin.setPasswd(passwd);
String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
port,
user,
passwd,
ip);
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
canalAdminWithNetty.setCanalAdmin(canalAdmin);
canalAdminWithNetty.setPort(Integer.parseInt(port));
canalAdminWithNetty.setIp(ip);
canalAdminWithNetty.start();
this.canalAdmin = canalAdminWithNetty;
}
running = true;
}
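The interplay between the shutdown thread above and the latch that CanalLauncher waits on can be reduced to a few lines. The sketch is an assumption-laden illustration: ShutdownLatchSketch and its methods are hypothetical, and unlike the listing it counts the latch down in a finally block, so that a failing stop cannot leave the main thread waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a latch released by the shutdown thread keeps main alive. */
final class ShutdownLatchSketch {
    private final CountDownLatch runningLatch = new CountDownLatch(1);

    /** Builds the thread that would be passed to Runtime.getRuntime().addShutdownHook. */
    Thread newShutdownThread(Runnable stopServer) {
        return new Thread(() -> {
            try {
                stopServer.run();
            } finally {
                // released even if stopping fails, so the waiting thread never hangs
                runningLatch.countDown();
            }
        });
    }

    /** What the main thread does after start(): wait until the hook has run. */
    boolean awaitShutdown(long timeoutMs) {
        try {
            return runningLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a real launcher the thread returned by newShutdownThread would be registered as a JVM shutdown hook, and the main thread would wait without a timeout.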
CanalAdminWithNetty
CanalAdminWithNetty#start starts netty and accepts the requests sent by admin.
public void start() {
super.start();
this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
/*
* enable keep-alive mechanism, handle abnormal network connection
* scenarios on OS level. the threshold parameters are depended on OS.
* e.g. On Linux: net.ipv4.tcp_keepalive_time = 300
* net.ipv4.tcp_keepalive_probes = 2 net.ipv4.tcp_keepalive_intvl = 30
*/
bootstrap.setOption("child.keepAlive", true);
/*
* optional parameter.
*/
bootstrap.setOption("child.tcpNoDelay", true);
// build the pipeline
bootstrap.setPipelineFactory(() -> {
ChannelPipeline pipelines = Channels.pipeline();
pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
// support to maintain child socket channel.
pipelines.addLast(HandshakeInitializationHandler.class.getName(),
new HandshakeInitializationHandler(childGroups));
pipelines.addLast(ClientAuthenticationHandler.class.getName(),
new ClientAuthenticationHandler(canalAdmin));
SessionHandler sessionHandler = new SessionHandler(canalAdmin);
pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
return pipelines;
});
// bind and start
if (StringUtils.isNotEmpty(ip)) {
this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
} else {
this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
}
}
SessionHandler#messageReceived handles the requests sent by admin.
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
logger.info("message receives in session handler...");
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
try {
String action = null;
String message = null;
String destination = null;
switch (packet.getType()) {
case SERVER:
ServerAdmin serverAdmin = ServerAdmin.parseFrom(packet.getBody());
action = serverAdmin.getAction();
switch (action) {
case "check":
message = canalAdmin.check() ? "1" : "0";
break;
case "start":
message = canalAdmin.start() ? "1" : "0";
break;
case "stop":
message = canalAdmin.stop() ? "1" : "0";
break;
case "restart":
message = canalAdmin.restart() ? "1" : "0";
break;
case "list":
message = canalAdmin.getRunningInstances();
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("ServerAdmin action={} is unknown", action).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
break;
case INSTANCE:
InstanceAdmin instanceAdmin = InstanceAdmin.parseFrom(packet.getBody());
destination = instanceAdmin.getDestination();
action = instanceAdmin.getAction();
switch (action) {
case "check":
message = canalAdmin.checkInstance(destination) ? "1" : "0";
break;
case "start":
message = canalAdmin.startInstance(destination) ? "1" : "0";
break;
case "stop":
message = canalAdmin.stopInstance(destination) ? "1" : "0";
break;
case "release":
message = canalAdmin.releaseInstance(destination) ? "1" : "0";
break;
case "restart":
message = canalAdmin.restartInstance(destination) ? "1" : "0";
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("InstanceAdmin action={} is unknown", action).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
break;
case LOG:
LogAdmin logAdmin = LogAdmin.parseFrom(packet.getBody());
action = logAdmin.getAction();
destination = logAdmin.getDestination();
String type = logAdmin.getType();
String file = logAdmin.getFile();
int count = logAdmin.getCount();
switch (type) {
case "server":
if ("list".equalsIgnoreCase(action)) {
message = canalAdmin.listCanalLog();
} else {
message = canalAdmin.canalLog(count);
}
break;
case "instance":
if ("list".equalsIgnoreCase(action)) {
message = canalAdmin.listInstanceLog(destination);
} else {
message = canalAdmin.instanceLog(destination, file, count);
}
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("LogAdmin type={} is unknown", type).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(300,
MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
} catch (Throwable exception) {
byte[] errorBytes = AdminNettyUtils.errorPacket(400,
MessageFormatter.format("something goes wrong with channel:{}, exception={}",
ctx.getChannel(),
ExceptionUtils.getStackTrace(exception)).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
}
}
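The handler above maps an action string to a call on canalAdmin and answers with "1" or "0". As far as the listing shows, an unknown action writes an error packet and then still reaches the ack write after the inner switch, because the inner break only leaves the inner switch. The sketch below is a hypothetical alternative shape, not canal's code: AdminDispatchSketch, on, and handle are invented names, and it returns exactly one reply per request.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Hypothetical table-driven dispatch that yields exactly one reply per action. */
final class AdminDispatchSketch {
    private final Map<String, BooleanSupplier> actions = new LinkedHashMap<>();

    /** Registers a handler for an action name such as "check" or "restart". */
    AdminDispatchSketch on(String action, BooleanSupplier handler) {
        actions.put(action, handler);
        return this;
    }

    /** Returns "1" or "0" for a known action, or an error text for an unknown one. */
    String handle(String action) {
        BooleanSupplier handler = actions.get(action);
        if (handler == null) {
            return "error 301: ServerAdmin action=" + action + " is unknown";
        }
        return handler.getAsBoolean() ? "1" : "0";
    }
}
```

A caller would register one handler per action, for example `new AdminDispatchSketch().on("check", () -> true)`, and write the returned string to the channel once.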
CanalMQStarter
At its core, CanalMQStarter#start initializes canalServer through CanalServerWithEmbedded.instance() and runs CanalMQRunnable, whose main job is to call CanalMQStarter#worker.
CanalMQStarter#worker calls canalServer.getWithoutAck to fetch messages from canal-server and sends them out with canalMQProducer.
private void worker(String destination, AtomicBoolean destinationRunning) {
while (!running || !destinationRunning.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
logger.info("## start the MQ producer: {}.", destination);
MDC.put("destination", destination);
final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
while (running && destinationRunning.get()) {
try {
CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
if (canalInstance == null) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// ignore
}
continue;
}
MQDestination canalDestination = new MQDestination();
canalDestination.setCanalDestination(destination);
CanalMQConfig mqConfig = canalInstance.getMqConfig();
canalDestination.setTopic(mqConfig.getTopic());
canalDestination.setPartition(mqConfig.getPartition());
canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
canalDestination.setPartitionHash(mqConfig.getPartitionHash());
canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());
canalDestination.setEnableDynamicQueuePartition(mqConfig.getEnableDynamicQueuePartition());
canalServer.subscribe(clientIdentity);
logger.info("## the MQ producer: {} is running now ......", destination);
Integer getTimeout = mqProperties.getFetchTimeout();
Integer getBatchSize = mqProperties.getBatchSize();
while (running && destinationRunning.get()) {
Message message;
if (getTimeout != null && getTimeout > 0) {
message = canalServer.getWithoutAck(clientIdentity,
getBatchSize,
getTimeout.longValue(),
TimeUnit.MILLISECONDS);
} else {
message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
}
final long batchId = message.getId();
try {
int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
if (batchId != -1 && size != 0) {
canalMQProducer.send(canalDestination, message, new Callback() {
@Override
public void commit() {
canalServer.ack(clientIdentity, batchId); // acknowledge the batch
}
@Override
public void rollback() {
canalServer.rollback(clientIdentity, batchId);
}
}); // send the message to the topic
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error("process error!", e);
}
}
}
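The heart of the worker loop is that a batch is acknowledged only from the producer's commit callback and rolled back from its rollback callback. A minimal sketch of that contract, with hypothetical names throughout (MqForwardSketch, Producer, and the acked and rolledBack lists stand in for canalMQProducer and the calls on canalServer):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of "send, then ack on commit or roll back on failure". */
final class MqForwardSketch {
    interface Callback {
        void commit();

        void rollback();
    }

    interface Producer {
        void send(List<String> batch, Callback callback);
    }

    final List<Long> acked = new ArrayList<>();
    final List<Long> rolledBack = new ArrayList<>();

    /** Forwards one batch; batch id -1 or an empty batch is skipped, as in the loop above. */
    boolean forward(long batchId, List<String> batch, Producer producer) {
        if (batchId == -1 || batch.isEmpty()) {
            return false;
        }
        producer.send(batch, new Callback() {
            @Override
            public void commit() {
                acked.add(batchId); // stands in for canalServer.ack
            }

            @Override
            public void rollback() {
                rolledBack.add(batchId); // stands in for canalServer.rollback
            }
        });
        return true;
    }
}
```

Leaving the decision to the producer means a batch is not marked consumed until the producer reports that delivery succeeded.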
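MigrateMap
MigrateMap uses LoadingCache internally, a Guava utility class: when the requested key is not present, it calls the method implemented in CacheLoader to compute the value. MigrateMap is a thin wrapper around it.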
public class MigrateMap {
public static <K, V> ConcurrentMap<K, V> makeComputingMap(CacheBuilder<Object, Object> builder,
Function<? super K, ? extends V> computingFunction) {
final Function<? super K, ? extends V> function = computingFunction;
LoadingCache<K, V> computingCache = builder.build(new CacheLoader<K, V>() {
@Override
public V load(K key) throws Exception {
return function.apply(key);
}
});
return new MigrateConcurrentMap<>(computingCache);
}
public static <K, V> ConcurrentMap<K, V> makeComputingMap(Function<? super K, ? extends V> computingFunction) {
return makeComputingMap(CacheBuilder.newBuilder(), computingFunction);
}
final static class MigrateConcurrentMap<K, V> implements ConcurrentMap<K, V> {
private final LoadingCache<K, V> computingCache;
private final ConcurrentMap<K, V> cacheView;
MigrateConcurrentMap(LoadingCache<K, V> computingCache){
this.computingCache = computingCache;
this.cacheView = computingCache.asMap();
}
@Override
public int size() {
return cacheView.size();
}
}
}
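The compute-on-miss behavior can be tried out without Guava. The sketch below deliberately swaps in the JDK's ConcurrentHashMap.computeIfAbsent instead of LoadingCache, so it is an analogy rather than the technique MigrateMap uses; ComputingMapSketch is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical JDK-only analogue of a computing map: values are built on first access. */
final class ComputingMapSketch<K, V> {
    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
    private final Function<? super K, ? extends V> loader;

    ComputingMapSketch(Function<? super K, ? extends V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, computing and storing it if the key is new. */
    V get(K key) {
        return cache.computeIfAbsent(key, loader);
    }

    int size() {
        return cache.size();
    }
}
```

This is the property the walkthrough relies on later: asking for the monitor of a destination that has never been seen creates it on the spot, and repeated lookups return the same object.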
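3. CanalController
- Port 11111 is used for TCP communication, port 11112 serves metrics data, and port 11110 is used to talk to canal-admin.
- If zk is configured, an ephemeral node for the server, named by ip:port, is created under /otter/canal/cluster in zk, and a zk listener is registered.
- Starts embededCanalServer.
- For each configured instance destination, calls runningMonitor.start() to start the instances one by one.
- In admin mode, ManagerInstanceConfigMonitor is used to start the instances.
CanalController#start: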
public void start() throws Throwable {
logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
// create the working node for the whole canal server
final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
initCid(path);
if (zkclientx != null) {
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
// start the embedded server first
embededCanalServer.start();
// try to start the destinations that are not lazy
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// create the working node for the destination
if (!embededCanalServer.isStart(destination)) {
// start through the HA mechanism
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
if (autoScan) {
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
}
}
if (autoScan) {
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
monitor.start();
}
}
}
// start the network interface
if (canalServer != null) {
canalServer.start();
}
}
CanalServerWithEmbedded
CanalServerWithEmbedded#loadCanalMetrics obtains a CanalMetricsProvider through Java SPI and then calls its getService method, for example PrometheusProvider#getService.
private void loadCanalMetrics() {
ServiceLoader<CanalMetricsProvider> providers = ServiceLoader.load(CanalMetricsProvider.class);
List<CanalMetricsProvider> list = new ArrayList<>();
for (CanalMetricsProvider provider : providers) {
list.add(provider);
}
if (list.isEmpty()) {
return;
}
// only allow ONE provider
if (list.size() > 1) {
logger.warn("Found more than one CanalMetricsProvider, use the first one.");
// report the conflict
for (CanalMetricsProvider p : list) {
logger.warn("Found CanalMetricsProvider: {}.", p.getClass().getName());
}
}
CanalMetricsProvider provider = list.get(0);
this.metrics = provider.getService();
}
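The provider lookup follows the usual java.util.ServiceLoader pattern: collect whatever is registered and take the first entry. The sketch below reproduces that pattern with a hypothetical MetricsProvider interface; the explicit fallback argument is an assumption added for illustration, standing in for whatever default the field holds when no provider is found.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical sketch of "first SPI provider wins, otherwise fall back". */
final class MetricsLoaderSketch {
    interface MetricsProvider {
        String name();
    }

    /** Picks the first discovered provider, or the fallback if none was found. */
    static MetricsProvider pickFirst(Iterable<MetricsProvider> found, MetricsProvider fallback) {
        List<MetricsProvider> list = new ArrayList<>();
        for (MetricsProvider provider : found) {
            list.add(provider);
        }
        if (list.size() > 1) {
            System.err.println("Found more than one provider, using the first one.");
        }
        return list.isEmpty() ? fallback : list.get(0);
    }

    /** Discovers providers registered under META-INF/services for the interface. */
    static MetricsProvider load(MetricsProvider fallback) {
        return pickFirst(ServiceLoader.load(MetricsProvider.class), fallback);
    }
}
```

With no provider registered on the classpath, load simply returns the fallback, which mirrors the early return in the listing.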
ServerRunningMonitors
The running monitors are registered on ServerRunningMonitors, each with its listener:
ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {
public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
embededCanalServer.start(destination);
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
}
ServerRunningMonitor#start starts the service.
public synchronized void start() {
super.start();
try {
processStart();
if (zkClient != null) {
// to release instance resources as far as possible, do not watch the running node; otherwise another machine starts immediately even after this one is stopped
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
} else {
processActiveEnter(); // no zk: start directly
}
} catch (Exception e) {
logger.error("start failed", e);
// startup failed: reset the state so it does not interfere with the next start
stop();
}
}
private void processActiveEnter() {
if (listener != null) {
listener.processActiveEnter();
}
}
CanalServerWithEmbedded#start starts the CanalInstance of a destination.
public void start(final String destination) {
final CanalInstance canalInstance = canalInstances.get(destination);
if (!canalInstance.isStart()) {
try {
MDC.put("destination", destination);
if (metrics.isRunning()) {
metrics.register(canalInstance);
}
canalInstance.start();
logger.info("start CanalInstances[{}] successfully", destination);
} finally {
MDC.remove("destination");
}
}
}
ManagerInstanceConfigMonitor
ManagerInstanceConfigMonitor#start runs a scheduled task in which PlainCanalConfigClient fetches the instances from canal-admin; they are then started locally through defaultAction.
public void start() {
super.start();
executor.scheduleWithFixedDelay(() -> {
try {
scan();
if (isFirst) {
isFirst = false;
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
}
private void scan() {
String instances = configClient.findInstances(null);
if (instances == null) {
return;
}
final List<String> is = Lists.newArrayList(StringUtils.split(instances, ','));
List<String> start = new ArrayList<>();
List<String> stop = new ArrayList<>();
List<String> restart = new ArrayList<>();
for (String instance : is) {
if (!configs.containsKey(instance)) {
PlainCanal newPlainCanal = configClient.findInstance(instance, null);
if (newPlainCanal != null) {
configs.put(instance, newPlainCanal);
start.add(instance);
}
} else {
PlainCanal plainCanal = configs.get(instance);
PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());
if (newPlainCanal != null) {
// the configuration changed
restart.add(instance);
configs.put(instance, newPlainCanal);
}
}
}
configs.forEach((instance, plainCanal) -> {
if (!is.contains(instance)) {
stop.add(instance);
}
});
stop.forEach(instance -> {
notifyStop(instance);
});
restart.forEach(instance -> {
notifyReload(instance);
});
start.forEach(instance -> {
notifyStart(instance);
});
}
private void notifyStart(String destination) {
try {
defaultAction.start(destination);
actions.put(destination, defaultAction);
// record the configuration file information after a successful start
} catch (Throwable e) {
logger.error(String.format("scan add found[%s] but start failed", destination), e);
}
}
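The scan method boils down to a three-way diff between the instances known locally and those listed by admin. The sketch below isolates that diff; ScanDiffSketch and its maps from instance name to md5 are hypothetical simplifications of the configs map and the findInstance calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical sketch of the start / restart / stop decision made during a scan. */
final class ScanDiffSketch {
    final List<String> start = new ArrayList<>();
    final List<String> restart = new ArrayList<>();
    final List<String> stop = new ArrayList<>();

    /** Both maps go from instance name to the md5 of its configuration. */
    static ScanDiffSketch diff(Map<String, String> local, Map<String, String> remote) {
        ScanDiffSketch result = new ScanDiffSketch();
        for (String name : new TreeSet<>(remote.keySet())) {
            String known = local.get(name);
            if (known == null) {
                result.start.add(name); // new on the admin side
            } else if (!known.equals(remote.get(name))) {
                result.restart.add(name); // md5 differs: the configuration changed
            }
        }
        for (String name : new TreeSet<>(local.keySet())) {
            if (!remote.containsKey(name)) {
                result.stop.add(name); // no longer listed by admin
            }
        }
        return result;
    }
}
```

The names are iterated in sorted order only to make the result deterministic; the listing itself processes stops first, then restarts, then starts.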
InstanceAction#start mainly calls ServerRunningMonitor to start the service.
defaultAction = new InstanceAction() {
public void start(String destination) {
InstanceConfig config = instanceConfigs.get(destination);
if (config == null) {
// re-read the instance config
config = parseInstanceConfig(properties, destination);
instanceConfigs.put(destination, config);
}
if (!embededCanalServer.isStart(destination)) {
// start through the HA mechanism
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
logger.info("auto notify start {} successful.", destination);
}
}
CanalInstanceWithSpring
CanalInstanceWithSpring#start
public void start() {
logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });
super.start();
}
AbstractCanalInstance#start: starting an instance starts metaManager, alarmHandler, eventStore, eventSink, and eventParser, in that order.
@Override
public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();
}
if (!alarmHandler.isStart()) {
alarmHandler.start();
}
if (!eventStore.isStart()) {
eventStore.start();
}
if (!eventSink.isStart()) {
eventSink.start();
}
if (!eventParser.isStart()) {
beforeStartEventParser(eventParser);
eventParser.start();
afterStartEventParser(eventParser);
}
logger.info("start successful....");
}
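The start method above is an ordered, idempotent start: each component is started only if it is not running yet, and the parser, which produces events, comes last. A plausible reading is that the sink and store should be ready before events arrive, though the source does not state the reason. A minimal sketch with hypothetical names (LifecycleOrderSketch, Component):

```java
import java.util.List;

/** Hypothetical sketch of starting components in a fixed order, each at most once. */
final class LifecycleOrderSketch {
    static final class Component {
        final String name;
        private final List<String> log;
        private boolean started;

        Component(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        boolean isStart() {
            return started;
        }

        void start() {
            started = true;
            log.add(name); // records the order in which components came up
        }
    }

    /** Starts every component that is not running yet, in list order. */
    static void startAll(List<Component> inOrder) {
        for (Component component : inOrder) {
            if (!component.isStart()) {
                component.start();
            }
        }
    }
}
```

Calling startAll twice on the same list starts nothing the second time, which is the effect of the isStart() checks in the listing.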
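4. admin module source code walkthrough
The NodeServer information has to be entered by hand in the admin console: you configure the ip and port of each canal-deployer node that has already been set up, which keeps admin in contact with canal-deployer. canal-deployer, for its part, keeps pulling its configuration from canal-admin, and that is what makes the configuration manageable from the UI.
The admin-web project is a Spring Boot project. Its entry point is CanalAdminApplication.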
1. Querying NodeServer
NodeServerController#nodeServers:
@GetMapping(value = "/nodeServers")
public BaseModel<Pager<NodeServer>> nodeServers(NodeServer nodeServer, Pager<NodeServer> pager,
@PathVariable String env) {
return BaseModel.getInstance(nodeServerService.findList(nodeServer, pager));
}
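NodeServerServiceImpl#findList returns the server list. It queries the database table canal_node_server and calls SimpleAdminConnector#doServerAdmin (through SimpleAdminConnectors.execute with AdminConnector::check in the listing below) to determine whether each server is up.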
public Pager<NodeServer> findList(NodeServer nodeServer, Pager<NodeServer> pager) {
Query<NodeServer> query = getBaseQuery(nodeServer);
Query<NodeServer> queryCnt = query.copy();
int count = queryCnt.findCount();
pager.setCount((long) count);
List<NodeServer> nodeServers = query.order()
.asc("id")
.setFirstRow(pager.getOffset().intValue())
.setMaxRows(pager.getSize())
.findList();
pager.setItems(nodeServers);
if (nodeServers.isEmpty()) {
return pager;
}
List<Future<Boolean>> futures = new ArrayList<>(nodeServers.size());
// get all nodes status
for (NodeServer ns : nodeServers) {
futures.add(Threads.executorService.submit(() -> {
boolean status = SimpleAdminConnectors.execute(ns.getIp(), ns.getAdminPort(), AdminConnector::check);
ns.setStatus(status ? "1" : "0");
return !status;
}));
}
for (Future<Boolean> f : futures) {
try {
f.get(3, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
// ignore
} catch (TimeoutException e) {
break;
}
}
return pager;
}
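The status check above fans out one task per node and waits a bounded time for each result. The sketch below shows that pattern in isolation. StatusProbeSketch and probe are hypothetical names, and there is one deliberate difference from the listing: every future gets its own timeout and a slow or failing check is reported as "0", whereas the listing stops waiting at the first timeout.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of probing several nodes in parallel with a per-node timeout. */
final class StatusProbeSketch {
    static Map<String, String> probe(Map<String, Callable<Boolean>> checks, long timeoutMs) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            Map<String, Future<Boolean>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, Callable<Boolean>> e : checks.entrySet()) {
                futures.put(e.getKey(), pool.submit(e.getValue()));
            }
            Map<String, String> status = new LinkedHashMap<>();
            for (Map.Entry<String, Future<Boolean>> e : futures.entrySet()) {
                String up = "0"; // failed or slow checks count as down
                try {
                    Boolean ok = e.getValue().get(timeoutMs, TimeUnit.MILLISECONDS);
                    up = Boolean.TRUE.equals(ok) ? "1" : "0";
                } catch (ExecutionException | TimeoutException ex) {
                    // keep "0"
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                status.put(e.getKey(), up);
            }
            return status;
        } finally {
            pool.shutdownNow(); // interrupts checks that are still running
        }
    }
}
```

Because all checks are submitted before any result is awaited, the waits overlap: checks that already finished return immediately when their turn comes.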
2. Saving NodeServer
NodeServerController#save saves a NodeServer.
@PostMapping(value = "/nodeServer")
public BaseModel<String> save(@RequestBody NodeServer nodeServer, @PathVariable String env) {
nodeServerService.save(nodeServer);
return BaseModel.getInstance("success");
}
NodeServerServiceImpl#save first checks whether a node with the same ip and admin port already exists.
public void save(NodeServer nodeServer) {
int cnt = NodeServer.find.query()
.where()
.eq("ip", nodeServer.getIp())
.eq("adminPort", nodeServer.getAdminPort())
.findCount();
if (cnt > 0) {
throw new ServiceException("节点信息已存在");
}
nodeServer.save();
if (nodeServer.getClusterId() == null) { // standalone mode
CanalConfig canalConfig = new CanalConfig();
canalConfig.setServerId(nodeServer.getId());
String configTmp = TemplateConfigLoader.loadCanalConfig();
canalConfig.setContent(configTmp);
try {
String contentMd5 = SecurityUtil.md5String(canalConfig.getContent());
canalConfig.setContentMd5(contentMd5);
} catch (NoSuchAlgorithmException e) {
}
canalConfig.save();
}
}
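The md5 stored next to the configuration content is what the polling clients compare against. How SecurityUtil.md5String encodes its result is not shown here, so the sketch below assumes a lowercase hex string; Md5Sketch, md5Hex, and changed are hypothetical names. Unlike the empty catch block in the listing, it surfaces a missing algorithm as an exception.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch: hex md5 of a configuration text, used to detect changes. */
final class Md5Sketch {
    static String md5Hex(String content) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                .digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // assumption: fail loudly rather than store a configuration without its md5
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** True if the content no longer matches the md5 a client last saw. */
    static boolean changed(String lastMd5, String content) {
        return !md5Hex(content).equals(lastMd5);
    }
}
```

A client that remembers the last md5 can then ask only "has this changed?", which is the check the deployer's polling relies on.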
Reference posts:
- https://blog.csdn.net/u014730658/article/details/107144812