Hudi Flink SQL源码调试学习(1)

news2025/1/11 10:49:12

前言

本着学习hudi-flink源码的目的,利用之前总结的文章Hudi Flink SQL代码示例及本地调试中的代码进行调试,记录调试学习过程中主要的步骤及对应源码片段。

版本

  • Flink 1.15.4
  • Hudi 0.13.0

目标

在文章Hudi Flink SQL代码示例及本地调试中提到:我们发现Table API的入口和DataStream API的入口差不多,DataStream API的入口是在HoodiePipelinesinksource方法里,而这两个方法也是分别调用了HoodieTableFactorycreateDynamicTableSinkcreateDynamicTableSource。那么Table API的代码怎么一步一步走到createDynamicTableSinkcreateDynamicTableSource的呢?返回HoodieTableSink之后又是怎么写数据的?因为我发现Hudi写数据的主要逻辑入口好像是在HoodieTableSink.getSinkRuntimeProvider的方法体里,这些问题之前都没有搞清楚,所以这次的目标就是要搞清楚:1、Table API 的入口到createDynamicTableSink返回HoodieTableSink的主要代码步骤; 2、在哪里调用HoodieTableSink.getSinkRuntimeProvider的方法体进行后面的写Hudi逻辑的

相关类:

  • HoodiePipeline (DataStream API)
  • HoodieTableFactory
  • HoodieTableSink
  • DataStreamSinkProviderAdapter (函数式接口)
  • TableEnvironmentImpl
  • BatchPlanner
  • PlannerBase
  • FactoryUtil
  • BatchExecSink
  • CommonExecSink

DataStream API

其实上面的问题在DataStream API代码里很容易看出来,我们先看一下DataStream API写Hudi的代码,详细代码在文章:Flink Hudi DataStream API代码示例

DataStream<RowData> dataStream = env.fromElements(
        GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
        GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
);

HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("id int")
        .column("name string")
        .column("price double")
        .column("ts bigint")
        .column("dt string")
        .pk("id")
        .partition("dt")
        .options(options);

builder.sink(dataStream, false);

HoodiePipeline.Builder.sink

    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
    }

HoodiePipeline.sink

  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
    FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

HoodiePipeline.sink就可以找到答案:
1、HoodieTableFactory.createDynamicTableSink 返回HoodieTableSink
2、HoodieTableSink.getSinkRuntimeProvider 返回DataStreamSinkProviderAdapter
3、DataStreamSinkProviderAdapter.consumeDataStream调用HoodieTableSink.getSinkRuntimeProvider中的方法体执行后面的写Hudi逻辑。这里的dataStream为我们最开始在程序里创建的DataStream<RowData>

HoodieTableSink.getSinkRuntimeProvider

getSinkRuntimeProvider返回DataStreamSinkProviderAdapter,其中Lambda 表达式dataStream -> {}DataStreamSinkProviderAdapter.consumeDataStream(dataStream)的具体实现

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProviderAdapter) dataStream -> {

      // setup configuration
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
      // set up default parallelism
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

      RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

      // bulk_insert mode
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        return Pipelines.bulkInsert(conf, rowType, dataStream);
      }

      // Append mode
      if (OptionsResolver.isAppendMode(conf)) {
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
        if (OptionsResolver.needsAsyncClustering(conf)) {
          return Pipelines.cluster(conf, rowType, pipeline);
        } else {
          return Pipelines.dummySink(pipeline);
        }
      }

      DataStream<Object> pipeline;
      // bootstrap
      final DataStream<HoodieRecord> hoodieRecordDataStream =
          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
      // write pipeline
      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
      // compaction
      if (OptionsResolver.needsAsyncCompaction(conf)) {
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
        return Pipelines.clean(conf, pipeline);
      }
    };
  }

DataStreamSinkProviderAdapter其实是一个函数式接口,它是一种只包含一个抽象方法的接口。Lambda 表达式可以被赋值给一个函数式接口,从而实现接口的实例化

public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
  DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);

  @Override
  default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
    return consumeDataStream(dataStream);
  }
}

函数式接口和Lambda 表达式参考下面两篇文章:
https://it.sohu.com/a/682888110_100123073
https://blog.csdn.net/Speechless_/article/details/123746047

Table API

知道了 DataStream API 调用步骤后,来对比看一下 Table API 的大致调用步骤,调试代码入口。

tableEnv.executeSql(String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));

