【Flink】详解JobGraph

news2024/11/14 3:06:34

概述

JobGraph 是 StreamGraph 优化后的产物,客户端会将优化后的 JobGraph 发送给 JM。接下来的文章涉及到一些前置知识点,没有看前几期的小伙伴最好看一下前几期:

  1. 【Flink】详解StreamGraph
  2. 【Flink】浅谈Flink架构和调度
  3. 【Flink】详解Flink的八种分区

Flink 在客户端将 StreamGraph 对象转换成 JobGraph 对象,这个转换的核心在于将多个符合条件的 StreamNode 节点合并在一起,形成一个 JobVertex 节点,这样的优化方式称之为算子链合并,这样做可以有效减少数据在节点间传递所需的序列化、反序列化操作。同一个算子链中的算子运行在同一个 TaskSlot 中,也可由理解为运行在一个线程中,这样可以显著降低线程切换的性能开销,并且能增大吞吐量和降低延迟。

源码分析

JobGraph 的构建

JobGraph 的相关代码主要在【flink-runtime】模块下的 org.apache.flink.runtime.JobGraph 中。其调用链路是 StreamGraph#getJobGraphStreamingJobGraphGenerator#createJobGraph()

/*------------------------ StreamGraph ---------------------------*/
// 构造入口
public JobGraph getJobGraph() {  
    return getJobGraph(null);  
}  
   
public JobGraph getJobGraph(@Nullable JobID jobID) {  
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);  
}
/*---------------------------------------------------------*/

/*--------------- StreamingJobGraphGenerator ------------------*/

public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {  
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();  
}

private JobGraph createJobGraph() {  
	// 前置校验
    preValidate();  
    // 获取StreamGraph的调度模式
    // 设置JobGraph的调度模式
    jobGraph.setJobType(streamGraph.getJobType());  

	// jobGraph设置是否启动本地近似恢复策略
    jobGraph.enableApproximateLocalRecovery( streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
    // 为每一个StreamNode生成一个确定的哈希值
defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);  
  
    // 为兼容问题生成哈希值
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());  
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {  
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));  
    }  

	// 这里是重点,JobGraph的顶点和边在这个方法中创建。
	// 尝试将尽可能多的StreamNode聚合在一个JobGraph节点中。
	// 判断算子chain,合并创建JobVertex,并生成JobEdge。
    setChaining(hashes, legacyHashes);  
    // 设置物理边界
    setPhysicalEdges();  
    // 设置jobGraph的SlotSharingGroup和CoLocationGroup
    setSlotSharingAndCoLocation();  
	
    setManagedMemoryFraction(  
            Collections.unmodifiableMap(jobVertices),  
            Collections.unmodifiableMap(vertexConfigs),  
            Collections.unmodifiableMap(chainedConfigs),  
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),  
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());  


    // 设置jobGraph的各个 JobVertex 的checkpoint 信息
    // 比如说source JobVertex 需要trigger checkpoint
    // 所有的JobVertex需要commit和ack checkpoint
    configureCheckpointing();
    
    // 设置保存点配置
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());  
  
    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =  
            JobGraphUtils.prepareUserArtifactEntries(  
                    streamGraph.getUserArtifacts().stream()  
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),  
                    jobGraph.getJobID());  

	
    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :  
            distributedCacheEntries.entrySet()) {  
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());  
    }  
  
    // 设置运行时配置信息
    try {  
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());  
    } catch (IOException e) {  
        throw new IllegalConfigurationException(  
                "Could not serialize the ExecutionConfig."  
                        + "This indicates that non-serializable types (like custom serializers) were registered");  
    }  
	// 返回JobGraph对象
    return jobGraph;  
}

/*---------------------------------------------------------*/

// 定义Flink-Job调度枚举类型
public enum JobType {  
	// 批处理模式
    BATCH,  
    // 流处理模式
	STREAMING  
}

