Seatunnel-2.3.0源码解析

news2025/1/11 18:10:21

一、概述

SeaTunnel是一个简单易用的数据集成框架,在企业中,由于开发时间或开发部门不通用,往往有多个异构的、运行在不同的软硬件平台上的信息系统同时运行。数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,从而为企业提供全面的数据共享。SeaTunnel支持海量数据的实时同步。它每天可以稳定高效地同步数百亿数据。

SeaTunnel适用于以下场景

SeaTunnel的特点

  • 海量数据的同步
  • 海量数据的集成
  • 海量数据的ETL
  • 海量数据聚合
  • 多源数据处理
  • 基于配置的低代码开发,易用性高,方便维护。
  • 支持实时流式传输
  • 离线多源数据分析
  • 高性能、海量数据处理能力
  • 模块化的插件架构,易于扩展
  • 支持用SQL进行数据操作和数据聚合
  • 支持Spark structured streaming
  • 支持Spark 2.x

二、工作原理

       以官方案例为例, 通过使用bin/start-seatunnel-flink.sh脚本来提交Flink任务,脚本内容如下:

set -eu
# resolve links - $0 may be a softlink
PRG="$0"

while [ -h "$PRG" ] ; do
  # shellcheck disable=SC2006
  ls=`ls -ld "$PRG"`
  # shellcheck disable=SC2006
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '/.*' > /dev/null; then
    PRG="$link"
  else
    # shellcheck disable=SC2006
    PRG=`dirname "$PRG"`/"$link"
  fi
done

PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/starter/seatunnel-flink-13-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
    . "${CONF_DIR}/seatunnel-env.sh"
fi

if [ $# == 0 ]
then
    args="-h"
else
    args=$@
fi

set +u
# Log4j2 Config
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-flink-starter"
fi

CLASS_PATH=${APP_DIR}/starter/logging/*:${APP_JAR}

CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 234 ]; then
    # print usage
    echo "${CMD}"
    exit 0
elif [ ${EXIT_CODE} -eq 0 ]; then
    echo "Execute SeaTunnel Flink Job: $(echo "${CMD}" | tail -n 1)"
    eval $(echo "${CMD}" | tail -n 1)
else
    echo "${CMD}"
    exit ${EXIT_CODE}
fi                                                                       

         其中,比较重要的两条命令为:

APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter" 

CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?

         SeaTunnel通过脚本去执行了seatunnel-core-flink.jar并且入口类为org.apache.seatunnel.core.flink.FlinkStarter,我们接下来移步源码来看这个FlinkStarter类。

public class FlinkStarter implements Starter {

    private static final String APP_NAME = SeatunnelFlink.class.getName();
    public static final String APP_JAR_NAME = "seatunnel-flink-starter.jar";
    /**
     * SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
     */
    private final FlinkCommandArgs flinkCommandArgs;
    /**
     * SeaTunnel flink job jar.
     */
    private final String appJar;

    FlinkStarter(String[] args) {
        this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        // set the deployment mode, used to get the job jar path.
        Common.setDeployMode(flinkCommandArgs.getDeployMode());
        Common.setStarter(true);
        this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
    }

    @SuppressWarnings("checkstyle:RegexpSingleline")
    public static void main(String[] args) {
        FlinkStarter flinkStarter = new FlinkStarter(args);
        System.out.println(String.join(" ", flinkStarter.buildCommands()));
    }
    @Override
    public List<String> buildCommands() {
        List<String> command = new ArrayList<>();
        command.add("${FLINK_HOME}/bin/flink");
        command.add(flinkCommandArgs.getRunMode().getMode());
        command.addAll(flinkCommandArgs.getOriginalParameters());
        command.add("-c");
        command.add(APP_NAME);
        command.add(appJar);
        command.add("--config");
        command.add(flinkCommandArgs.getConfigFile());
        if (flinkCommandArgs.isCheckConfig()) {
            command.add("--check");
        }
        //set job name
        command.add("-Dpipeline.name=" + flinkCommandArgs.getJobName());
        // set System properties
        flinkCommandArgs.getVariables().stream()
                .filter(Objects::nonNull)
                .map(String::trim)
                .forEach(variable -> command.add("-D" + variable));
        return command;
    }
}

      在FlinkStarter类中,通过buildCommands()构建Flink提交任务命令。至此,可以大概推断出来SeaTunnel的执行逻辑,即将自己封装为一个Jar包提交Flink执行,在执行时,SeaTunnel根据用户编写的conf来装填对应的Source、Transform、Sink插件,最终将拼好任务后提交给Flink构建StreamGraph和JobGraph。

       FlinkStarter类中,将要执行的jar的main设置为SeatunnelFlink.class,该类主要完成flinkcommand构建和运行。

