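The CanalLauncher class is the startup entry point of the canal server, so we start there and follow the code.
Before diving in, it helps to know the following.
canal configuration modes
- ManagerCanalInstanceGenerator: manager-based configuration; configuration changes are detected in real time and the server is restarted
- SpringCanalInstanceGenerator: configuration based on local Spring XML; it does not scale well to many instances, because each instance needs its own XML file
canal configuration files
- canal.properties (the system-level root configuration file)
- instance.properties (the instance-level configuration file, one per instance)
Entry code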
    public static void main(String[] args) {
        try {
            logger.info("## set default uncaught exception handler");
            // install the global uncaught exception handler
            setGlobalUncaughtExceptionHandler();
            // allow the rocketmq client log path to be configured
            System.setProperty("rocketmq.client.logUseSlf4j","true");
            // load the canal configuration
            logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
            // create the starter
            final CanalStarter canalStater = new CanalStarter(properties);
            // canal.admin.manager: address of the admin manager
            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
            if (StringUtils.isNotEmpty(managerAddress)) {
                // read the admin user name and password
                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
                if (StringUtils.isEmpty(passwd)) {
                    throw new IllegalArgumentException(
                        "canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");
                }
                // admin port, with a default value
                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                // whether to register automatically
                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                // cluster to register into
                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
                // name to register under
                String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
                if (StringUtils.isEmpty(name)) {
                    // default to the local host name
                    name = AddressUtils.getHostName();
                }
                // server IP registered in zk or admin; defaults to the local IP when not configured
                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                if (StringUtils.isEmpty(registerIp)) {
                    registerIp = AddressUtils.getHostIp();
                }
                // create the config client
                final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                    user,
                    passwd,
                    registerIp,
                    Integer.parseInt(adminPort),
                    autoRegister,
                    autoCluster,
                    name);
                // load the remote configuration (as configured in admin) over HTTP
                PlainCanal canalConfig = configClient.findServer(null);
                if (canalConfig == null) {
                    throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                       + " can't not found config for [" + registerIp + ":" + adminPort
                                                       + "]");
                }
                Properties managerProperties = canalConfig.getProperties();
                // merge local: the local configuration takes precedence
                managerProperties.putAll(properties);
                // auto.scan.interval: interval between automatic instance scans, in seconds
                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));
                executor.scheduleWithFixedDelay(new Runnable() {

                    private PlainCanal lastCanalConfig;

                    // scheduled background task that loads the remote configuration
                    public void run() {
                        try {
                            if (lastCanalConfig == null) {
                                lastCanalConfig = configClient.findServer(null);
                            } else {
                                // use the md5 to check whether the configuration changed;
                                // if it did, the latest data is returned and reloaded
                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                                if (newCanalConfig != null) {
                                    // the remote canal.properties changed: reload the whole application
                                    canalStater.stop();
                                    Properties managerProperties = newCanalConfig.getProperties();
                                    // merge local
                                    managerProperties.putAll(properties);
                                    canalStater.setProperties(managerProperties);
                                    canalStater.start();
                                    lastCanalConfig = newCanalConfig;
                                }
                            }
                        } catch (Throwable e) {
                            logger.error("scan failed", e);
                        }
                    }
                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
                canalStater.setProperties(managerProperties);
            } else {
                canalStater.setProperties(properties);
            }
            canalStater.start();
            runningLatch.await();
            executor.shutdownNow();
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }
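The configuration-loading step at the top of main can be tried in isolation. The following is only a minimal sketch, not canal's code: the class name ConfLoaderSketch is hypothetical, the prefix value "classpath:" is assumed from the default "classpath:canal.properties", and the null check and try-with-resources are additions that the entry code does not have.

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

final class ConfLoaderSketch {

    // Assumed value of the prefix that the entry code strips from canal.conf.
    static final String CLASSPATH_URL_PREFIX = "classpath:";

    /** Loads properties from "classpath:<name>" or from a plain file path. */
    static Properties load(String conf) throws IOException {
        Properties properties = new Properties();
        if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
            String name = conf.substring(CLASSPATH_URL_PREFIX.length());
            try (InputStream in = ConfLoaderSketch.class.getClassLoader().getResourceAsStream(name)) {
                if (in == null) {
                    // getResourceAsStream returns null when the resource is missing
                    throw new FileNotFoundException("classpath resource not found: " + name);
                }
                properties.load(in);
            }
        } else {
            try (InputStream in = new FileInputStream(conf)) {
                properties.load(in);
            }
        }
        return properties;
    }

    public static void main(String[] args) throws IOException {
        // Write a throwaway file so the example runs without a canal installation.
        Path tmp = Files.createTempFile("canal", ".properties");
        Files.write(tmp, "canal.serverMode = tcp\n".getBytes(StandardCharsets.UTF_8));
        Properties properties = load(System.getProperty("canal.conf", tmp.toString()));
        System.out.println("canal.serverMode=" + properties.getProperty("canal.serverMode"));
    }
}
```

Failing with a clear FileNotFoundException when the classpath resource is missing is a design choice of this sketch; it avoids a bare NullPointerException from Properties.load.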
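In the main method above, configuration is loaded in one of two ways. If canal.admin.manager (the address of the admin console) is configured, the launcher first checks the configured admin user name and password (an empty password is rejected) and then fetches the corresponding server configuration from admin over HTTP; otherwise only the local canal.properties is used. The remote lookup is done by findServer: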
    public PlainCanal findServer(String md5) {
        if (StringUtils.isEmpty(md5)) {
            md5 = "";
        }
        String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
                     + "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster) + "&name=" + StringUtils.stripToEmpty(name);
        return queryConfig(url);
    }
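findServer assembles the polling URL by plain string concatenation. As a hedged illustration of the resulting query string, here is a small sketch that builds the same parameters but URL-encodes the text values; the encoding step and the class name PollingUrlSketch are additions for this example, not something canal's client is shown to do, and the address used in the usage note is made up.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class PollingUrlSketch {

    /** Trims and URL-encodes a value; null becomes the empty string. */
    static String enc(String value) {
        try {
            return URLEncoder.encode(value == null ? "" : value.trim(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    /** Builds the server_polling URL with the same parameters as findServer. */
    static String build(String configUrl, String ip, int port, String md5,
                        boolean autoRegister, String cluster, String name) {
        return configUrl + "/api/v1/config/server_polling"
               + "?ip=" + enc(ip)
               + "&port=" + port
               + "&md5=" + enc(md5)
               + "&register=" + (autoRegister ? 1 : 0)
               + "&cluster=" + enc(cluster)
               + "&name=" + enc(name);
    }
}
```

For example, PollingUrlSketch.build("http://127.0.0.1:8089", "10.0.0.1", 11110, null, true, null, "node a") yields a URL whose md5 and cluster parameters are empty and whose name parameter is encoded as node+a.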
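When the configuration is loaded through admin, the server polls the admin side over HTTP at a fixed delay (5 seconds by default). Each request carries the md5 of the last known configuration; if the configuration as a whole has changed, the new version is returned and the server is restarted.
Next, follow CanalStarter.start() into the startup logic. By this point CanalLauncher has already loaded the server configuration.
1. Read the server mode from canal.serverMode: tcp, or one of the MQ modes (kafka, rocketMQ, rabbitMQ, pulsarMQ). Every mode except tcp requires the MQ configuration to be initialized.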
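The md5-based polling can be sketched without any HTTP at all. In the sketch below, Snapshot is a hypothetical stand-in for PlainCanal, the remote lookup is modeled as a function that returns null when the md5 still matches, and a counter stands in for the stop/start of the server. None of these names come from canal.

```java
import java.util.Properties;
import java.util.function.Function;

final class ConfigPollerSketch {

    /** Hypothetical stand-in for PlainCanal: a configuration plus its md5. */
    static final class Snapshot {
        final String md5;
        final Properties properties;

        Snapshot(String md5, Properties properties) {
            this.md5 = md5;
            this.properties = properties;
        }
    }

    private final Function<String, Snapshot> remote; // returns null if the md5 is unchanged
    private final Properties local;
    private Snapshot last;

    int restarts;         // how many times a restart would have been triggered
    Properties effective; // properties the server would be restarted with

    ConfigPollerSketch(Function<String, Snapshot> remote, Properties local) {
        this.remote = remote;
        this.local = local;
    }

    /** Remote values first, then local values on top, so local keys win. */
    static Properties merge(Properties remoteProps, Properties localProps) {
        Properties merged = new Properties();
        merged.putAll(remoteProps);
        merged.putAll(localProps);
        return merged;
    }

    /** One polling round, mirroring the body of the scheduled Runnable. */
    void pollOnce() {
        if (last == null) {
            last = remote.apply(""); // first round only records the current state
            return;
        }
        Snapshot changed = remote.apply(last.md5);
        if (changed != null) {
            effective = merge(changed.properties, local);
            restarts++; // the real code calls stop(), setProperties(), start() here
            last = changed;
        }
    }
}
```

In a real scheduler, pollOnce would be passed to ScheduledExecutorService.scheduleWithFixedDelay, which measures the delay from the end of one run to the start of the next, so a slow round never overlaps with the following one.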
    // server mode, configurable through canal.serverMode
    String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
    if (!"tcp".equalsIgnoreCase(serverMode)) {
        ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
        // /plugin and /canal/plugin
        canalMQProducer = loader
            .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
        if (canalMQProducer != null) {
            canalMQProducer = new ProxyCanalMQProducer(canalMQProducer);
            // initialize the MQ; canal.serverMode decides which MQ is used
            canalMQProducer.init(properties);
        }
    }
    if (canalMQProducer != null) {
        MQProperties mqProperties = canalMQProducer.getMqProperties();
        // disable netty
        System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
        if (mqProperties.isFlatMessage()) {
            // flat message mode: set rawEntry to false to avoid parsing ByteString -> Entry a second time
            System.setProperty("canal.instance.memory.rawEntry", "false");
        }
    }
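The mode selection in step 1 boils down to "tcp means no producer, anything else is looked up by its lowercased name". A minimal sketch of that decision follows, with a plain map standing in for the SPI extension loader. The Producer interface, the registry, and the choice to treat a missing canal.serverMode as tcp are all assumptions of this sketch, not canal's API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

final class ServerModeSketch {

    /** Hypothetical stand-in for CanalMQProducer. */
    interface Producer {
        void init(Properties properties);
    }

    // Hypothetical registry playing the role of the extension loader.
    private final Map<String, Supplier<Producer>> registry = new HashMap<>();

    void register(String mode, Supplier<Producer> factory) {
        registry.put(mode.toLowerCase(Locale.ROOT), factory);
    }

    /** Returns an initialized producer, or null for tcp mode or an unknown mode. */
    Producer select(Properties properties) {
        // Assumption of this sketch: a missing canal.serverMode is treated as tcp.
        String mode = properties.getProperty("canal.serverMode", "tcp").trim();
        if ("tcp".equalsIgnoreCase(mode)) {
            return null; // tcp needs no MQ producer
        }
        Supplier<Producer> factory = registry.get(mode.toLowerCase(Locale.ROOT));
        if (factory == null) {
            return null; // no extension found for this mode
        }
        Producer producer = factory.get();
        producer.init(properties); // MQ settings come from the same properties
        return producer;
    }
}
```

A null result corresponds to the canalMQProducer == null case in the code above, where the server stays on the netty-based tcp path.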
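2. Initialize the canal scheduling controller, CanalController, and start it. CanalController is mainly responsible for initializing the configuration of each instance.
3. In any non-tcp mode, start the MQ.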
    if (canalMQProducer != null) {
        canalMQStarter = new CanalMQStarter(canalMQProducer);
        // list of instances deployed on this server
        String destinations = CanalController.getDestinations(properties);
        // start the MQ
        canalMQStarter.start(destinations);
        controller.setCanalMQStarter(canalMQStarter);
    }
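The destinations value passed to the MQ starter is a single string naming the instances on this server. Assuming it is a comma-separated list and that each destination gets its own worker (both are assumptions of this sketch, as is every name in it), the fan-out could look roughly like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class DestinationsSketch {

    /** Splits "a, b,,a" into [a, b]: trimmed, no blanks, no duplicates. */
    static List<String> parse(String destinations) {
        List<String> result = new ArrayList<>();
        if (destinations == null) {
            return result;
        }
        for (String part : destinations.split(",")) {
            String name = part.trim();
            if (!name.isEmpty() && !result.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }

    /** Submits one task per destination and returns the pool so the caller can stop it. */
    static ExecutorService startWorkers(List<String> destinations, Consumer<String> worker) {
        // newFixedThreadPool rejects a size of 0, so keep at least one thread.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, destinations.size()));
        for (String destination : destinations) {
            pool.submit(() -> worker.accept(destination));
        }
        return pool;
    }
}
```

The caller owns the returned pool and is expected to call shutdown() on it when the server stops.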
4. Initialize CanalAdminController, which provides the management operations used by canal admin.
    // start canalAdmin
    String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
    if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
        String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
        String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
        CanalAdminController canalAdmin = new CanalAdminController(this);
        canalAdmin.setUser(user);
        canalAdmin.setPasswd(passwd);
        String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
        logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
            port,
            user,
            passwd,
            ip);
        CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
        canalAdminWithNetty.setCanalAdmin(canalAdmin);
        canalAdminWithNetty.setPort(Integer.parseInt(port));
        canalAdminWithNetty.setIp(ip);
        canalAdminWithNetty.start();
        this.canalAdmin = canalAdminWithNetty;
    }
5. When the JVM stops, shut down the thread pool that watches the admin configuration.
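Step 5 corresponds to the runningLatch.await() and executor.shutdownNow() calls at the end of main. The sketch below shows that pattern in isolation; it assumes the latch is released from a JVM shutdown hook, and the class and method names are hypothetical. To keep the example terminating on its own, main releases the latch from a timer instead of waiting for a real shutdown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {

    /** Registers a JVM shutdown hook that releases the latch. */
    static void installHook(CountDownLatch latch) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> latch.countDown(), "shutdown-hook"));
    }

    /** Blocks until the latch is released, then stops the polling executor. */
    static void awaitAndShutdown(CountDownLatch latch, ScheduledExecutorService executor)
            throws InterruptedException {
        latch.await();
        executor.shutdownNow(); // interrupts running tasks and cancels scheduled ones
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        installHook(latch);
        // Stand-in for the periodic admin configuration scan.
        executor.scheduleWithFixedDelay(() -> System.out.println("polling admin config"),
            0, 50, TimeUnit.MILLISECONDS);
        // Simulate the shutdown signal after 200 ms so that the demo ends by itself.
        executor.schedule(() -> { latch.countDown(); }, 200, TimeUnit.MILLISECONDS);
        awaitAndShutdown(latch, executor);
        System.out.println("executor shut down: " + executor.isShutdown());
    }
}
```

Blocking the main thread on a latch keeps the process alive while the server threads do the work, and gives a single place where the polling pool is stopped.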