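Background
To sync MySQL data to Elasticsearch (ES), we chose the canal component.
Problem description
During syncing, we found that when the sql statement in the canal-adapter mapping file canal-adapter/conf/es7/product.yml joins several tables, the data in Elasticsearch is sometimes not updated. The normal runtime log shows no exception (an error log only appears when the adapter is started from IDEA), which was baffling.
Problem analysis
The first guess was that something goes wrong internally while the yml is parsed, but finding the actual cause required debugging the source code.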
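Download the source code
GitHub repository
Import the code into IDEA
The project structure is as follows:
Configuration file: application.yml (comments starting with ## mark the places to check and modify)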
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  ## Mode
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    ## Kafka address; use the internal (container) IP
    kafka.bootstrap.servers: 192.168.0.107:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  ## Database configuration
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.0.107:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  ## Kafka topic name
  - instance: canal_manager # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      ## Elasticsearch configuration; from canal 1.1.5 on use name: es7 (using es may cause problems)
      - name: es7
        hosts: http://192.168.0.107:9200 # 127.0.0.1:9200 for rest mode
        properties:
          ## rest mode
          mode: rest # transport or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
customer.yml (comments starting with ## mark the places to check and modify)
dataSourceKey: defaultDS
## Kafka topic
destination: canal_manager
groupId: g1
esMapping:
  ## Elasticsearch index
  _index: product
  ## Primary key
  _id: _id
  _type: _doc
  upsert: true
#  relations:
#    customer_order:
#      name: customer
  ## Working SQL
  #sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, r.Pro_No proNo, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  ## Failing SQL
  sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  etlCondition: "where p.c_time>={}"
  commitBatch: 3000
Startup entry point: CanalAdapterApplication
REST controller for full sync: CommonRest
Request example:
// POST
http://127.0.0.1:8081/etl/es7/customer.yml
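For readers who prefer to trigger the full sync from code rather than from a REST client, here is a minimal sketch of the same POST request. It assumes Java 11 or later (for java.net.http) and that the adapter is listening on 127.0.0.1:8081 as configured above. The names EtlTrigger and buildEtlRequest are made up for this illustration and are not part of canal.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class EtlTrigger {

    // Builds the POST request for the full-sync endpoint:
    // {baseUrl}/etl/{adapter type}/{mapping file}
    static HttpRequest buildEtlRequest(String baseUrl, String adapterType, String mappingFile) {
        URI uri = URI.create(baseUrl + "/etl/" + adapterType + "/" + mappingFile);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Host, port, adapter type and file name are taken from the request example above.
        HttpRequest request = buildEtlRequest("http://127.0.0.1:8081", "es7", "customer.yml");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print whatever the adapter returns; the response format is not assumed here.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running main only makes sense while the adapter is up; buildEtlRequest itself does no network I/O.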
Finding the problem
After starting the program, an error log was printed.
2022-03-18 09:10:56.537 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2022-03-18 09:10:56.742 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2022-03-18 09:10:56.874 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2022-03-18 09:11:00.028 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [classes/:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [classes/:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.__refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.jrLockAndRefresh(AbstractApplicationContext.java:40002) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:41008) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[classes/:na]
Caused by: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
... 44 common frames omitted
Caused by: com.alibaba.druid.sql.parser.ParserException: null
at com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser.parse(SqlParser.java:71) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:143) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
... 45 common frames omitted
2022-03-18 09:11:00.046 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /Users/Desktop/workspace/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin
2022-03-18 09:11:00.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: canal_manager-g1 succeed
2022-03-18 09:11:00.101 [Thread-35] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: canal_manager <=============
2022-03-18 09:11:00.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2022-03-18 09:11:00.113 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2022-03-18 09:11:00.126 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-03-18 09:11:00.128 [Thread-35] INFO o
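The exception in the log looks like a SQL parsing error: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException. Yet the same SQL runs fine in Navicat and other tools, so canal presumably applies parsing rules of its own.
Setting breakpoints along the stack trace shows that the failure happens while the ES7xAdapter is being initialized. The underlying exception message is "Relation condition column must in select columns.", meaning that a column used in a join condition has to appear among the selected columns.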
// Parser for the SQL format required by ES sync: SqlParser.java
/**
 * Parse the SQL.
 *
 * @param sql sql
 * @return the schema (view) object
 */
public static SchemaItem parse(String sql) {
    try {
        SQLStatementParser parser = new MySqlStatementParser(sql);
        SQLSelectStatement statement = (SQLSelectStatement) parser.parseStatement();
        MySqlSelectQueryBlock sqlSelectQueryBlock = (MySqlSelectQueryBlock) statement.getSelect().getQuery();
        SchemaItem schemaItem = new SchemaItem();
        schemaItem.setSql(SQLUtils.toMySqlString(sqlSelectQueryBlock));
        SQLTableSource sqlTableSource = sqlSelectQueryBlock.getFrom();
        List<TableItem> tableItems = new ArrayList<>();
        SqlParser.visitSelectTable(schemaItem, sqlTableSource, tableItems, null);
        tableItems.forEach(tableItem -> schemaItem.getAliasTableItems().put(tableItem.getAlias(), tableItem));
        List<FieldItem> fieldItems = collectSelectQueryFields(sqlSelectQueryBlock);
        fieldItems.forEach(fieldItem -> schemaItem.getSelectFields().put(fieldItem.getFieldName(), fieldItem));
        schemaItem.init();
        if (schemaItem.getAliasTableItems().isEmpty() || schemaItem.getSelectFields().isEmpty()) {
            throw new ParserException("Parse sql error");
        }
        return schemaItem;
    } catch (Exception e) {
        // The caught exception and its message are dropped here, which is why
        // the log above only shows "ParserException: null".
        throw new ParserException();
    }
}
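The catch block in parse rethrows a fresh ParserException with no message and no cause, which matches the "ParserException: null" line in the log and explains why a debugger was needed to see the real message. The following self-contained sketch reproduces that effect with plain JDK classes. ParseFailure, validate and the other names are hypothetical stand-ins, not canal or druid code, and the exception type thrown by validate is an assumption; only the message text comes from the debugging session described above.

```java
class LostCauseDemo {

    // Hypothetical stand-in for the parser exception (not the druid class).
    static class ParseFailure extends RuntimeException {
        ParseFailure() {
            super();
        }

        ParseFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for the validation step that raises the descriptive error.
    static void validate() {
        throw new RuntimeException("Relation condition column must in select columns.");
    }

    // Same pattern as the quoted catch block: the original exception is discarded.
    static void parseDroppingCause() {
        try {
            validate();
        } catch (Exception e) {
            throw new ParseFailure();
        }
    }

    // Alternative pattern: keep both the message and the cause.
    static void parseKeepingCause() {
        try {
            validate();
        } catch (Exception e) {
            throw new ParseFailure(e.getMessage(), e);
        }
    }

    // Runs the action and reports what a log line could show about the failure.
    static String describe(Runnable action) {
        try {
            action.run();
            return "no exception";
        } catch (RuntimeException e) {
            return "message=" + e.getMessage() + ", cause=" + e.getCause();
        }
    }

    public static void main(String[] args) {
        // prints: message=null, cause=null
        System.out.println(describe(LostCauseDemo::parseDroppingCause));
        // prints the original message and the wrapped cause
        System.out.println(describe(LostCauseDemo::parseKeepingCause));
    }
}
```

With the first pattern nothing useful reaches the log, so stepping through the code is the only way to find the message; with the second, the log alone would have pointed at the join columns.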
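Solution
From the exception message and testing, we can conclude that canal requires the sql in the mapping file to select the join columns whenever tables are joined. In the working SQL from customer.yml, each ON condition has one of its columns in the SELECT list (s.Sales_No for the first join, r.Pro_No for the second), whereas the failing SQL selects neither r.Pro_No nor p.Pro_No.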
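// Correct SQL: the join keys of the joined tables (a.id and b.id) are selected
SELECT a.id, b.id FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id
// Incorrect SQL: no column of the second join condition (c.b_id = b.id) is selected
SELECT a.id FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id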
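To make the rule easier to check before deploying a mapping file, here is a rough sketch of a pre-flight check. It is not canal's implementation and does not parse SQL: the selected columns and ON conditions are listed by hand. It also assumes that having at least one column of every ON condition in the SELECT list is enough, which is inferred only from the working and failing SQL in customer.yml. JoinSelectCheck, JoinCondition and uncovered are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class JoinSelectCheck {

    // One ON condition, e.g. r.Pro_No = p.Pro_No, with each side written as "alias.column".
    static final class JoinCondition {
        final String left;
        final String right;

        JoinCondition(String left, String right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return left + " = " + right;
        }
    }

    // Returns the ON conditions for which neither column appears in the SELECT list.
    static List<JoinCondition> uncovered(Set<String> selected, List<JoinCondition> joins) {
        List<JoinCondition> missing = new ArrayList<>();
        for (JoinCondition join : joins) {
            if (!selected.contains(join.left) && !selected.contains(join.right)) {
                missing.add(join);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The two ON conditions shared by both SQL statements in customer.yml.
        List<JoinCondition> joins = Arrays.asList(
                new JoinCondition("r.Sales_No", "s.Sales_No"),
                new JoinCondition("r.Pro_No", "p.Pro_No"));

        Set<String> workingSelect = new HashSet<>(
                Arrays.asList("s.Sales_No", "s.Sales_Name", "r.Pro_No", "p.Pro_Type"));
        Set<String> failingSelect = new HashSet<>(
                Arrays.asList("s.Sales_No", "s.Sales_Name", "p.Pro_Type"));

        System.out.println("working SQL, uncovered joins: " + uncovered(workingSelect, joins));
        System.out.println("failing SQL, uncovered joins: " + uncovered(failingSelect, joins));
    }
}
```

For the working SQL the list is empty; for the failing SQL it contains the r.Pro_No = p.Pro_No condition, which is the join that needs a column added to the SELECT list.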
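Summary
Some problems can only be tracked down in the source code. In this case the log reported nothing more than a SQL parsing exception while the SQL itself looked fine, so the root cause only became visible by stepping through the code. This is a summary of personal experience; corrections are welcome.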