从上面的分析可以看出,由 StreamGraph 到 JobGraph 最重要的一步是创建算子链 setChaining(hashes, legacyHashes),这样做可以尽可能的多整合一些操作在同一个节点中完成,避免不必要的线程切换和网络通信。举一个简单一点的例子,DataStream.map(a -> a+1).filter(a -> a > 2),此时数据流有两个处理步骤,也就是两个算子组成,即 mapfilter,这两个算子会组成不同的 StreamNode 对象和 Task 对象,如果这两个 Task 不在一个 TaskSlot 或者一个 TM 中,那么必然涉及到网络传输,这样的执行性能会很差,为了优化这一点,Flink 引入了算子链的概念,一个算子链代表一组可以在同一个 TaskSlot 中执行的算子串。

/*--------------- StreamingJobGraphGenerator ------------------*/
// 从StreamNode递归创建JobVertex对象
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {  

	final Map<Integer, OperatorChainInfo> chainEntryPoints =  
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);  
    
    final Collection<OperatorChainInfo> initialEntryPoints =  
            chainEntryPoints.entrySet().stream()  
                    .sorted(Comparator.comparing(Map.Entry::getKey))  
                    .map(Map.Entry::getValue)  
                    .collect(Collectors.toList());  
  
    // 创建算子链
    for (OperatorChainInfo info : initialEntryPoints) {  
        createChain(  
                info.getStartNodeId(),  
                1, // 索引从1开始,0是Source
                info,  
                chainEntryPoints);  
    }  
}


// 创建算子链
private List<StreamEdge> createChain(
            final Integer currentNodeId,
            final int chainIndex,
            final OperatorChainInfo chainInfo,
            final Map<Integer, OperatorChainInfo> chainEntryPoints) {

		// 获取起始Node-ID
        Integer startNodeId = chainInfo.getStartNodeId();
        // builtVertices用于存放已经进行构建的StreamNode ID,避免重复构造
        if (!builtVertices.contains(startNodeId)) {
			// transitiveOutEdges 存储整个算子链的出边
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
			// chainableOutputs 存储所有可以形成算子链的StreamEdge
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // nonChainableOutputs 存储不可以形成算子链的StreamEdge
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
			// 获取当前处理的SteamNode
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

			// 对所有的StreamEdge进行处理,分为可以形成算子链和不可以形成算子链两类
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }

			// 如果是可以形成算子链的StreamEdge对象,递归调用createChain,并添加到transitiveOutEdges
			// 递归结束条件:
			// 1. 当前节点不再有出边;
			// 2. 当前节点已经完成转换
            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        createChain(
                                chainable.getTargetId(),
                                chainIndex + 1,
                                chainInfo,
                                chainEntryPoints));
            }

			// 如果是不可被chain的StreamEdge,添加到transitiveOutEdges集合中
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(
                        nonChainable.getTargetId(),
                        1, // operators start at position 1 because 0 is for chained source inputs
                        chainEntryPoints.computeIfAbsent(
                                nonChainable.getTargetId(),
                                (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                        chainEntryPoints);
            }
            
			// 设置算子链名称
            chainedNames.put(
                    currentNodeId,
                    createChainedName(
                            currentNodeId,
                            chainableOutputs,
                            Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
            // 设置算子链所需最小资源
            chainedMinResources.put(
                    currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            // 设置算子链所需最佳资源
            chainedPreferredResources.put(
                    currentNodeId,
                    createChainedPreferredResources(currentNodeId, chainableOutputs));

			// 
            OperatorID currentOperatorId =
                    chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));

            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId)
                        .addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }

            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId)
                        .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }

			// 如果currentNodeId和startNodeId相等,说明需要创建一个新的chain,会生成一个JobVertex
            StreamConfig config =
                    currentNodeId.equals(startNodeId)
                            ? createJobVertex(startNodeId, chainInfo)
                            : new StreamConfig(new Configuration());

			// 设置的顶点属性到config中
            setVertexConfig(
                    currentNodeId,
                    config,
                    chainableOutputs,
                    nonChainableOutputs,
                    chainInfo.getChainedSources());

            if (currentNodeId.equals(startNodeId)) {
				// 开始一个新的算子链的连接
                config.setChainStart();
                config.setChainIndex(chainIndex);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
				// 对于每一个算子链,把它和指向下一个算子链的出边连接起来
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }
				// 
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {
                chainedConfigs.computeIfAbsent(
                        startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(currentOperatorId);

            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }

// 判断是否可以形成算子链
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {  
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);  
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);  
  
    if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)  
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)  
            && (edge.getPartitioner() instanceof ForwardPartitioner)  
            && edge.getShuffleMode() != ShuffleMode.BATCH  
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()  
            && streamGraph.isChainingEnabled())) {  
  
        return false;  
    }  
  
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {  
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {  
            return false;  
        }  
    }  
    return true;  
}