整体调用流程

1、tableEnv.executeSql->TableEnvironmentImpl.executeSql->executeInternal(Operation operation)->executeInternal(List<ModifyOperation> operations)->this.translate->(PlannerBase)this.planner.translate

2.1、PlannerBase.translate->PlannerBase.translateToRel->getTableSink(catalogSink.getContextResolvedTable, dynamicOptions)->FactoryUtil.createDynamicTableSink->HoodieTableFactory.createDynamicTableSink

2.2、PlannerBase.translate->(BatchPlanner)translateToPlan(execGraph)->(ExecNodeBase)node.translateToPlan->(BatchExecSink)translateToPlanInternal->(CommonExecSink)createSinkTransformation->(HoodieTableSink)getSinkRuntimeProvider->(CommonExecSink)applySinkProvider->provider.consumeDataStream

具体代码

TableEnvironmentImpl

(TableEnvironmentImpl)executeSql

    public TableResult executeSql(String statement) {
        List<Operation> operations = this.getParser().parse(statement);
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! executeSql() only accepts a single SQL statement of type CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONSCREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.");
        } else {
            // 关键步骤:executeInternal
            return this.executeInternal((Operation)operations.get(0));
        }
    }

executeInternal(Operation operation)

    public TableResultInternal executeInternal(Operation operation) {
        if (operation instanceof ModifyOperation) {
            // 关键步骤:executeInternal
            return this.executeInternal(Collections.singletonList((ModifyOperation)operation));
        } else if (operation instanceof StatementSetOperation) {
            return this.executeInternal(((StatementSetOperation)operation).getOperations());

executeInternal(List<ModifyOperation> operations)

    public TableResultInternal executeInternal(List<ModifyOperation> operations) {
        // 关键步骤:translate
        List<Transformation<?>> transformations = this.translate(operations);
        List<String> sinkIdentifierNames = this.extractSinkIdentifierNames(operations);
        TableResultInternal result = this.executeInternal(transformations, sinkIdentifierNames);
        if ((Boolean)this.tableConfig.get(TableConfigOptions.TABLE_DML_SYNC)) {
            try {
                result.await();
            } catch (ExecutionException | InterruptedException var6) {
                result.getJobClient().ifPresent(JobClient::cancel);
                throw new TableException("Fail to wait execution finish.", var6);
            }
        }

        return result;
    }

translate
这里的planner为BatchPlanner,因为我们设置了batch模式EnvironmentSettings.inBatchMode()

    protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
        // 这里的planner为BatchPlanner,因为我们设置了batch模式EnvironmentSettings.inBatchMode()
        // 关键步骤:PlannerBase.translate
        return this.planner.translate(modifyOperations);
    }

BatchPlanner

(BatchPlanner的父类)PlannerBase.translate

  override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    beforeTranslation()
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }
    // 关键步骤:translateToRel
    val relNodes = modifyOperations.map(translateToRel)
    val optimizedRelNodes = optimize(relNodes)
    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
    // 关键步骤:translateToPlan
    val transformations = translateToPlan(execGraph)
    afterTranslation()
    transformations
  }

PlannerBase.translateToRel

  private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
    val dataTypeFactory = catalogManager.getDataTypeFactory
    modifyOperation match {
      case s: UnregisteredSinkModifyOperation[_] =>
        val input = getRelBuilder.queryOperation(s.getChild).build()
        val sinkSchema = s.getSink.getTableSchema
        // validate query schema and sink schema, and apply cast if possible
        val query = validateSchemaAndApplyImplicitCast(
          input,
          catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
          null,
          dataTypeFactory,
          getTypeFactory)
        LogicalLegacySink.create(
          query,
          s.getSink,
          "UnregisteredSink",
          ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))

      case collectModifyOperation: CollectModifyOperation =>
        val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
        DynamicSinkUtils.convertCollectToRel(
          getRelBuilder,
          input,
          collectModifyOperation,
          getTableConfig,
          getFlinkContext.getClassLoader
        )

      case catalogSink: SinkModifyOperation =>
        val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
        val dynamicOptions = catalogSink.getDynamicOptions
        // 关键步骤:getTableSink
        getTableSink(catalogSink.getContextResolvedTable, dynamicOptions).map {
          case (table, sink: TableSink[_]) =>
            // Legacy tables can't be anonymous
            val identifier = catalogSink.getContextResolvedTable.getIdentifier
            // check the logical field type and physical field type are compatible
            val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
            // validate logical schema and physical schema are compatible
            validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
            // validate TableSink
            validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
            // validate query schema and sink schema, and apply cast if possible
            val query = validateSchemaAndApplyImplicitCast(
              input,
              table.getResolvedSchema,
              identifier.asSummaryString,
              dataTypeFactory,
              getTypeFactory)
            val hints = new util.ArrayList[RelHint]
            if (!dynamicOptions.isEmpty) {
              hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build)
            }
            LogicalLegacySink.create(
              query,
              hints,
              sink,
              identifier.toString,
              table,
              catalogSink.getStaticPartitions.toMap)

          case (table, sink: DynamicTableSink) =>
            DynamicSinkUtils.convertSinkToRel(getRelBuilder, input, catalogSink, sink)
        } match {
          case Some(sinkRel) => sinkRel
          case None =>
            throw new TableException(
              s"Sink '${catalogSink.getContextResolvedTable}' does not exists")
        }

