【源码解析】分库分表框架 Shardingsphere 源码解析

news2024/11/22 14:58:44

前言

以前研究过如何使用ShardingJdbc,使用ShardingJdbc进行分库分表,但是原理方面没有细致的深入了解。如果仅仅了解如何使用的话,对于改造和排查问题,其实都是不够的,所以跟踪源码了解其运行原理是很重要的。

Demo

依赖

        <!-- 分库分表  -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.1.2</version>
        </dependency>

配置文件

spring:
  shardingsphere:
    mode:
      type: Memory                                         # 内存模式,元数据保存在当前进程中
    datasource:
      names: test0,test1                                   # 数据源名称,这里有两个
      test0:                                               # 跟上面的数据源对应
        type: com.alibaba.druid.pool.DruidDataSource       # 连接池
        url: jdbc:mysql://127.0.0.1:3306/test0             # 连接url
        username: root
        password: root
      test1:                                               # 跟上面的数据源对应
        type: com.alibaba.druid.pool.DruidDataSource
        url: jdbc:mysql://127.0.0.1:3306/test1
        username: root
        password: root
    rules:
      sharding:
        tables:
          user:                                            # 这个可以随便取,问题不大
            actual-data-nodes: test$->{0..1}.user$->{0..2} # 实际节点名称,格式为 库名$->{0..n1}.表名$->{0..n2}
              # 其中n1、n2分别为库数量-1和表数量-1
            # 也可以使用${0..n1}的形式,但是会与Spring属性文件占位符冲突
            # 所以使用$->{0..n1}的形式

            database-strategy:                             # 分库策略
              standard:                                    # 标准分库策略
                sharding-column: age                       # 分库列名
                sharding-algorithm-name: age-mod           # 分库算法名字
            table-strategy:                                # 分表策略
              standard:                                    # 标准分表策略
                sharding-column: id                        # 分表列名
                sharding-algorithm-name: id-mod            # 分表算法名字
        sharding-algorithms:                               # 配置分库和分表的算法
          age-mod:                                         # 分库算法名字
            type: MOD                                      # 算法类型为取模
            props:                                         # 算法配置的键名,所有算法配置都需要在props下
              sharding-count: 2                            # 分片数量
          id-mod:                                          # 分表算法名字
            type: MOD                                      # 算法类型为取模
            props:                                         # 算法配置的键名,所有算法配置都需要在props下
              sharding-count: 3                            # 分片数量
    props:
      sql-show: true                                       # 打印SQL

创表语句

-- auto-generated definition
create table user
(
    id   bigint       not null
        primary key,
    name varchar(200) null,
    age  int          null
);

实体类

@Data
@Builder
public class User {
    
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;
    private String name;
    private Integer age;
}

Mapper

public interface UserMapper extends BaseMapper<User> {

}

控制器

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserMapper userMapper;

    @GetMapping("/insert")
    public boolean insert() {
        userMapper.insert(User.builder().name("name").age(new Random().nextInt(100) + 1).build());
        return true;
    }

    @GetMapping("/select")
    public List<User> select() {
        return userMapper.selectList(new QueryWrapper<>());
    }
}

源码解析

解析

PreparedStatementHandler#instantiateStatement,创建Statement会用到sql解析。

    protected Statement instantiateStatement(Connection connection) throws SQLException {
        String sql = this.boundSql.getSql();
        if (this.mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
            String[] keyColumnNames = this.mappedStatement.getKeyColumns();
            return keyColumnNames == null ? connection.prepareStatement(sql, 1) : connection.prepareStatement(sql, keyColumnNames);
        } else {
            return this.mappedStatement.getResultSetType() == ResultSetType.DEFAULT ? connection.prepareStatement(sql) : connection.prepareStatement(sql, this.mappedStatement.getResultSetType().getValue(), 1007);
        }
    }

