背景
系统启动时,会注意sharding-jdbc提示加载metadata
于是想看看里面做了什么事情
问题追踪
debug后可以观察走到了该类
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.ShardingRuntimeContext#loadSchemaMetaData |
先看这个shardingRuntimeContext相关的类关系,对于多数据源的场景,会定义一个加载元数据的方法供子类实现,主要关注loadSchemaMetaData方法
重点关注ShardingRuntimeContext里的逻辑
对于分库的数据库加载,可以看到有两个问题
1.对于分库的表元数据加载,是单线程执行的,即使把max.connections.size.per.query调大,也不会有效率提升
2.某些规则的元数据与分片表的元数据是一致的,存在重复加载情况
问题解决
分表配置中,某些规则的acutal_data_nodes是一样的,根据该字段分组,对于分表规则相同的配置,可以加载一份即可
在本项目中复写对应的类增强下对应的load方法,新增了groupBy加载的配置支持
//根据配置选择原生加载还是groupBy加载
public SchemaMetaData load(final DatabaseType databaseType) throws SQLException {
SchemaMetaData result = useGroupMetaLoad?loadShardingSchemaMetaDataGroupByActualDataNodes(databaseType):loadShardingSchemaMetaData(databaseType);
result.merge(loadDefaultSchemaMetaData(databaseType));
return result;
}
//原有逻辑
private SchemaMetaData loadShardingSchemaMetaData(final DatabaseType databaseType) throws SQLException {
LOGGER.info("Loading {} logic tables' meta data.", shardingRule.getTableRules().size());
Map<String, TableMetaData> tableMetaDataMap = new HashMap<>(shardingRule.getTableRules().size(), 1);
for (TableRule each : shardingRule.getTableRules()) {
tableMetaDataMap.put(each.getLogicTable(), load(each.getLogicTable(), databaseType));
}
return new SchemaMetaData(tableMetaDataMap);
}
//新逻辑
private SchemaMetaData loadShardingSchemaMetaDataGroupByActualDataNodes(final DatabaseType databaseType) throws SQLException {
LOGGER.info("Loading {} logic tables' meta data.", shardingRule.getTableRules().size());
//根据actualDataNodes分组,key为要查询的逻辑表名,value为同个acutalDataNodes的表名
Map<String, Set<String>> sameNodeMap = groupByActualDataNodes(shardingRule.getShardingDataSourceNames().getShardingRuleConfig().getTableRuleConfigs());
int maxQuery=Math.min(CORES*2,maxConnectionsSizePerQuery);
List<List<String>> tableGroups = Lists.partition(new ArrayList<>(sameNodeMap.keySet()), Math.max(sameNodeMap.size() / maxQuery, 1));
Map<String, TableMetaData> tableMetaDataMap = new HashMap<>(shardingRule.getTableRules().size(), 1);
if (tableGroups.size() == 1 || isCheckingMetaData) {
for (String sameLogicTable : tableGroups.get(0)) {
TableMetaData load = load(sameLogicTable, databaseType);
Set<String> value = sameNodeMap.get(sameLogicTable);
for (String s : value) {
tableMetaDataMap.put(s, load);
}
}
} else {
//async,模仿已有写法
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(tableGroups.size(), maxQuery));
try {
Collection<Future<Map<String, TableMetaData>>> futures = new LinkedList<>();
for (List<String> each : tableGroups) {
futures.add(executorService.submit(() -> {
Map<String, TableMetaData> tableMetaData = new HashMap<>();
for (String s : each) {
tableMetaData.put(s, load(s, databaseType));
}
return tableMetaData;
}));
}
for (Future<Map<String, TableMetaData>> each : futures) {
try {
Map<String, TableMetaData> m = each.get();
for (String s : m.keySet()) {
Set<String> sameTable = sameNodeMap.get(s);
for (String string : sameTable) {
tableMetaDataMap.put(string, m.get(s));
}
}
} catch (final InterruptedException | ExecutionException ex) {
if (ex.getCause() instanceof SQLException) {
throw (SQLException) ex.getCause();
}
Thread.currentThread().interrupt();
}
}
} finally {
executorService.shutdown();
}
}
LOGGER.info("Actual {} logic tables' meta data loaded.", sameNodeMap.size());
return new SchemaMetaData(tableMetaDataMap);
}
private Map<String, Set<String>> groupByActualDataNodes(Collection<TableRuleConfiguration> configurations) {
Map<String, List<TableRuleConfiguration>> collect = configurations.stream().collect(Collectors.groupingBy(TableRuleConfiguration::getActualDataNodes));
Map<String, Set<String>> containSet = new HashMap<>();
for (Entry<String, List<TableRuleConfiguration>> entry : collect.entrySet()) {
//同一个datasource分为一组
Map<String,Set<String>> sameDatabaseMap=new HashMap<>();
entry.getValue().stream().map(TableRuleConfiguration::getLogicTable).forEach(e->{
String currentDatabase = getFirstDataSourceNameByLogicTableName(e);
Set<String> tableNames = sameDatabaseMap.getOrDefault(currentDatabase, new HashSet<>());
tableNames.add(e);
sameDatabaseMap.put(currentDatabase,tableNames);
});
for (Entry<String, Set<String>> sameDatabaseSet : sameDatabaseMap.entrySet()) {
Set<String> value = sameDatabaseSet.getValue();
containSet.put(value.iterator().next(),value);
}
}
return containSet;
}
private String getFirstDataSourceNameByLogicTableName(String logicTableName){
TableRule tableRule = shardingRule.getTableRule(logicTableName);
DataNode dataNode = tableRule.getActualDataNodes().iterator().next();
return dataNode.getDataSourceName();
}
这样就达到了优化的效果
并添加配置
spring.shardingsphere.props.max.connections.size.per.query=10
spring.shardingsphere.props.group.metadata.load.enabled=true
运行结果如下,减少重复的表元数据加载,并采用异步加载,启动速度快了一些
配置修改后的关注问题
可以关注到对于max.connections.size.per.query,我们从1→10,有什么需要注意的呢?
1.createConnections
我们可以先看一下一次查询的执行过程
1.org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
这里主要解析sql,做路由&生成需要执行的单元信息,生成执行上下文
2.接下来是重点关注的逻辑
根据执行上下文初始化执行器
我们需要关注的是生成执行单元中获取链接的逻辑
具体在 org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#obtainExecuteGroups
一路debug会走到
我们需要关注的是生成执行单元中获取链接的逻辑
具体在 org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#obtainExecuteGroups
一路debug会走到
在这里可以看到该配置的影响,他会根据实际要执行的sql单元,确认链接模式,关于链接模式的进一步说明,可以参考官网说明
链接模式 | 说明 |
---|---|
MEMORY_STRICTLY | 内存限制模式,当对单个物理库查询时,链接数足够时会采用此模式,单个库有多个链接 |
CONNECTION_STRICTLY | 链接限制模式,对单个物理库查询时,只有0或1个链接可用 |
这两种限制模式在创建链接时有差异
链接限制模式直接创建,对于内存限制模式,需要一次性获取执行链接
主要影响是单个逻辑sql有多个分片sql执行场景时所需要的连接数,由于我们使用的druid链接池,获取链接的方法为com.alibaba.druid.pool.DruidDataSource#getConnectionDirect
对于配了超时(druid.maxWait),如果maxActive<max.connections.size.per.query,会报无法获取足够链接的错误
对于没配超时,会出现一直等待的场景com.alibaba.druid.pool.DruidDataSource#takeLast,造成假死
重点保证链接池的maxActive>=max.connections.size.per.query即可
2.executeQuery
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult
内存限制模式,会把链接hold住,相当于游标方式获取结果,对于链接限制模式,会把数据加载到内存,链接可以释放
其他
1.本地启动时也可以配置druid链接池进行异步初始化,加快启动速度
spring.datasource.druid.async-init=true
2.在sharding-jdbc执行过程中,会发现如下逻辑
会抽取第一个任务给主线程执行,其他交由线程池异步处理
减少主线程的空等待时间,我们一些异步多任务的执行也可以参考一下该做法