public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {  
    return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)  
            || (slotSharingGroup != null  
                    && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));  
}

private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(  
        final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {  
  
    final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();  
    final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();  

	// 遍历所有的Source-StreamNode
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {  
	    // 根据ID获取StreamNode对象
        final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);  

        if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory  
                && sourceNode.getOutEdges().size() == 1) {  
            final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);  
            final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());  
            final ChainingStrategy targetChainingStrategy =  
                    target.getOperatorFactory().getChainingStrategy();  
  
            if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES  
                    && isChainableInput(sourceOutEdge, streamGraph)) {  
                final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));  
                final StreamConfig.SourceInputConfig inputConfig =  
                        new StreamConfig.SourceInputConfig(sourceOutEdge);  
                final StreamConfig operatorConfig = new StreamConfig(new Configuration());  
                setVertexConfig(  
                        sourceNodeId,  
                        operatorConfig,  
                        Collections.emptyList(),  
                        Collections.emptyList(),  
                        Collections.emptyMap());  
                operatorConfig.setChainIndex(0); // sources are always first  
                operatorConfig.setOperatorID(opId);  
                operatorConfig.setOperatorName(sourceNode.getOperatorName());  
                chainedSources.put(  
                        sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));  
  
                final SourceOperatorFactory<?> sourceOpFact =  
                        (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();  
                final OperatorCoordinator.Provider coord =  
                        sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);  
  
                final OperatorChainInfo chainInfo =  
                        chainEntryPoints.computeIfAbsent(  
                                sourceOutEdge.getTargetId(),  
                                (k) ->  
                                        new OperatorChainInfo(  
                                                sourceOutEdge.getTargetId(),  
                                                hashes,  
                                                legacyHashes,  
                                                chainedSources,  
                                                streamGraph));  
                chainInfo.addCoordinatorProvider(coord);  
                continue;  
            }  
        }  

		// 将SourceID-OperatorChainInfo添加到HashMap中
        chainEntryPoints.put(  
                sourceNodeId,  
                new OperatorChainInfo(  
                        sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));  
    }  
  
    return chainEntryPoints;  
}


/*---------------------------------------------------------*/

/*--------------- ChainingStrategy ------------------*/
public enum ChainingStrategy {  

	// 最大程度连接前后算子
    ALWAYS,  
    // 算子不会连接前后的算子形成算子链
    NEVER,  
    // 算子只会连接后面的算子但是不会连接前面的算子
    HEAD,  
    // 头部算子,尽可能连接多个source算子
    HEAD_WITH_SOURCES;  
	// 默认连接策略是【ALWAYS】
    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;  
}

/*--------------------------------------------------*/