ShardingSpherePreparedStatement

    public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
    }

    private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            SQLExceptionErrorCode errorCode = SQLExceptionErrorCode.SQL_STRING_NULL_OR_EMPTY;
            throw new SQLException(errorCode.getErrorMessage(), errorCode.getSqlState(), errorCode.getErrorCode());
        }
        this.connection = connection;
        metaDataContexts = connection.getContextManager().getMetaDataContexts();
        this.sql = sql;
        statements = new ArrayList<>();
        parameterSets = new ArrayList<>();
        Optional<SQLParserRule> sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().findSingleRule(SQLParserRule.class);
        Preconditions.checkState(sqlParserRule.isPresent());
        ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.get().getSQLParserEngine(
                DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabases().get(connection.getDatabaseName()).getResource().getDatabaseType()));
        sqlStatement = sqlParserEngine.parse(sql, true);
        sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(), sqlStatement, connection.getDatabaseName());
        parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
        statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
        executor = new DriverExecutor(connection);
        JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.isHoldTransaction());
        batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getDatabaseName());
        kernelProcessor = new KernelProcessor();
        statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabases().get(connection.getDatabaseName()).getRuleMetaData().getConfigurations());
        trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().findSingleRule(TrafficRule.class).orElse(null);
        statementManager = new StatementManager();
    }
    

SQLStatementParserEngine#parse,判断是否有缓存。

    public SQLStatement parse(String sql, boolean useCache) {
        return useCache ? (SQLStatement)this.sqlStatementCache.get(sql) : this.sqlStatementParserExecutor.parse(sql);
    }

SQLParserExecutor#parse,解析sql,处理成多个片段。

    public ParseASTNode parse(String sql) {
        ParseASTNode result = this.twoPhaseParse(sql);
        if (result.getRootNode() instanceof ErrorNode) {
            throw new SQLParsingException("Unsupported SQL of `%s`", new Object[]{sql});
        } else {
            return result;
        }
    }

路由

PreparedStatementHandler#query,Mybatis执行查询获取到的PreparedStatementShardingSpherePreparedStatement

	public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
        PreparedStatement ps = (PreparedStatement)statement;
        ps.execute();
        return this.resultSetHandler.handleResultSets(ps);
    }

ShardingSpherePreparedStatement#execute,获取逻辑SQL,调试获取到的sql是INSERT INTO user ( id,name,age ) VALUES ( ?,?,? )

    @Override
    public boolean execute() throws SQLException {
        try {
            if (statementsCacheable && !statements.isEmpty()) {
                resetParameters();
                return statements.iterator().next().execute();
            }
            clearPrevious();
            LogicSQL logicSQL = createLogicSQL();
            trafficContext = getTrafficContext(logicSQL);
            if (trafficContext.isMatchTraffic()) {
                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext, logicSQL);
                return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute());
            }
            executionContext = createExecutionContext(logicSQL);
            if (hasRawExecutionRule()) {
                // TODO process getStatement
                Collection<ExecuteResult> executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getLogicSQL(), new RawSQLExecutorCallback());
                return executeResults.iterator().next() instanceof QueryResult;
            }
            if (executionContext.getRouteContext().isFederated()) {
                ResultSet resultSet = executeFederationQuery(logicSQL);
                return null != resultSet;
            }
            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
            cacheStatements(executionGroupContext.getInputGroups());
            return executor.getRegularExecutor().execute(executionGroupContext,
                    executionContext.getLogicSQL(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
        } catch (SQLException ex) {
            handleExceptionInTransaction(connection, metaDataContexts);
            throw ex;
        } finally {
            clearBatch();
        }
    }

KernelProcessor#generateExecutionContext,处理sql主要是进行路由,改写

    public ExecutionContext generateExecutionContext(LogicSQL logicSQL, ShardingSphereDatabase database, ConfigurationProperties props) {
        RouteContext routeContext = this.route(logicSQL, database, props);
        SQLRewriteResult rewriteResult = this.rewrite(logicSQL, database, props, routeContext);
        ExecutionContext result = this.createExecutionContext(logicSQL, database, routeContext, rewriteResult);
        this.logSQL(logicSQL, props, result);
        return result;
    }

SQLRouteEngine#routeSQLRouteEngine调用SQLRouteExecutor进行路由,获取到的是PartialSQLRouteExecutor

    public RouteContext route(final LogicSQL logicSQL, final ShardingSphereDatabase database) {
        SQLRouteExecutor executor = isNeedAllSchemas(logicSQL.getSqlStatementContext().getSqlStatement()) ? new AllSQLRouteExecutor() : new PartialSQLRouteExecutor(rules, props);
        return executor.route(logicSQL, database);
    }