public class SeatunnelFlink {

    public static void main(String[] args) throws CommandException {
        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
            .buildCommand(flinkCommandArgs);
        Seatunnel.run(flinkCommand);
    }
}

        CommandLineUtils类中实现了所有实现AbstractCommandArgs接口的对象的参数解析

public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
        JCommander jCommander = JCommander.newBuilder()
                .programName(programName)
                .addObject(obj)
                .acceptUnknownOptions(acceptUnknownOptions)
                .build();
        try {
            jCommander.parse(args);
            // The args is not belongs to SeaTunnel, add into engine original parameters
            obj.setOriginalParameters(jCommander.getUnknownOptions());
        } catch (ParameterException e) {
            System.err.println(e.getLocalizedMessage());
            exit(jCommander);
        }

        if (obj.isHelp()) {
            exit(jCommander);
        }
        return obj;
    }

        Seatunnel.run方法主要负责提交command执行,而command作为接口,包含七个实现类,其中flink相关的,包含 FlinkApiConfValidateCommand (校验配置参数是否规范)和FlinkApiTaskExecuteCommand (真正构建执行任务) 

    public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
        try {
            command.execute();
        } catch (ConfigRuntimeException e) {
            showConfigError(e);
            throw e;
        } catch (Exception e) {
            showFatalError(e);
            throw e;
        }
    }

       FlinkApiConfValidateCommand类主要通过获取config配置信息,FlinkApiTaskExecuteCommand类主要负责任务执行。该类中通过构建FlinkExecution对象执行任务,FlinkExecution对象是TaskExecution接口的具体实现类。构造函数通过传入config,初始化对应jarpath、env,source、transform、sink。其中source、transform、sink对应为PluginExecuteProcessor 接口的对应实现类对象。 

public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {

    private final FlinkCommandArgs flinkCommandArgs;

    public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
        this.flinkCommandArgs = flinkCommandArgs;
    }

    @Override
    public void execute() throws CommandExecuteException {
        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
        checkConfigExist(configFile);
        Config config = new ConfigBuilder(configFile).getConfig();
        FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
        try {
            seaTunnelTaskExecution.execute();
        } catch (Exception e) {
            throw new CommandExecuteException("Flink job executed failed", e);
        }
    }
}

       FlinkApiConfValidateCommand类主要通过获取config配置信息,然后构建FlinkExecution对象,FlinkExecution对象是TaskExecution接口的具体实现类。构造函数通过传入config,初始化对应jarpath、env,source、transform、sink。其中source、transform、sink对应为PluginExecuteProcessor 接口的对应实现类对象。

private final FlinkEnvironment flinkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
private final List<URL> jarPaths;