简单总结一下 JobGraph 的构建过程,入口方法是 setChaining(),该方法会构造一个 Collection<OperatorChainInfo> 对象,该对象是所有 Source 节点的信息集合,遍历该集合调用 createChain() 方法,该方法会递归调用下游节点,构建算子链。在改方法中会对每一个 Operator 调用 isChainable 方法,将所有的出边分成两类:chainalbeOutputs 和 noChainableOutputs,递归遍历二者进行算子链的构建,同时将 StreamNode 的配置信息序列化到 StreamConfig 对象中,这里会有一个分支,如果当前节点是算子链的头结点,则会调用 createJobVertex 构建 JobVertex 对象和 JobEdge 对象相连;如果当前节点不是算子链的头节点,则构建一个新的 StreamConfig 对象。

能够形成算子链的依据是 isChainable 方法和 isChainableInput 方法,具体判断条件如下:

  1. 下游节点的前置节点只有一个;
  2. 分区器必须是ForwardPartitioner
  3. Shuffle 模式必须是 Pipeline 模式;
  4. 上下游的并行度必须一致;
  5. StreamGraph 启用算子链优化;
  6. 上游算子的算子链策略必须是【ALWAYS | HEAD | HEAD_WITH_SOURCES】;
  7. 下游算子的算子链策略必须是【ALWAYS | 上游是 Source 算子的情况下 HEAD_WITH_SOURCES】;
  8. 上、下游算子都分配了 SlotSharingGroup 而且二者一致;

createJobVertex 方法中,首先创建一个 JobVertex 对象,然后调用 jobVertex.setInvokableClass() 设置执行类,然后设置运行资源和并行度。最后传递 JobVertex 对象的配置信息构建一个 StreamConfig 对象并返回。

遍历 transitiveOutEdges,调用 connect() 方法,在 connect 方法中,依据 StreamEdge 对象得到上下游 JobVertex 节点信息;通过 StreamEdge.getPartitioner() 方法得到 StreamPartitioner 属性,如果分区器的 isPointwise() 方法返回 True(ForwardPartitioner 和 RescalePartitioner 分区器都是由明确指向的),那么构建 DistributionPattern.POINTWISE 类型的 JobEdge 对象,其余的分区器构建 DistributionPattern.ALL_TO_ALL 类型的 JobEdge 对象,JobEdge 对象就是各个 JobVertex 之间的连接对象,也就是说在 connect 方法中就完成了 JobGraph 的各个节点之间的连接工作。

// connect方法
private void connect(Integer headOfChain, StreamEdge edge) {  
  
    physicalEdgesInOrder.add(edge);  
  
    Integer downStreamVertexID = edge.getTargetId();  

	// 获取算子链头JobVertex对象
    JobVertex headVertex = jobVertices.get(headOfChain);  
    // 获取下游JobVertex对象
    JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);  
  
    StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());  
  
    downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);  

	// 获取分区器
    StreamPartitioner<?> partitioner = edge.getPartitioner();  

	// 获取结果分区类型
    ResultPartitionType resultPartitionType;  
    switch (edge.getShuffleMode()) {  
        case PIPELINED:  
            resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;  
            break;  
        case BATCH:  
            resultPartitionType = ResultPartitionType.BLOCKING;  
            break;  
        case UNDEFINED:  
            resultPartitionType = determineResultPartitionType(partitioner);  
            break;  
        default:  
            throw new UnsupportedOperationException(  
                    "Data exchange mode " + edge.getShuffleMode() + " is not supported yet.");  
    }  
  
    checkAndResetBufferTimeout(resultPartitionType, edge);  

	// 依据分区器的不同构建JobEdge对象
    JobEdge jobEdge;  
    if (partitioner.isPointwise()) {  
        jobEdge =  
                downStreamVertex.connectNewDataSetAsInput(  
                        headVertex, DistributionPattern.POINTWISE, resultPartitionType);  
    } else {  
        jobEdge =  
                downStreamVertex.connectNewDataSetAsInput(  
                        headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);  
    }  
    // 设置策略名称,这些都可以在Web上看到 
    jobEdge.setShipStrategyName(partitioner.toString());  
    jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());  
    jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());  

	// 打印日志【分区器名称】-【算子链头】->【下游VertexID】
    if (LOG.isDebugEnabled()) {  
        LOG.debug(  
                "CONNECTED: {} - {} -> {}",  
                partitioner.getClass().getSimpleName(),  
                headOfChain,  
                downStreamVertexID);  
    }  
}