PartialSQLRouteExecutor,根据SQLRouterFactory获取routers

    public PartialSQLRouteExecutor(final Collection<ShardingSphereRule> rules, final ConfigurationProperties props) {
        this.props = props;
        routers = SQLRouterFactory.getInstances(rules);
    }

SQLRouterFactory#getInstances,通过SPI的方式获取到SQLRouter

    public static Map<ShardingSphereRule, SQLRouter> getInstances(final Collection<ShardingSphereRule> rules) {
        return OrderedSPIRegistry.getRegisteredServices(SQLRouter.class, rules);
    }

PartialSQLRouteExecutor#route,根据ShardingSphereRuleSQLRouter获取结果。

    public RouteContext route(final LogicSQL logicSQL, final ShardingSphereDatabase database) {
        RouteContext result = new RouteContext();
        Optional<String> dataSourceName = findDataSourceByHint(logicSQL.getSqlStatementContext(), database.getResource().getDataSources());
        if (dataSourceName.isPresent()) {
            result.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName.get(), dataSourceName.get()), Collections.emptyList()));
            return result;
        }
        for (Entry<ShardingSphereRule, SQLRouter> entry : routers.entrySet()) {
            if (result.getRouteUnits().isEmpty()) {
                result = entry.getValue().createRouteContext(logicSQL, database, entry.getKey(), props);
            } else {
                entry.getValue().decorateRouteContext(result, logicSQL, database, entry.getKey(), props);
            }
        }
        if (result.getRouteUnits().isEmpty() && 1 == database.getResource().getDataSources().size()) {
            String singleDataSourceName = database.getResource().getDataSources().keySet().iterator().next();
            result.getRouteUnits().add(new RouteUnit(new RouteMapper(singleDataSourceName, singleDataSourceName), Collections.emptyList()));
        }
        return result;
    }

ShardingSQLRouter#createRouteContext,根据对应的分库分表规则计算结果。

    public RouteContext createRouteContext(LogicSQL logicSQL, ShardingSphereDatabase database, ShardingRule rule, ConfigurationProperties props) {
        SQLStatement sqlStatement = logicSQL.getSqlStatementContext().getSqlStatement();
        ShardingConditions shardingConditions = this.createShardingConditions(logicSQL, database, rule);
        Optional<ShardingStatementValidator> validator = ShardingStatementValidatorFactory.newInstance(sqlStatement, shardingConditions);
        validator.ifPresent((optional) -> {
            optional.preValidate(rule, logicSQL.getSqlStatementContext(), logicSQL.getParameters(), database);
        });
        if (sqlStatement instanceof DMLStatement && shardingConditions.isNeedMerge()) {
            shardingConditions.merge();
        }

        RouteContext result = ShardingRouteEngineFactory.newInstance(rule, database, logicSQL.getSqlStatementContext(), shardingConditions, props).route(rule);
        validator.ifPresent((optional) -> {
            optional.postValidate(rule, logicSQL.getSqlStatementContext(), logicSQL.getParameters(), database, props, result);
        });
        return result;
    }

ShardingStandardRoutingEngine#route,获取到DataNode的集合。

    public RouteContext route(ShardingRule shardingRule) {
        RouteContext result = new RouteContext();
        Collection<DataNode> dataNodes = this.getDataNodes(shardingRule, shardingRule.getTableRule(this.logicTableName));
        result.getOriginalDataNodes().addAll(this.originalDataNodes);
        Iterator var4 = dataNodes.iterator();

        while(var4.hasNext()) {
            DataNode each = (DataNode)var4.next();
            result.getRouteUnits().add(new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singleton(new RouteMapper(this.logicTableName, each.getTableName()))));
        }

        return result;
    }