public FlinkExecution(Config config) {
      try {
          jarPaths = new ArrayList<>(Collections.singletonList(
              new File(Common.appStarterDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
      } catch (MalformedURLException e) {
          throw new SeaTunnelException("load flink starter error.", e);
      }
      registerPlugin(config.getConfig("env"));
      JobContext jobContext = new JobContext();
      jobContext.setJobMode(FlinkEnvironmentFactory.getJobMode(config));

      this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
      this.transformPluginExecuteProcessor = new TransformExecuteProcessor(jarPaths,
          TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()), jobContext);
      this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths, config.getConfigList(Constants.SINK), jobContext);

      this.flinkEnvironment = new FlinkEnvironmentFactory(this.registerPlugin(config, jarPaths)).getEnvironment();

      this.sourcePluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
      this.transformPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
      this.sinkPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
}

        FlinkExecution对象构建完成后,执行execute方法,分别对sourcePluginExecuteProcessor、transformPluginExecuteProcessor、sinkPluginExecuteProcessor执行execute方法。其中,FlinkEnvironment和SparkEnvironment一样,是对Flink和Spark中执行上下文的封装。

@Override
public void execute() throws TaskExecuteException {
    List<DataStream<Row>> dataStreams = new ArrayList<>();
    dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
    dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
    sinkPluginExecuteProcessor.execute(dataStreams);

    log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
    try {
          flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        throw new TaskExecuteException("Execute Flink job error", e);
    }
}

       sourcePluginExecuteProcessor中execute方法通过addsource获取带有Schema的DataStreamSource<Row>,最终返回DataStream<Row>的列表。

    @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
        StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
        List<DataStream<Row>> sources = new ArrayList<>();
        for (int i = 0; i < plugins.size(); i++) {
            //插件方式,动态引入数据源
            SeaTunnelSource internalSource = plugins.get(i);
            BaseSeaTunnelSourceFunction sourceFunction;
            //检查数据源是否支持协同
            if (internalSource instanceof SupportCoordinate) {
                sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
            } else {
                sourceFunction = new SeaTunnelParallelSource(internalSource);
            }
            DataStreamSource<Row> sourceStream = addSource(executionEnvironment,
                sourceFunction,
                "SeaTunnel " + internalSource.getClass().getSimpleName(),
                internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
            Config pluginConfig = pluginConfigs.get(i);
            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                int parallelism = pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                sourceStream.setParallelism(parallelism);
            }
            // 将处理后的数据注册为表
            registerResultTable(pluginConfig, sourceStream);
            sources.add(sourceStream);
        }
        return sources;
    }

    private DataStreamSource<Row> addSource(
        final StreamExecutionEnvironment streamEnv,
        final BaseSeaTunnelSourceFunction function,
        final String sourceName,
        boolean bounded) {
        checkNotNull(function);
        checkNotNull(sourceName);
        checkNotNull(bounded);

        TypeInformation<Row> resolvedTypeInfo = function.getProducedType();
        boolean isParallel = function instanceof ParallelSourceFunction;
        streamEnv.clean(function);

        final StreamSource<Row, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(streamEnv, resolvedTypeInfo, sourceOperator, isParallel, sourceName, bounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
    }

       transformPluginExecuteProcessor中execute方法首先判断transform插件是否为空,为空则代表无transform操作,根据transform插件依次对数据进行转换,最后将结果写入result返回。

@Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
        if (plugins.isEmpty()) {
            return upstreamDataStreams;
        }
        DataStream<Row> input = upstreamDataStreams.get(0);
        List<DataStream<Row>> result = new ArrayList<>();
        for (int i = 0; i < plugins.size(); i++) {
            try {
                FlinkStreamTransform transform = plugins.get(i);
                Config pluginConfig = pluginConfigs.get(i);
                DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
                input = transform.processStream(flinkEnvironment, stream);
                registerResultTable(pluginConfig, input);
                transform.registerFunction(flinkEnvironment);
                result.add(input);
            } catch (Exception e) {
                throw new TaskExecuteException(
                    String.format("SeaTunnel transform task: %s execute error", plugins.get(i).getPluginName()), e);
            }
        }
        return result;
    }

       SinkExecuteProcessor中execute方法同理,最终通过stream.sinkTO将结果写入对于sink端,同时根据sink端配置信息,设置是否并行。 

@Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
        DataStream<Row> input = upstreamDataStreams.get(0);
        for (int i = 0; i < plugins.size(); i++) {
            Config sinkConfig = pluginConfigs.get(i);
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
            if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                dataStreamSink.setParallelism(parallelism);
            }
        }
        // the sink is the last stream
        return null;
}

三、源码补充

Flink DataSource

       在sourcePluginExecuteProcessor的execute方法中,创建了一个名为sourcefunction的BaseSeaTunnelFunction对象,该类继承至抽象类RichSourceFunction,并实现了CheckpointListener、CheckpointedFunction、ResultTypeQueryable<Row>三个接口。其中RichSourceFunction抽象类又继承至抽象类AbstractRichFunction,并实现SourcFunction接口。AbstractRichFunction的父类是抽象类RichFunction,该类是用户自定义function的基类   。     

        sourcePluginExecuteProcessor的execute中通过addSource方法将一个StreamFunction封装为StreamSource,并最终返回一个DataStreamSource对象,DataStreamSource表示DataStream的起点,是SingleOutputStreamOperator的子类,超级父类为DataStream,所有的数据源都会被包装为DataStreamSource。通过addSource方法添加具有自定义类型信息的数据源到计算任务中。用户自定义数据源则通过实现SourceFunction接口,该接口供两个方法:

run执行方法,实现读取数据的实际操作
cancel取消函数,用于取消/关闭连接使用

       SourceFunction定义的数据为非并行的,实现ParallelSourceFunction接口或继承RichParallelSourceFunction来定义并行的source。SeaTunnel中SeaTunnelParallelSource实现ParallelSourceFunction接口提供并行数据源。

seatunnel Source

 seatunnel ExecuteProcessor

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/419963.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring学习笔记(二)【CGLIB浅拷贝BeanCopier的使用和详解】

CGLIB浅拷贝BeanCopier的使用和详解 一、bean拷贝工具 bean拷贝工具类比较 常用的bean拷贝工具类当中&#xff0c;主要有Apache提供的beanUtils、Spring提供的beanUtils、Cglib提供的beanCopier&#xff0c;性能上分析如下表所示&#xff08;该表来自网上的数据&#xff09; …

探索Apache Hudi核心概念 (3) - Compaction

Compaction是MOR表的一项核心机制&#xff0c;Hudi利用Compaction将MOR表产生的Log File合并到新的Base File中。本文我们会通过Notebook介绍并演示Compaction的运行机制&#xff0c;帮助您理解其工作原理和相关配置。 1. 运行 Notebook 本文使用的Notebook是&#xff1a;《A…

此战成硕,我成功上岸西南交通大学了~~~

友友们&#xff0c;好久不见&#xff0c;很长时间没有更一个正式点的文章了&#xff01; 是因为我在去年年底忙着准备初试&#xff0c;今年年初在准备复试&#xff0c;直到3月底拟录取后&#xff0c;终于可以写下这篇上岸贴&#xff0c;和大家分享一下考研至上岸的一个过程 文章…

springboot+thymeleaf实现发Html邮件自由

2019年&#xff0c;我刚接触测试架构和测试开发类的工作时&#xff0c;经常会有自动化发邮件的功能&#xff0c;大都是从各个平台自动化统计一些数据出来&#xff0c;每周定时发一封邮件给领导交差&#xff0c;回过头来再看看我发的邮件&#xff0c;不美观&#xff0c;不专业。…

Android Jetpack:现代化Android开发的利器

Android Jetpack&#xff1a;现代化Android开发的利器 引言 随着移动应用的快速发展和用户体验的不断提升&#xff0c;现代化的Android应用开发变得愈发复杂和多样化。为了提高开发效率、简化代码、加速应用迭代&#xff0c;Google推出了Android Jetpack组件&#xff0c;成为现…

springboot零基础到项目实战

推荐教程&#xff1a; springboot零基础到项目实战 SpringBoot这门技术课程所包含的技术点其实并不是很多&#xff0c;但是围绕着SpringBoot的周边知识&#xff0c;也就是SpringBoot整合其他技术&#xff0c;这样的知识量很大&#xff0c;例如SpringBoot整合MyBatis等等。因此…

gitlab-ce升级方法

Centos7升级gitlab-ce&#xff1a; 1、记录当前版本 在升级前一定要做好备份&#xff0c;记录自己当前的gitlab-ce的版本&#xff1a; yum list | grep gitlab-ce 2、编辑/etc/gitlab/gitlab.rb文件&#xff1a; 1&#xff09;将下面几行注释取消。 说明&#xff1a; 1&am…

pytorch进阶学习(二):使用DataLoader读取自己的数据集

上一节使用的是官方数据集fashionminist进行训练&#xff0c;这节课使用自己搜集的数据集来进行数据的获取和训练。所需资源教学视频&#xff1a;https://www.bilibili.com/video/BV1by4y1b7hX/?spm_id_from333.1007.top_right_bar_window_history.content.click&vd_sourc…

建立数据驱动,关键字驱动和混合Selenium框架这些你了解吗

一、什么是Selenium框架&#xff1f; Selenium框架是一种代码结构&#xff0c;用于简化代码维护和提高代码可读性。框架涉及将整个代码分成较小的代码段&#xff0c;以测试特定的功能。 该代码的结构使得“数据集”与实际的“测试用例”分开&#xff0c;后者将测试Web应用程序…

【PyTorch】第八节:数据的预处理