物理边界的设置

setPhysicalEdges 方法用于设置 JobVertex 的物理边界,执行方法总结如下:

  1. 遍历 physicalEdgesInOrder 对象,该对象包含所有不能构成算子链的边,将边的目标节点的入边添加到一个 List 对象中;
  2. 遍历所有的 physicalInEdgesInOrder,经过上面的步骤,该对象的内部结构为不能构成算子链的边的下游节点 ID-入边集合,将该节点的入边结合都设置为实际物理边界。
// 设置物理边界 
private void setPhysicalEdges() {  
    Map<Integer, List<StreamEdge>> physicalInEdgesInOrder =  
            new HashMap<Integer, List<StreamEdge>>();  
  
    for (StreamEdge edge : physicalEdgesInOrder) {  
        int target = edge.getTargetId();  
  
        List<StreamEdge> inEdges =  
                physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());  
  
        inEdges.add(edge);  
    }  
  
    for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {  
        int vertex = inEdges.getKey();  
        List<StreamEdge> edgeList = inEdges.getValue();  
  
        vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);  
    }  
}

最终完成物理边界划分的 JobGraph 如下图所示:

在这里插入图片描述

JobGraph 的提交

JobGraph 的相关行为由ClusterClient接口进行定义,该接口定义了封装程序提交到远程集群的方法,关键源码分析如下:

public interface ClusterClient<T> extends AutoCloseable {
	// 返回集群ID
	T getClusterId();
	// 返回Flink设置
	Configuration getFlinkConfiguration();
	// 关闭客户端此时连接的集群
	void shutDownCluster();
	// 获取所有的正在运行和完成的作业
	CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
	// 提交JobGraph给集群
	CompletableFuture<JobID> submitJob(JobGraph jobGraph);
	// 根据作业ID获取其运行状态
	CompletableFuture<JobResult> requestJobResult(JobID jobId);
	// 根据JobId触发保存点
	CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
	// 向协调器发出请求并接收回应
	CompletableFuture<CoordinationResponse> sendCoordinationRequest(  
        JobID jobId, OperatorID operatorId, CoordinationRequest request);
    // 根据JobID停止相关作业(仅适用于流式作业)
    // 发送停止指令后本质上只是Source停止发送数据,整个程序停止还需要所有的TM处理完当前数据
    // jobId:作业唯一标识符
    // advanceToEndOfEventTime:表示Source是否注入最大水位线
    CompletableFuture<String> stopWithSavepoint(  
        final JobID jobId,  
        final boolean advanceToEndOfEventTime,  
        @Nullable final String savepointDirectory);
    // 根据JobID获取Job结果
    CompletableFuture<JobResult> requestJobResult(JobID jobId);
    // 根据JobID获取累加器
    // jobId:作业唯一标识符
    // loader:用于反序列化
    CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
	// 根据JobID撤销作业
	CompletableFuture<Acknowledge> cancel(JobID jobId);
	...
}

也就是说 JobGraph 对象最终会由 ClusterClient#submitJob() 发送给集群,由 JM 的 JobMaster 进行接收,之后的调用链路是 JobMaster#startJobExecution()JobMaster#startScheduling() 开始进行任务调动。

往期回顾

  1. 【Flink】详解StreamGraph
  2. 【Flink】浅谈Flink架构和调度
  3. 【Flink】详解Flink的八种分区
  4. 【Flink】浅谈Flink背压问题(1)
  5. 【分布式】浅谈CAP、BASE理论(1)

文中难免会出现一些描述不当之处(尽管我已反复检查多次),欢迎在留言区指正,相关的知识点也可进行分享,希望大家都能有所收获!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/341137.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Flutter入门到进阶】Dart进阶篇---进阶用法