ShardingStandardRoutingEngine#getDataNodes,根据shardingCondition的数据进行路由,根据分库策略获取库名,再根据库名和分表策略获取表名。

    private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
        ShardingStrategy databaseShardingStrategy = createShardingStrategy(shardingRule.getDatabaseShardingStrategyConfiguration(tableRule),
                shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
        ShardingStrategy tableShardingStrategy = createShardingStrategy(shardingRule.getTableShardingStrategyConfiguration(tableRule),
                shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
        if (isRoutingByHint(shardingRule, tableRule)) {
            return routeByHint(tableRule, databaseShardingStrategy, tableShardingStrategy);
        }
        if (isRoutingByShardingConditions(shardingRule, tableRule)) {
            return routeByShardingConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
        }
        return routeByMixedConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
    }

    private Collection<DataNode> routeByShardingConditions(final ShardingRule shardingRule, final TableRule tableRule,
                                                           final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        return shardingConditions.getConditions().isEmpty()
                ? route0(tableRule, databaseShardingStrategy, Collections.emptyList(), tableShardingStrategy, Collections.emptyList())
                : routeByShardingConditionsWithCondition(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
    }

    private Collection<DataNode> routeByShardingConditionsWithCondition(final ShardingRule shardingRule, final TableRule tableRule,
                                                                        final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        Collection<DataNode> result = new LinkedList<>();
        for (ShardingCondition each : shardingConditions.getConditions()) {
            Collection<DataNode> dataNodes = route0(tableRule,
                    databaseShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, databaseShardingStrategy.getShardingColumns(), each),
                    tableShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), each));
            result.addAll(dataNodes);
            originalDataNodes.add(dataNodes);
        }
        return result;
    }

    private Collection<DataNode> route0(final TableRule tableRule,
                                        final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues,
                                        final ShardingStrategy tableShardingStrategy, final List<ShardingConditionValue> tableShardingValues) {
        Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingStrategy, databaseShardingValues);
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedDataSources) {
            result.addAll(routeTables(tableRule, each, tableShardingStrategy, tableShardingValues));
        }
        return result;
    }

ShardingStandardRoutingEngine#routeDataSources

    private Collection<String> routeDataSources(final TableRule tableRule, final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues) {
        if (databaseShardingValues.isEmpty()) {
            return tableRule.getActualDatasourceNames();
        }
        Collection<String> result = databaseShardingStrategy.doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, tableRule.getDataSourceDataNode(), properties);
        Preconditions.checkState(!result.isEmpty(), "No database route info");
        Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result),
                "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
        return result;
    }

StandardShardingStrategy#doSharding()

    @Override
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<ShardingConditionValue> shardingConditionValues,
                                         final DataNodeInfo dataNodeInfo, final ConfigurationProperties props) {
        ShardingConditionValue shardingConditionValue = shardingConditionValues.iterator().next();
        Collection<String> shardingResult = shardingConditionValue instanceof ListShardingConditionValue
                ? doSharding(availableTargetNames, (ListShardingConditionValue) shardingConditionValue, dataNodeInfo)
                : doSharding(availableTargetNames, (RangeShardingConditionValue) shardingConditionValue, dataNodeInfo);
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        result.addAll(shardingResult);
        return result;
    }

    private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListShardingConditionValue<?> shardingValue, final DataNodeInfo dataNodeInfo) {
        Collection<String> result = new LinkedList<>();
        for (Comparable<?> each : shardingValue.getValues()) {
            String target = shardingAlgorithm.doSharding(availableTargetNames,
                    new PreciseShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), dataNodeInfo, each));
            if (null != target && availableTargetNames.contains(target)) {
                result.add(target);
            } else if (null != target && !availableTargetNames.contains(target)) {
                throw new ShardingSphereException(String.format("Route table %s does not exist, available actual table: %s", target, availableTargetNames));
            }
        }
        return result;
    }

ModShardingAlgorithm#doSharding(),如果是取余的算法就是获取后缀值。

    @Override
    public String doSharding(final Collection<String> availableTargetNames, final PreciseShardingValue<Comparable<?>> shardingValue) {
        String shardingResultSuffix = getShardingResultSuffix(cutShardingValue(shardingValue.getValue()).mod(new BigInteger(String.valueOf(shardingCount))).toString());
        return findMatchedTargetName(availableTargetNames, shardingResultSuffix, shardingValue.getDataNodeInfo()).orElse(null);
    }

重写

KernelProcessor#rewrite,调用SQLRewriteEntry重写Sql。

    private SQLRewriteResult rewrite(LogicSQL logicSQL, ShardingSphereDatabase database, ConfigurationProperties props, RouteContext routeContext) {
        SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, props);
        return sqlRewriteEntry.rewrite(logicSQL.getSql(), logicSQL.getParameters(), logicSQL.getSqlStatementContext(), routeContext);
    }