PlannerBase.getTableSink

  private def getTableSink(
      contextResolvedTable: ContextResolvedTable,
      dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
    contextResolvedTable.getTable[CatalogBaseTable] match {
      case connectorTable: ConnectorCatalogTable[_, _] =>
        val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
        toScala(connectorTable.getTableSink) match {
          case Some(sink) => Some(resolvedTable, sink)
          case None => None
        }

      case regularTable: CatalogTable =>
        val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
        ...

        if (
          !contextResolvedTable.isAnonymous &&
          TableFactoryUtil.isLegacyConnectorOptions(
            catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
            tableConfig,
            isStreamingMode,
            objectIdentifier,
            resolvedTable.getOrigin,
            isTemporary
          )
        ) {
         ...
        } else {
          ...
          // 关键步骤:FactoryUtil.createDynamicTableSink
          val tableSink = FactoryUtil.createDynamicTableSink(
            factory,
            objectIdentifier,
            tableToFind,
            Collections.emptyMap(),
            getTableConfig,
            getFlinkContext.getClassLoader,
            isTemporary)
          Option(resolvedTable, tableSink)
        }

      case _ => None
    }

FactoryUtil.createDynamicTableSink

根据’connector’=‘hudi’ 找到factory为org.apache.hudi.table.HoodieTableFactory,接着调用HoodieTableFactory.createDynamicTableSink

    public static DynamicTableSink createDynamicTableSink(
            @Nullable DynamicTableSinkFactory preferredFactory,
            ObjectIdentifier objectIdentifier,
            ResolvedCatalogTable catalogTable,
            Map<String, String> enrichmentOptions,
            ReadableConfig configuration,
            ClassLoader classLoader,
            boolean isTemporary) {
        final DefaultDynamicTableContext context =
                new DefaultDynamicTableContext(
                        objectIdentifier,
                        catalogTable,
                        enrichmentOptions,
                        configuration,
                        classLoader,
                        isTemporary);

        try {
            // 'connector'='hudi' 
            // org.apache.hudi.table.HoodieTableFactory
            final DynamicTableSinkFactory factory =
                    preferredFactory != null
                            ? preferredFactory
                            : discoverTableFactory(DynamicTableSinkFactory.class, context);
            // 关键步骤:HoodieTableFactory.createDynamicTableSink
            return factory.createDynamicTableSink(context);
        } catch (Throwable t) {
            throw new ValidationException(
                    String.format(
                            "Unable to create a sink for writing table '%s'.\n\n"
                                    + "Table options are:\n\n"
                                    + "%s",
                            objectIdentifier.asSummaryString(),
                            catalogTable.getOptions().entrySet().stream()
                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                    .sorted()
                                    .collect(Collectors.joining("\n"))),
                    t);
        }
    }

HoodieTableFactory.createDynamicTableSink

第一个问题解决

  public DynamicTableSink createDynamicTableSink(Context context) {
    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
    checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
        "Option [path] should not be empty.");
    setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    sanityCheck(conf, schema);
    setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
    // 关键步骤:HoodieTableSink
    return new HoodieTableSink(conf, schema);
  }

BatchExecSink

回到方法PlannerBase.translate,它会在后面调用translateToPlanexecGraph.getRootNodes返回的内容为BatchExecSink (想知道为啥是BatchExecSink,可以看PlannerBase.translate中调用的translateToExecNodeGraph方法),
BatchExecSinkBatchExecNode的子类,所以会执行node.translateToPlan