1 Dart对象扩展 1.1 extension 1.1.1 介绍 可以在不更改类或创建子类的情况下&#xff0c;向类添加扩展功能的一种方式。灵活使用 extension 对基础类进行扩展&#xff0c;对开发效率有显著提升。 1.1.2 需求 在开发项目中碰到需求&#xff1a;将单位为分的数值转换成单位为…

RabbitMQ(黑马spring cloud笔记)

MQ 目录MQ一、同步通讯和异步通讯1. 同步通讯2. 异步通讯二、RabbitMQ1. 部署2. 架构3. 常见消息模型3.1 基本消息队列&#xff08;Basic Queue&#xff09;3.2 工作消息队列&#xff08;Work Queue&#xff09;3.3 发布订阅&#xff08;Publish、Subscribe&#xff09;4. 消息…

TPAMI 2022 | RC-Explainer:图神经网络的强化因果解释器

文章目录 一、论文关键信息二、基础概念三、主要内容1. Motivations2. Insights3. 解决方案的关键四、总结与讨论CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 一、论文关键信息 论文标题:Reinforced Causal Explainer for Graph Neural Networks 期刊信息:IEEE Transact…

【C++】内存管理

&#x1f345;不同的数据放在不同的地方&#xff0c;需要内存管理 目录 ☃️1.C/C中的内存分布 ☃️2.C语言中动态内存管理方式 ☃️3.C内存管理方式 &#x1f41d;3.1 new/delete操作内置类型 &#x1f41d;3.2 new和delete操作自定义类型 &#x1f41d;3.3 operator n…

FISCO BCOS节点扩容和使用console进行群组扩容

一、安装并启动FISCO BCOS 搭建单机单群组4节点的教程查看&#xff1a;https://blog.csdn.net/yueyue763184/article/details/128924144?spm1001.2014.3001.5501 二、下载扩容脚本 在fisco目录下输入以下命令&#xff1a; curl -#LO https://raw.githubusercontent.com/FI…

155、【动态规划】leetcode ——474. 一和零:三维数组+二维滚动数组(C++版本)

题目描述 原题链接&#xff1a;474. 一和零 解题思路 &#xff08;1&#xff09;三维数组 本题是要在已有的字符串中&#xff0c;找到给定的m个0和n个1&#xff0c;组出最大的子集。将字符串集合中的各个字符串看作物品&#xff0c;m个0和n个1看作背包的重量&#xff0c;则该…

jenkins +docker+python接口自动化之jenkins容器安装python3(二)

jenkins dockerpython接口自动化之jenkins容器安装python3&#xff08;二&#xff09; 目录&#xff1a;导读 前提是在docker下已经配置好jenkins容器了&#xff0c;是将python安装在jenkins容器下的 1、先看你的jenkins是否安装好 2、以root权限进入jenkins容器&#xff1…

NLP方向的论文可投的核心期刊

目录1、《计算机仿真》北大核心、科技核心2、《通信学报》北大核心、科技核心、CSCD核心3、《计算机科学》北大核心、EI来源期刊、CSCD核心4、《计算机工程》北大核心、科技核心5、《计算机应用》北大核心、科技核心、CSCD核心6、《计算机工程与应用》北大核心、科技核心、CSCD…

Python - 数据容器dict(字典)

目录 字典的定义 字典数据的获取 字典的嵌套 字典的各种操作 新增与更新元素 [Key] Value 删除元素 pop和del 清空字典 clear 获取全部的键 keys 遍历字典 容器通用功能总览 字典的定义 使用{}&#xff0c;不过存储的元素是一个个的&#xff1a;键值对&#…

golang的web框架Gin(一)---Gin的Resutful风格