SQLRewriteEntry#rewrite,调用RouteSQLRewriteEngine重写Sql。

    public SQLRewriteResult rewrite(final String sql, final List<Object> parameters, final SQLStatementContext<?> sqlStatementContext, final RouteContext routeContext) {
        SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, parameters, sqlStatementContext, routeContext);
        SQLTranslatorRule rule = database.getRuleMetaData().findSingleRule(SQLTranslatorRule.class).orElseGet(() -> new SQLTranslatorRule(new SQLTranslatorRuleConfiguration()));
        DatabaseType protocolType = database.getProtocolType();
        DatabaseType storageType = database.getResource().getDatabaseType();
        return routeContext.getRouteUnits().isEmpty()
                ? new GenericSQLRewriteEngine(rule, protocolType, storageType).rewrite(sqlRewriteContext)
                : new RouteSQLRewriteEngine(rule, protocolType, storageType).rewrite(sqlRewriteContext, routeContext);
    }

RouteSQLRewriteEngine#rewrite,重写sql。

    public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1);
        for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {
            Collection<RouteUnit> routeUnits = entry.getValue();
            if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {
                sqlRewriteUnits.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
            } else {
                addSQLRewriteUnits(sqlRewriteUnits, sqlRewriteContext, routeContext, routeUnits);
            }
        }
        return new RouteSQLRewriteResult(translate(sqlRewriteContext.getSqlStatementContext().getSqlStatement(), sqlRewriteUnits));
    }

    private void addSQLRewriteUnits(final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits, final SQLRewriteContext sqlRewriteContext,
                                    final RouteContext routeContext, final Collection<RouteUnit> routeUnits) {
        for (RouteUnit each : routeUnits) {
            sqlRewriteUnits.put(each, new SQLRewriteUnit(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each)));
        }
    }

AbstractSQLBuilder#toSQL,重新生成SQL。

    @Override
    public final String toSQL() {
        if (context.getSqlTokens().isEmpty()) {
            return context.getSql();
        }
        Collections.sort(context.getSqlTokens());
        StringBuilder result = new StringBuilder();
        result.append(context.getSql(), 0, context.getSqlTokens().get(0).getStartIndex());
        for (SQLToken each : context.getSqlTokens()) {
            result.append(each instanceof ComposableSQLToken ? getComposableSQLTokenText((ComposableSQLToken) each) : getSQLTokenText(each));
            result.append(getConjunctionText(each));
        }
        return result.toString();
    }

归并

ShardingSpherePreparedStatement#getResultSet(),归并查询到的结果集。

    @Override
    public ResultSet getResultSet() throws SQLException {
        if (null != currentResultSet) {
            return currentResultSet;
        }
        if (trafficContext.isMatchTraffic()) {
            return executor.getTrafficExecutor().getResultSet();
        }
        if (executionContext.getRouteContext().isFederated()) {
            return executor.getFederationExecutor().getResultSet();
        }
        if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
            List<ResultSet> resultSets = getResultSets();
            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
            currentResultSet = new ShardingSphereResultSet(resultSets, mergedResult, this, executionContext);
        }
        return currentResultSet;
    }

MergeEngine#merge,执行merge。

    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
        Optional<MergedResult> mergedResult = executeMerge(queryResults, sqlStatementContext);
        Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
        return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
    }
    
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Optional<MergedResult> executeMerge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
        for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultMergerEngine) {
                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(
                        database.getName(), database.getResource().getDatabaseType(), entry.getKey(), props, sqlStatementContext);
                return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, database));
            }
        }
        return Optional.empty();
    }

ShardingDQLResultMerger#merge,根据group by、distinct、order by等关键字做不同的归并处理

    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext, final ShardingSphereDatabase database) throws SQLException {
        if (1 == queryResults.size() && !isNeedAggregateRewrite(sqlStatementContext)) {
            return new IteratorStreamMergedResult(queryResults);
        }
        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
        selectStatementContext.setIndexes(columnLabelIndexMap);
        MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, database);
        return decorate(queryResults, selectStatementContext, mergedResult);
    }

    private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                               final Map<String, Integer> columnLabelIndexMap, final ShardingSphereDatabase database) throws SQLException {
        String defaultSchemaName = DatabaseTypeEngine.getDefaultSchemaName(selectStatementContext.getDatabaseType(), database.getName());
        ShardingSphereSchema schema = selectStatementContext.getTablesContext().getSchemaName()
                .map(optional -> database.getSchemas().get(optional)).orElseGet(() -> database.getSchemas().get(defaultSchemaName));
        if (isNeedProcessGroupBy(selectStatementContext)) {
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schema);
        }
        if (isNeedProcessDistinctRow(selectStatementContext)) {
            setGroupByForDistinctRow(selectStatementContext);
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schema);
        }
        if (isNeedProcessOrderBy(selectStatementContext)) {
            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schema);
        }
        return new IteratorStreamMergedResult(queryResults);
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/575604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GPT学习笔记-Embedding的降维与2D,3D可视化