作者&#x1f575;️‍♂️&#xff1a;让机器理解语言か 专栏&#x1f387;&#xff1a;PyTorch 描述&#x1f3a8;&#xff1a;PyTorch 是一个基于 Torch 的 Python 开源机器学习库。 寄语&#x1f493;&#xff1a;&#x1f43e;没有白走的路&#xff0c;每一步都算数&#…

【NVIDIA GPU 入门】综述

系列文章目录 文章目录系列文章目录前言一、概述二、GPU架构基础2.1 GPU概述2.2 GPU的架构2.3 自主查询GPU相关信息三、CUDA编程概念3.1 CUDA线程模型3.1 线程层次结构1.引入库2.读入数据总结参考文献前言 GPU作为机器学习的基础运算设备&#xff0c;基本上是无人不知无人不晓。…

【bsauce读论文】PSPRAY-基于时序侧信道的Linux内核堆利用技术

会议&#xff1a;USENIX Security’23 作者&#xff1a;来自 Seoul National University 的 Yoochan Lee、Byoungyoung Lee 等人。 主要内容&#xff1a;由于Linux内核的堆分配器SLUB开启的freelist随机化保护&#xff0c;所以堆相关的内核漏洞利用成功率较低&#xff08;平均…

BEV(一)---lift splat shoot

1. 算法原理 1.1 2D坐标与3D坐标的关系 如图&#xff0c;已知世界坐标系上的某点P&#xff08;Xc&#xff0c; Yc&#xff0c; Zc&#xff09;经过相机的内参矩阵可以获得唯一的图像坐标p&#xff08;x&#xff0c; y&#xff09;&#xff0c;但是反过来已知图像上某点p&…

软考初级程序员--学习

1、十进制 转 二进制 1.1、整数十进制87 转换为 二进制为 1010111 1.2 、小数十进制0.125 转为 二进制 为 0.001 使用乘2取整法&#xff0c;一直乘到没有小数 2、二进制 转 十进制 2.1、二进制1010111 转换为 十进制 2.2、 二进制小数0.001 转 十进制 3、循环队列 计算长度通用…

周赛341(模拟、双指针、树上DP)

文章目录周赛341[6376. 一最多的行](https://leetcode.cn/problems/row-with-maximum-ones/)暴力模拟[6350. 找出可整除性得分最大的整数](https://leetcode.cn/problems/find-the-maximum-divisibility-score/)暴力模拟[6375. 构造有效字符串的最少插入数](https://leetcode.c…

JVM系统优化实践(15):GC可视化工具实践

您好&#xff0c;我是湘王&#xff0c;这是我的CSDN博客&#xff0c;欢迎您来&#xff0c;欢迎您再来&#xff5e; 线上系统的JVM监测要么使用jstat、jmap、jhat等工具查看JVM状态&#xff0c;或者使用监控系统&#xff0c;如Zabbix、Prometheus、Open-FaIcon、Ganglia等。作为…

pyg的NeighborLoader和LinkNeighborLoader

NeighborLoader 1 数据格式要求 需要传入加载的属性值&#xff1a; class NeighborLoader(data: Union[Data, HeteroData, Tuple[FeatureStore, GraphStore]], num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]], input_nodes: Union[Tensor, None…

进程调度的基本过程

进程调度的基本过程&#x1f50e; 进程是什么&#x1f50e; 进程管理&#x1f50e; 进程中结构体的属性进程标识符(PID)内存指针文件描述符表结构体中与进程调度相关的属性进程的状态进程的优先级进程的上下文进程的记账信息&#x1f50e; 总结&#x1f50e; 结尾&#x1f50e;…

(第十四届蓝桥真题) 整数删除(线段树+二分)

样例输入&#xff1a; 5 3 1 4 2 8 7 样例输出&#xff1a; 17 分析&#xff1a;这道题我想的比较复杂&#xff0c;不过复杂度还是够用的&#xff0c;我是用线段树二分来做的。 我们用线段树维护所有位置的最小值&#xff0c;那么我们每次删除一个数之前先求一遍最小值&a…

停车场管理系统文件录入(C++版)

❤️作者主页&#xff1a;微凉秋意 ✅作者简介&#xff1a;后端领域优质创作者&#x1f3c6;&#xff0c;CSDN内容合伙人&#x1f3c6;&#xff0c;阿里云专家博主&#x1f3c6; 文章目录一、案例需求描述1.1、汽车信息模块1.2、普通用户模块1.3、管理员用户模块二、案例分析三…