PlannerBase.translateToPlan

  override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
    beforeTranslation()
    val planner = createDummyPlanner()

    val transformations = execGraph.getRootNodes.map {
      // BatchExecSink
      // 关键步骤:ExecNodeBase.translateToPlan
      case node: BatchExecNode[_] => node.translateToPlan(planner)
      case _ =>
        throw new TableException(
          "Cannot generate BoundedStream due to an invalid logical plan. " +
            "This is a bug and should not happen. Please file an issue.")
    }
    afterTranslation()
    transformations
  }

BatchExecSink

public class BatchExecSink extends CommonExecSink implements BatchExecNode<Object> {
    ...
public abstract class CommonExecSink extends ExecNodeBase<Object>
        implements MultipleTransformationTranslator<Object> {
    ...
    

ExecNodeBase.translateToPlan

    public final Transformation<T> translateToPlan(Planner planner) {
        if (transformation == null) {
            transformation =
                    // 关键步骤:BatchExecSink.translateToPlanInternal
                    translateToPlanInternal(
                            (PlannerBase) planner,
                            ExecNodeConfig.of(
                                    ((PlannerBase) planner).getTableConfig(),
                                    persistedConfig,
                                    isCompiled));
            if (this instanceof SingleTransformationTranslator) {
                if (inputsContainSingleton()) {
                    transformation.setParallelism(1);
                    transformation.setMaxParallelism(1);
                }
            }
        }
        return transformation;
    }

BatchExecSink.translateToPlanInternal

    protected Transformation<Object> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
        // org.apache.hudi.table.HoodieTableSink        
        final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext());
        // 关键步骤:CommonExecSink.createSinkTransformation
        return createSinkTransformation(
                planner.getExecEnv(), config, inputTransform, tableSink, -1, false);
    }

CommonExecSink.createSinkTransformation

这里的tableSink为HoodieTableSink,会调用HoodieTableSink的getSinkRuntimeProvider方法返回runtimeProvider(没有执行里面的方法体)


    protected Transformation<Object> createSinkTransformation(
            StreamExecutionEnvironment streamExecEnv,
            ExecNodeConfig config,
            Transformation<RowData> inputTransform,
            // 这里的tableSink为HoodieTableSink
            DynamicTableSink tableSink,
            int rowtimeFieldIndex,
            boolean upsertMaterialize) {
        final ResolvedSchema schema = tableSinkSpec.getContextResolvedTable().getResolvedSchema();
        final SinkRuntimeProvider runtimeProvider =
               // 关键步骤:HoodieTableSink.getSinkRuntimeProvider
                tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
        final RowType physicalRowType = getPhysicalRowType(schema);
        final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
        final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
        final int inputParallelism = inputTransform.getParallelism();
        final boolean inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT);
        final boolean hasPk = primaryKeys.length > 0;

        ...

        return (Transformation<Object>)
                // 关键步骤:CommonExecSink.applySinkProvider
                applySinkProvider(
                        sinkTransform,
                        streamExecEnv,
                        runtimeProvider,
                        rowtimeFieldIndex,
                        sinkParallelism,
                        config);
    }


CommonExecSink.applySinkProvider