嵌入&#xff08;Embedding&#xff09;在机器学习和自然语言处理中是一种表示离散变量&#xff08;如单词、句子或整个文档&#xff09;的方式&#xff0c;通常是作为高维向量或者矩阵。嵌入的目标是捕捉到输入数据中的语义信息&#xff0c;使得语义相近的元素在嵌入空间中的距…

STM32+UART串口+DMA收发

目录 1、cubemax端配置 1.1 初始化配置 1.2 GPIO配置 1.3 UART配置 1.3.1 串口基础配置 1.3.2 DMA配置 2、keil端代码设计 2.1 初始化配置 2.2 DMA接收初始化配置 2.3 DMA发送配置 2.4 接收回调函数设置 2.5 回调函数内容代码编写 2.5.1 接收回调函数 2.5.2 发送回调…

最优化理论-最速下降法的推导与应用

目录 1. 引言 2. 最速下降法的基本原理 3. 最速下降法的推导过程 3.1 梯度和梯度下降 3.2 最速下降法的数学表述 4. 最速下降法的应用 4.1 无约束优化问题 4.2 约束优化问题 5. 最速下降法的优缺点 6. 结论 7.代码实现 1. 引言 在最优化理论中&#xff0c;最速下降法…

3W字吃透:微服务网关SpringCloud gateway底层原理和实操

40岁老架构师尼恩的掏心窝&#xff1a; 现在拿到offer超级难&#xff0c;甚至连面试电话&#xff0c;一个都搞不到。 尼恩的技术社群中&#xff08;50&#xff09;&#xff0c;很多小伙伴凭借 “左手云原生 右手大数据 SpringCloud Alibaba 微服务“三大绝活&#xff0c;拿…

Dock的安装和使用

1、docker基础 三大组件: 仓库、镜像、容器什么是docker: 通俗来讲就是提供服务的容器Docker 两个概念:容器:可以看做空间 例如:磁盘、文件夹 镜像:灵魂 例如:系统、应用 一个镜像可以放在多个容器中(就如同把同一个文件复制到多个磁盘或文件夹一样) 一个容器可以放多个镜…

【Nginx】实战应用(服务器端集群搭建、下载站点、用户认证模块)

文章目录 Nginx实现服务器端集群搭建Nginx与Tomcat部署环境准备(Tomcat)环境准备(Nginx) Nginx实现动静分离需求分析动静分离实现步骤 Nginx实现Tomcat集群搭建 Nginx高可用解决方案KeepalivedVRRP环境搭建Keepalived配置文件介绍访问测试keepalived之vrrp_script Nginx制作下载…

python中的常见运算符

文章目录 算数运算符赋值运算关系运算符逻辑运算符非布尔值的与或非运算条件运算符(也叫三元运算符)运算符的优先级 算数运算符 加法运算符&#xff08;如果两个字符串之间进行加法运算&#xff0c;则会进行拼串操作&#xff09; - 减法运算符 * 乘法运算符&#xff08;如果将字…

小鹏汽车Q1财报:押注G6、大力降本,明年智驾BOM降半

‍作者 | 德新编辑 | 王博 小鹏汽车本周发了Q1财报&#xff0c;数据不好看&#xff0c;以致于在微博端也发了公开信。 那后续呢&#xff1f; 小鹏第二季度指引是&#xff0c;总交付数量约为2.1 - 2.2万辆&#xff0c;收入预计约为45 - 47亿元&#xff1b;四季度&#xff0c…

Selective Kernel Networks论文总结和代码实现

论文&#xff1a;https://arxiv.org/abs/1903.06586?contextcs 中文版&#xff1a;(CVPR-2019)选择性的内核网络_sk卷积 源码&#xff1a;GitHub - implus/SKNet: Code for our CVPR 2019 paper: Selective Kernel Networks 目录 一、论文出发点 二、论文主要工作 三、SK模…