Restful风格是什么&#xff1f; REST与技术无关&#xff0c;代表的是一种软件架构风格&#xff0c;REST是Representational State Transfer的简称&#xff0c;中文翻译为“表征状态转移”或“表现层状态转化”。 RESTFUL特点包括&#xff1a; 每一个URI代表1种资源&#xff…

STM32F103C8T6—库函数应用I2C/SPI驱动OLED显示中文、字符串

文章目录1. I2C与SPI通信协议对比2. 四脚OLED与六脚OLED3. I2C驱动OLED显示oled.h & oled.c&#xff1a;汉字取模 & oledfont.h&#xff1a;main.c 显示示例&#xff1a;连线方法&#xff1a;4. SPI驱动OLED显示1. I2C与SPI通信协议对比 I2C&#xff08;Inter-Integra…

基于springboot的毕业设计管理系统

摘要随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代&…

【开源】祁启云网络验证系统V1.11

简介 祁启云免费验证系统 一个使用golang语言、Web框架beego、前端Naive-Ui-Admin开发的免费网络验证系统 版本 当前版本1.11 更新方法 请直接将本目录中的verification.exe/verification直接覆盖到你服务器部署的目录&#xff0c;更新前&#xff0c;请先关闭正在运行的验…

搭建云端vscode-server,使用web ide进行远程开发

使用乌班图系统&#xff0c;搭建自己的网页vs code开发环境github地址&#xff1a;GitHub - coder/code-server: VS Code in the browser安装脚本curl -fsSL https://code-server.dev/install.sh | sh出现deb package has been installed.表示已经正确安装。测试启动2.1修改配置…

Idea打包springboot项目war包,测试通过

pom.xml文件 <!--包名以及版本号&#xff0c;这个是打包时候使用&#xff0c;版本可写可不写&#xff0c;建议写有利于维护系统--> <artifactId>tsgdemo</artifactId> <version>0.0.1-SNAPSHOT</version> <!--打包形式--> <packaging&…

在项目中遇到的关于form表单的问题

前言 以下内容都是基于element Plus 和 vue3 一个form-item校验两个下拉框 有时候不可避免会遇到需要一个form-item校验两个下拉框的情况&#xff0c;比如&#xff1a; 这种情况下传统的校验已经无法实现&#xff0c;需要通过form表单提供的自定义校验来实现。以上面的必填…

6年软件测试经验,从我自己的角度理解自动化测试

接触了不少同行&#xff0c;由于他们之前一直做手工测试&#xff0c;现在很迫切希望做自动化测试&#xff0c;其中不乏工作5年以上的人。 本人从事软件自动化测试已经近6年&#xff0c;从server端到web端&#xff0c;从API到mobile&#xff0c;切身体会到自动化带来的好处与痛楚…

Ansible自动化运维工具---安装及命令模块

目录 引言 一、Ansible概述 1.1、Ansible 自动运维工具特点 1.2、Ansible 运维工具原理 1.3、Ansible系统架构 1.4、Ansible的特性 二、安装Ansible 三、Ansible命令模块 command模块 shell模块 cron模块 user模块 group模块 copy模块 file模块 hostname 模块 p…

[acwing周赛复盘] 第 90 场周赛20230211 补

[acwing周赛复盘] 第 90 场周赛20230211 补 一、本周周赛总结二、 4806. 首字母大写1. 题目描述2. 思路分析3. 代码实现三、4807. 找数字1. 题目描述2. 思路分析3. 代码实现四、4808. 构造字符串1. 题目描述2. 思路分析3. 代码实现六、参考链接一、本周周赛总结 T1 模拟T2 模拟…

ThingsBoard-实现定时任务调度器批量RPC

1、概述 ThingsBoard-CE版是不支持调度器的,只有PE版才支持,但是系统中很多时候需要使用调度器来实现功能,例如:定时给设备下发rpc查询数据,我们如何来实现呢?下面我将教你使用巧妙的方法来实现。 2、使用什么实现 我们可以使用规则链提供的一个节点来实现,这个节点可…