先通过new DataStream<>(env, sinkTransformation)生成dataStream,接着通过执行provider.consumeDataStream调用HoodieTableSink.getSinkRuntimeProvider中的方法体,这里的provider为HoodieTableSink.getSinkRuntimeProvider返回的DataStreamSinkProviderAdapter

    private Transformation<?> applySinkProvider(
            Transformation<RowData> inputTransform,
            StreamExecutionEnvironment env,
            SinkRuntimeProvider runtimeProvider,
            int rowtimeFieldIndex,
            int sinkParallelism,
            ExecNodeConfig config) {
        TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
        if (runtimeProvider instanceof DataStreamSinkProvider) {
            Transformation<RowData> sinkTransformation =
                    applyRowtimeTransformation(
                            inputTransform, rowtimeFieldIndex, sinkParallelism, config);
            // 生成dataStream
            final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
            final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
            // 关键步骤:provider.consumeDataStream
            return provider.consumeDataStream(createProviderContext(config), dataStream)
                    .getTransformation();
        } else if (runtimeProvider instanceof TransformationSinkProvider) {
            ...

provider.consumeDataStream(已经在上面的类DataStreamSinkProviderAdapter提过)

它会调用HoodieTableSink.getSinkRuntimeProvider中的方法体(Lambda 表达式)执行后面的写hudi逻辑
第二个问题解决

  default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
    return consumeDataStream(dataStream);
  }

总结

本文主要简单记录了自己调试 Hudi Flink SQL 源码的过程,并没有对源码进行深入的分析(自己水平也不够)。主要目的是为了弄清楚从Table API 的入口到createDynamicTableSink返回HoodieTableSink的主要代码步骤以及在哪里调用HoodieTableSink.getSinkRuntimeProvider的方法体以进行后面的写Hudi逻辑,这样便于后面对Hudi源码的分析和学习。

本文新学习知识点:函数式接口以及对应的 Lambda 表达式的实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/816468.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式事务之本地事务

&#x1f680; 分布式事务 &#x1f680; &#x1f332; AI工具、AI绘图、AI专栏 &#x1f340; &#x1f332; 如果你想学到最前沿、最火爆的技术&#xff0c;赶快加入吧✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;CSDN-Java领域优质创作者&#x1f3c6;&…

大数据课程E1——Flume的概述

文章作者邮箱&#xff1a;yugongshiyesina.cn 地址&#xff1a;广东惠州 ▲ 本章节目的 ⚪ 了解Ganglia的概念&#xff1b; ⚪ 了解Ganglia的拓扑结构和执行流程&#xff1b; ⚪ 掌握Ganglia的安装操作&#xff1b; 一、简介 1. 概述 1. Flume原本是由Cloude…

国内外遥感数据处理软件对比

1.国内遥感数据处理软件概况 1.1北京航天宏图信息技术股份有限公司 1.1.1公司简介 航天宏图信息技术股份有限公司成立于2008年,是国内遥感和北斗导航卫星应用服务商,致力于卫星应用软件国产化、行业应用产业化、应用服务商业化,研发并掌握了具有完全自主知识产权的PIE(Pix…

C# 2的幂

231 2的幂 给你一个整数 n&#xff0c;请你判断该整数是否是 2 的幂次方。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 如果存在一个整数 x 使得 n 2x &#xff0c;则认为 n 是 2 的幂次方。 示例 1&#xff1a; 输入&#xff1a;n 1 输出&a…

【RabbitMQ】之消息的可靠性方案

目录 一、数据丢失场景二、数据可靠性方案 1、生产者丢失消息解决方案2、MQ 队列丢失消息解决方案3、消费者丢失消息解决方案 一、数据丢失场景 MQ 消息数据完整的链路为&#xff1a;从 Producer 发送消息到 RabbitMQ 服务器中&#xff0c;再由 Broker 服务的 Exchange 根据…

平面设计软件都有哪些?推荐这7款

优秀的平面广告设计可以给产品带来良好的效益&#xff0c;正确传播品牌的价值和色调&#xff0c;而功能强大、使用方便的平面广告设计软件是创造优秀平面广告设计的关键。本文推荐7款备受好评的平面广告设计软件&#xff0c;易于使用&#xff01; 1.即时设计 即时设计是国内一…

HCIP OSPF+BGP综合实验

题目 1、该拓扑为公司网络&#xff0c;其中包括公司总部、公司分部以及公司骨干网&#xff0c;不包含运营商公网部分。 2、设备名称均使用拓扑上名称改名&#xff0c;并且区分大小写。 3、整张拓扑均使用私网地址进行配置。 4、整张网络中&#xff0c;运行OSPF协议或者BGP协议…

jmeter压力测试指标解释

目录 RT(response time) Throughput 吞吐量 并发用户数 QPS (query per seconds) TPS (transition per seconds) PV和UV 聚合报告&#xff1a; RT(response time) 什么是RT? RT就是指系统在接收到请求和做出相应这段时间跨度 但是值得一提的是RT的值越高,并不真的就能…

性能测试必备监控技能linux篇

前言 如果性能测试的目标服务器是linux系统&#xff0c;在如何使用linux自带的命令来实现性能测试过程的监控分析呢&#xff1f; 对于日常性能测试来讲&#xff0c;在linux下或是类Unix系统&#xff0c;我们必须掌握以下常用的指标查看命令。 ps pstree top free vmstat …

Springboot配置文件数据项加密

1 添加依赖 <!-- jasypt-spring-boot-starter --> <dependency><groupId>com.github.ulisesbocchio</groupId><artifactId>jasypt-spring-boot-starter</artifactId><version>3.0.2</version> </dependency> 2 加密数…

华为数通HCIA-网络参考模型(TCP/IP)

网络通信模式 作用&#xff1a;指导网络设备的通信&#xff1b; OSI七层模型&#xff1a; 7.应用层&#xff1a;由应用层协议&#xff08;http、FTP、Telnet.&#xff09;为应用程序产生对应的数据&#xff1b; 6.表示层&#xff1a;将应用层产生的数据转换成网络设备看得懂…

CloudDriver一款将各种网盘云盘挂在到电脑本地变成本地磁盘的工具 教程

平时我们的电脑可能由于大量的文件资料之类的导致存储空间可能不够&#xff0c;所以我们可以选择将网盘我们的本地磁盘用来存放东西。 CloudDrive 是一款可以将 115、阿里云盘、天翼云盘、沃家云盘、WebDAV 挂载到电脑中&#xff0c;成为本地硬盘的工具&#xff0c;支持 Window…

Day10-作业(SpringBootWeb案例)

作业1&#xff1a;完成课上预留给大家自己完成的功能 【部门管理的修改功能】 注意&#xff1a; 部门管理的修改功能&#xff0c;需要开发两个接口&#xff1a; 先开发根据ID查询部门信息的接口&#xff0c;该接口用户查询数据并展示 。(一定一定先做这个功能) 再开发根据ID…

543. 二叉树的直径

题目 题解一 遍历每一个节点&#xff0c;以每一个节点为中心点计算最长路径&#xff08;左子树边长右子树边长&#xff09;&#xff0c;更新全局变量max。 class Solution {int maxd0;public int diameterOfBinaryTree(TreeNode root) {depth(root);return maxd;}public int …

苍穹外卖--公共字段自动填充

问题分析 代码冗余&#xff0c;且不便于后期维护。 实现思路 技术点&#xff1a;枚举&#xff0c;注解&#xff0c;AOP&#xff0c;反射 1.自定义注解AutoFill 1.在sky-server.com.sky包下建立annotation&#xff08;注解&#xff09;包 2.在该包下建立注解AutoFill package co…

软件测试员,面试常见的18个问题(记得收藏!)

软件测试面试18个常见问题汇总 Q1&#xff1a;项目中相关需求问题&#xff0c;测试可以直接和客户沟通吗&#xff1f; A1&#xff1a;可以&#xff0c;最初与客户沟通需求时&#xff0c;测试人员直接参与&#xff0c;所以我们可以直接和客户方的代表开会进行沟通。 A2&#xff…

Apipost教程?一篇文章玩转Apipost

你是否经常遇到接口开发过程中的各种问题&#xff1f;或许你曾为接口测试与调试的繁琐流程而烦恼。不要担心&#xff01;今天我将向大家介绍一款功能强大、易于上手的接口测试工具——Apipost&#xff0c;并带你深入了解如何玩转它&#xff0c;轻松实现接口测试与调试。 什么是…

SQL-每日一题【1173. 即时食物配送 I】

题目 配送表: Delivery 如果顾客期望的配送日期和下单日期相同&#xff0c;则该订单称为 「即时订单」&#xff0c;否则称为「计划订单」。 查询即时订单所占的百分比&#xff0c; 保留两位小数。 查询结果如下所示。 示例 1: 解题思路 1.题目要求我们查询出顾客期望的配送日…

大数据开发面试必问:Hive调优技巧系列一

Hive必问调优 Hive 调优拆解:Hive SQL 几乎是每一位互联网分析师的必备技能&#xff0c;相信很多小伙伴都有被面试官问到 Hive 优化问题的经历。所以掌握扎实的 HQL 基础尤为重要&#xff0c;hive优化也是小伙伴应该掌握的一项技能&#xff0c;本篇文章具体从hive建表优化、HQ…

我求求你不要再问我SpringBoot的自动配置原理了!

目录 四、SpringBoot自动配置原理 4.1 起步依赖原理 4.1.1 版本锁定 4.1.2 起步依赖 4.2 自动配置原理 4.2.1 SpringBootApplication源码 4.2.2 SpringBootConfiguration源码 4.2.3 EnableAutoConfiguration源码 1&#xff09;Registrar 2&#xff09;AutoConfigurat…