洛谷——树

洛谷——树 文章目录 洛谷——树树的重心会议题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示数据范围 思路 树的直径【XR-3】核心城市题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示思路 [NOI2003] 逃学的小孩题目描述输入格式输出格式样例 #1样例…

Cocos creator实现《滑雪趣挑战》滑雪小游戏资源及代码

Cocos creator实现《滑雪趣挑战》滑雪小游戏资源及代码 最近在学习Cocos Creator&#xff0c;作为新手&#xff0c;刚刚开始学习Cocos Creator&#xff0c;上线了两个微信小游戏&#xff0c;刚刚入门&#xff0c;这里记录一下《滑雪趣挑战》实现及上线过程的过程。 ](https://…

vue实现深拷贝的方法

在 vue中&#xff0c;深拷贝是一个很有用的功能&#xff0c;在不改变原来对象状态的情况下&#xff0c;进行对象的复制。 但要实现深拷贝&#xff0c;需要两个对象具有相同的属性。如果两个对象不同&#xff0c;深拷贝也不能实现。 1.我们将变量A的属性赋给变量B&#xff0c;但…

springboot+java医院门诊挂号系统设计与实现ssm008

本课题的目标是使医院门诊信息管理清晰化&#xff0c;透明化&#xff0c;便于操作&#xff0c;易于管理。通过功能模块的优化组合实现不同的管理细节&#xff0c;使管理过程实现最大程度的自动化与信息化,并能自动对人工操作环节进行复查,使医院门诊挂号系统出错率降至最低。 主…

3、mqtt客户端演示(MQTT通信协议(mosquitto)发布订阅 C语言实现)

可订阅可发布模式 具体代码 客户端1代码&#xff1a;pub.c #include <stdio.h> #include <stdlib.h> #include <mosquitto.h> #include <string.h>#define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_S…

ChatGPT提示词工程进阶教学

ChatGPT提示词工程 1 两种大型语言模型LLM1.1 基础大模型&#xff08;base LLM&#xff09;1.2 指令调优大模型(Instruction Tuned LLM) 2 如何更清晰、具体地书写提示词2.1 在提示词中使用“定界符”2.2 向模型请求结构化的输出2.3 要求模型检查任务条件是否满足2.4 输入多范例…

uCOSii中的互斥信号量

uCOSii中的互斥信号量 一、互斥型信号量项管理 (MUTUAL EXCLUSION SEMAPHORE MANAGEMENT) OSMutexAccept() 无条件等待地获取互斥型信号量 OSMutexCreate() 建立并初始化一个互斥型信号量 OSMutexDel() 删除互斥型信号量 OSMutexPend() 等待一个互斥型信号量 OSMutexPost…

扬帆起航——Qt自定义控件介绍

文章目录 前言自定义控件的定义自定义控件的好处如何实现自定义控件实现没有自带的控件 如何使用自定义控件测试和优化常见的自定义控件总结 前言 Qt 提供了丰富的控件、工具和库&#xff0c;可以帮助开发人员快速创建现代化的跨平台应用程序。但是对于某些特殊的需求&#xf…

【数据结构】冒泡,快速,直接插入,归并,选择排序

&#x1f38a;专栏【数据结构】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【Dream It Possible】 大一同学小吉&#xff0c;欢迎并且感谢大家指出我的问题&#x1f970; 目录 &#x1f381;冒泡排序 &#x1f3f3;️‍&…

CentOS7.4安装OpenVPN

系统环境 [rootvpn ~]# cat /etc/redhat-release CentOS Linux release 7.4.1708 (Core) 一. 准备工作 [rootvpn ~]# yum -y install openssl-devel openssl pam pam-devel lzo lzo-devel pkcs11-helper pkcs11-helper-devel 二. 安装OpenVPN服务 1. 下载openvpn源码包 [r…

【计算机网络 - 第六章】链路层

目录 一、概述 1、数据链路层提供的服务&#xff1f; 二、差错检测 1、奇偶校验 2、循环冗余校验CRC 三、多路访问链路和协议 1、概述 &#xff08;1&#xff09;多路访问协议 2、信道划分协议 ① 频分多路复用FDM ② 时分多路复用TDM ③ 波分多路复用WDM ④ 码分…