Starting a Job in Flink

2025/3/26 15:21:55

Flink

This walkthrough traces local execution, using Flink version 1.16.0.


Contents

  • Flink
  • StreamExecutionEnvironment
  • LocalExecutor
  • MiniCluster
    • Starting the MiniCluster
  • TaskManager
  • TaskExecutor
    • Submitting a Task (submitTask)
  • StreamGraph



StreamExecutionEnvironment

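The stream execution environment. Local runs use LocalStreamEnvironment; remote runs use RemoteStreamEnvironment.

1.1 A job starts when execute(String jobName) is called on the StreamExecutionEnvironment.
1.1.1 The method first takes a copy of the registered transformations (operators). The list must not be empty: a job needs at least one operator, typically a sink, and it may contain several.
1.1.2 getStreamGraph() converts the transformations into a StreamGraph.
1.1.3 execute(StreamGraph streamGraph) is called. If it throws, the cached transformations are invalidated, the StreamGraph is rebuilt from the original transformations, and execution is retried once.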

public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }
    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}

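1.2 execute(String jobName) delegates to execute(StreamGraph streamGraph).
1.2.1 executeAsync(StreamGraph streamGraph) submits the stream graph asynchronously and returns a JobClient.
1.2.2 In attached mode (DeploymentOptions.ATTACHED) the method blocks on the JobClient until the job result is available; otherwise it returns a DetachedJobExecutionResult that carries only the job ID. In both cases the registered JobListeners are notified through onJobExecuted.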

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    final JobClient jobClient = executeAsync(streamGraph);
    try {
        final JobExecutionResult jobExecutionResult;
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }
        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

        return jobExecutionResult;
    } catch (Throwable t) {
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);
        jobListeners.forEach(
                jobListener -> {
                    jobListener.onJobExecuted(null, strippedException);
                });
        ExceptionUtils.rethrowException(strippedException);
        return null;
    }
}
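
The attached/detached branch above can be modelled with the JDK alone. The following is a simplified sketch, not Flink code: FakeJobClient and run are hypothetical stand-ins for JobClient and execute(StreamGraph), and plain strings stand in for the result types.

```java
import java.util.concurrent.CompletableFuture;

// Simplified, JDK-only model of the attached vs. detached branch.
// FakeJobClient and run(...) are hypothetical names, not Flink classes.
final class AttachedVsDetachedSketch {

    /** Stand-in for JobClient: a job id plus a future for the final result. */
    static final class FakeJobClient {
        final String jobId;
        final CompletableFuture<String> result;

        FakeJobClient(String jobId, CompletableFuture<String> result) {
            this.jobId = jobId;
            this.result = result;
        }
    }

    /** Attached: block until the job finishes. Detached: return at once with only the id. */
    static String run(FakeJobClient client, boolean attached) {
        if (attached) {
            return client.result.join(); // blocks until the future completes
        }
        return "detached:" + client.jobId;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        FakeJobClient client = new FakeJobClient("job-1", pending);

        // Detached mode does not wait, so it returns while the job is still running.
        System.out.println(run(client, false));

        pending.complete("FINISHED");
        System.out.println(run(client, true));
    }
}
```

In a local run only the attached branch is taken, because LocalExecutor checks DeploymentOptions.ATTACHED before it submits the job.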

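1.3 execute(StreamGraph streamGraph) in turn calls executeAsync(StreamGraph streamGraph).
1.3.1 getPipelineExecutor() resolves the PipelineExecutor; for local execution this is LocalExecutor.
1.3.2 LocalExecutor executes the given stream graph and returns a JobClient, which lets the caller interact with the running job, for example to cancel it or trigger a savepoint.
1.3.3 Each JobListener is notified through onJobSubmitted, with the JobClient on success or the stripped exception on failure.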

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    final PipelineExecutor executor = getPipelineExecutor();

    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
        collectIterators.clear();
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException =
                ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(
                jobListener -> jobListener.onJobSubmitted(null, strippedException));

        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}
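
Both execute and executeAsync strip the ExecutionException wrapper before handing a failure to the listeners, so that listeners see the real cause. A minimal JDK-only sketch of that idea follows; strip is a hypothetical helper written for illustration and is not Flink's ExceptionUtils implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// JDK-only sketch of unwrapping ExecutionException layers.
// strip(...) is a hypothetical helper, not Flink's implementation.
final class StripExecutionExceptionSketch {

    /** Peels off ExecutionException wrappers until the underlying cause is reached. */
    static Throwable strip(Throwable t) {
        Throwable current = t;
        while (current instanceof ExecutionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("submission failed"));
        try {
            future.get();
        } catch (ExecutionException e) {
            // get() wraps the failure; a listener should receive the real cause.
            System.out.println(strip(e).getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```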

LocalExecutor


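LocalExecutor implements PipelineExecutor and is responsible for executing the StreamGraph, that is, the job the user submitted.

  1. getJobGraph(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) translates the StreamGraph into a JobGraph.
  2. submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader), called on the PerJobMiniClusterFactory, creates a MiniCluster and submits the job to it.

public CompletableFuture<JobClient> execute(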
        Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
        throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
            .submitJob(jobGraph, userCodeClassloader);
}
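
The two addAll calls above decide which setting wins: the executor's own configuration is copied first, and the per-call configuration is copied on top of it. A small sketch with plain maps shows the same precedence; the class, the method, and the sample values are assumptions made for illustration, and a Map stands in for Flink's Configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only model of building the effective configuration.
// A Map stands in for Flink's Configuration; the keys are sample values.
final class EffectiveConfigSketch {

    /** Later putAll calls overwrite earlier entries, so per-call settings win. */
    static Map<String, String> merge(
            Map<String, String> executorDefaults, Map<String, String> perCall) {
        Map<String, String> effective = new LinkedHashMap<>();
        effective.putAll(executorDefaults);
        effective.putAll(perCall);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> executorDefaults = new LinkedHashMap<>();
        executorDefaults.put("parallelism.default", "1");
        executorDefaults.put("execution.attached", "true");

        Map<String, String> perCall = new LinkedHashMap<>();
        perCall.put("parallelism.default", "4");

        // The per-call value replaces the default; untouched keys are kept.
        System.out.println(merge(executorDefaults, perCall));
    }
}
```

Note that the checkState call reads the per-call configuration, not the merged one.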

MiniCluster


MiniCluster executes jobs locally.

Starting the MiniCluster


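1.1 Create workingDirectory. WorkingDirectory manages the working directory of a process or instance; once instantiated, it guarantees that the specified directory exists.
1.2 Create metricRegistry. MetricRegistry keeps track of all registered metrics and is the link between MetricGroups and MetricReporters.
1.3 Create commonRpcService. An RpcService is used to start RpcEndpoints and connect to them; connecting to an RPC server returns an RpcGateway on which remote procedures can be called.
1.4 In shared mode (RpcServiceSharing.SHARED), taskManagerRpcServiceFactory is a CommonRpcServiceFactory that wraps commonRpcService; otherwise each component gets its own DedicatedRpcServiceFactory.
1.5 Create metricQueryServiceRpcService, another RpcService.
1.6 Hand metricQueryServiceRpcService to metricRegistry, which uses it to start the MetricQueryService.
1.7 Create processMetricGroup, a ProcessMetricGroup registered with metricRegistry.
1.8 Create ioExecutor, an ExecutorService backed by a fixed-size thread pool for I/O tasks.
1.9 Create haServicesFactory, the HighAvailabilityServicesFactory.
1.10 Create haServices, passing in ioExecutor. HighAvailabilityServices gives access to all services needed for a highly available setup: highly available storage and registries, distributed counters, and leader election. In particular:

ResourceManager leader election and leader retrieval
JobManager leader election and leader retrieval
Persistence for checkpoint metadata
Registering the latest completed checkpoint(s)
Persistence for the BLOB store
Registry that marks a job's status
Naming of RPC endpoints

1.11 Create and start blobServer. The BlobServer listens for incoming requests and spawns threads to handle them. It also creates the directory structure in which BLOBs are stored or temporarily cached.
1.12 Create heartbeatServices. HeartbeatServices provides everything needed for heartbeating, including the creation of heartbeat receivers and heartbeat senders.
1.13 Create delegationTokenManager through KerberosDelegationTokenManagerFactory, passing in commonRpcService.getScheduledExecutor() and ioExecutor.
DelegationTokenManager manages delegation tokens for the Flink cluster. When delegation token renewal is enabled, it makes sure long-running applications can keep accessing secured services without interruption. It must contact all configured secure services to obtain the delegation tokens that are distributed to the rest of the application.
1.14 Create blobCacheService, passing in workingDirectory.getBlobStorageDirectory(), haServices.createBlobStore(), and the InetSocketAddress of the BLOB server.
BlobCacheService provides access to the BLOB services for permanent and transient BLOBs.
1.15 startTaskManagers() starts the TaskManagers; setupDispatcherResourceManagerComponents(...) then sets up the Dispatcher and ResourceManager components.
1.16 resourceManagerLeaderRetriever is obtained from the HA services and retrieves the current ResourceManager leader; it is started further down.
1.17 dispatcherLeaderRetriever is obtained from the HA services and retrieves the current Dispatcher leader; it is started further down.
1.18 clusterRestEndpointLeaderRetrievalService is obtained from the HA services and retrieves the current cluster REST endpoint leader; it is started further down.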

public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService =
                miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            // manages the working directory of the process or instance
            workingDirectory =
                    WorkingDirectory.create(
                            ClusterEntrypointUtils.generateWorkingDirectoryFile(
                                    configuration,
                                    Optional.of(PROCESS_WORKING_DIR_BASE),
                                    "minicluster_" + ResourceID.generate()));

            initializeIOFormatClasses(configuration);

            rpcSystem = rpcSystemSupplier.get();

            LOG.info("Starting Metrics Registry");
            // tracks all registered metrics; links MetricGroups and MetricReporters
            metricRegistry =
                    createMetricRegistry(
                            configuration,
                            rpcSystem.deref().getMaximumMessageSizeInBytes(configuration));

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
            final RpcService metricQueryServiceRpcService;

            if (useSingleRpcService) {
                // we always need the 'commonRpcService' for auxiliary calls
                // an RpcService starts RpcEndpoints and connects to them;
                // connecting returns an RpcGateway for remote calls
                commonRpcService = createLocalRpcService(configuration, rpcSystem.deref());
                final CommonRpcServiceFactory commonRpcServiceFactory =
                        new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
                metricQueryServiceRpcService =
                        MetricUtils.startLocalMetricsRpcService(
                                configuration, rpcSystem.deref());
            } else {

                // start a new service per component, possibly with custom bind addresses
                final String jobManagerExternalAddress =
                        miniClusterConfiguration.getJobManagerExternalAddress();
                final String taskManagerExternalAddress =
                        miniClusterConfiguration.getTaskManagerExternalAddress();
                final String jobManagerExternalPortRange =
                        miniClusterConfiguration.getJobManagerExternalPortRange();
                final String taskManagerExternalPortRange =
                        miniClusterConfiguration.getTaskManagerExternalPortRange();
                final String jobManagerBindAddress =
                        miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress =
                        miniClusterConfiguration.getTaskManagerBindAddress();

                dispatcherResourceManagerComponentRpcServiceFactory =
                        new DedicatedRpcServiceFactory(
                                configuration,
                                jobManagerExternalAddress,
                                jobManagerExternalPortRange,
                                jobManagerBindAddress,
                                rpcSystem.deref());
                taskManagerRpcServiceFactory =
                        new DedicatedRpcServiceFactory(
                                configuration,
                                taskManagerExternalAddress,
                                taskManagerExternalPortRange,
                                taskManagerBindAddress,
                                rpcSystem.deref());

                // we always need the 'commonRpcService' for auxiliary calls
                // bind to the JobManager address with port 0
                commonRpcService =
                        createRemoteRpcService(
                                configuration, jobManagerBindAddress, 0, rpcSystem.deref());
                metricQueryServiceRpcService =
                        MetricUtils.startRemoteMetricsRpcService(
                                configuration,
                                commonRpcService.getAddress(),
                                null,
                                rpcSystem.deref());
            }

            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            processMetricGroup =
                    MetricUtils.instantiateProcessMetricGroup(
                            metricRegistry,
                            RpcUtils.getHostname(commonRpcService),
                            ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                    configuration));
            // fixed-size thread pool for I/O tasks
            ioExecutor =
                    Executors.newFixedThreadPool(
                            ClusterEntrypointUtils.getPoolSize(configuration),
                            new ExecutorThreadFactory("mini-cluster-io"));
            // factory for the high-availability services
            haServicesFactory = createHighAvailabilityServicesFactory(configuration);
            // high-availability services
            haServices = createHighAvailabilityServices(configuration, ioExecutor);
            // create and start the BLOB (binary large object) server
            blobServer =
                    BlobUtils.createBlobServer(
                            configuration,
                            Reference.borrowed(workingDirectory.getBlobStorageDirectory()),
                            haServices.createBlobStore());
            blobServer.start();
            // services for creating heartbeat senders and receivers
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
            // delegation token manager
            delegationTokenManager =
                    KerberosDelegationTokenManagerFactory.create(
                            getClass().getClassLoader(),
                            configuration,
                            commonRpcService.getScheduledExecutor(),
                            ioExecutor);
            // BLOB cache: gives access to the BLOB services for permanent and transient BLOBs
            blobCacheService =
                    BlobUtils.createBlobCacheService(
                            configuration,
                            Reference.borrowed(workingDirectory.getBlobStorageDirectory()),
                            haServices.createBlobStore(),
                            new InetSocketAddress(
                                    InetAddress.getLocalHost(), blobServer.getPort()));

            startTaskManagers();

            MetricQueryServiceRetriever metricQueryServiceRetriever =
                    new RpcMetricQueryServiceRetriever(
                            metricRegistry.getMetricQueryServiceRpcService());

            setupDispatcherResourceManagerComponents(
                    configuration,
                    dispatcherResourceManagerComponentRpcServiceFactory,
                    metricQueryServiceRetriever);
            // retrieves the current ResourceManager leader from the HA services
            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            // retrieves the current Dispatcher leader from the HA services
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            // retrieves the current cluster REST endpoint leader from the HA services
            clusterRestEndpointLeaderRetrievalService =
                    haServices.getClusterRestEndpointLeaderRetriever();
            // gateway retriever for the Dispatcher
            dispatcherGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            commonRpcService,
                            DispatcherGateway.class,
                            DispatcherId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
            // gateway retriever for the ResourceManager
            resourceManagerGatewayRetriever =
                    new RpcGatewayRetriever<>(
                            commonRpcService,
                            ResourceManagerGateway.class,
                            ResourceManagerId::fromUuid,
                            new ExponentialBackoffRetryStrategy(
                                    21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
            webMonitorLeaderRetriever = new LeaderRetriever();
            // start ResourceManager leader retrieval
            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            // start Dispatcher leader retrieval
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            // start cluster REST endpoint leader retrieval
            clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
        } catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
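
Both gateway retrievers above are created with ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)). To get a feel for what those three numbers mean, the sketch below computes a delay schedule under the assumption that the delay doubles after every attempt and is capped at the maximum. BackoffScheduleSketch and its methods are hypothetical and only model that assumption; they are not the Flink class.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// JDK-only model of an exponential backoff schedule.
// Assumption: the delay doubles after each attempt and is capped at the maximum.
final class BackoffScheduleSketch {

    /** Returns the delay used before each of the given number of retries. */
    static List<Duration> delays(int retries, Duration initial, Duration max) {
        List<Duration> schedule = new ArrayList<>();
        Duration current = initial;
        for (int i = 0; i < retries; i++) {
            schedule.add(current);
            Duration doubled = current.multipliedBy(2);
            current = doubled.compareTo(max) > 0 ? max : doubled;
        }
        return schedule;
    }

    /** Sums the whole schedule in milliseconds. */
    static long totalMillis(List<Duration> schedule) {
        long total = 0;
        for (Duration delay : schedule) {
            total += delay.toMillis();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Duration> schedule = delays(21, Duration.ofMillis(5), Duration.ofMillis(20));
        System.out.println("first delays: " + schedule.subList(0, 4));
        System.out.println("total wait in ms: " + totalMillis(schedule));
    }
}
```

Under that assumption the schedule is 5 ms, 10 ms, and then 20 ms for each of the remaining 19 attempts, about 395 ms in total.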

TaskManager


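MiniCluster's start() calls the methods below.

  1. One TaskManager is started per configured instance; the default count is 1.
  2. A TaskExecutor is created and given an RpcService (from the TaskManager RPC service factory), haServices (high-availability services), heartbeatServices (heartbeat services), metricRegistry (metric registry), blobCacheService (BLOB cache service), workingDirectory (a sub-working-directory), and fatalErrorHandler (fatal error handler).
  3. The TaskExecutor is started and added to the taskManagers list.

private void startTaskManagers() throws Exception {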
    final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
    for (int i = 0; i < numTaskManagers; i++) {
        startTaskManager();
    }
}

public void startTaskManager() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();

        final TaskExecutor taskExecutor =
                TaskManagerRunner.startTaskManager(
                        configuration,
                        new ResourceID(UUID.randomUUID().toString()),
                        taskManagerRpcServiceFactory.createRpcService(),
                        haServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        useLocalCommunication(),
                        ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                        workingDirectory.createSubWorkingDirectory("tm_" + taskManagers.size()),
                        taskManagerTerminatingFatalErrorHandlerFactory.create(
                                taskManagers.size()));

        taskExecutor.start();
        taskManagers.add(taskExecutor);
    }
}
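
startTaskManager derives both the sub-working-directory name ("tm_" + taskManagers.size()) and the fatal error handler index from the current size of the list, and it does so while holding the lock, so every TaskManager gets a distinct, consecutive index. The sketch below models only that bookkeeping; the class is hypothetical and strings stand in for TaskExecutor instances.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only model of the index bookkeeping in startTaskManager().
// Strings stand in for TaskExecutor instances; the class is hypothetical.
final class TaskManagerStartupSketch {

    private final Object lock = new Object();
    private final List<String> taskManagers = new ArrayList<>();

    /** Starts the configured number of task managers, one after another. */
    void startTaskManagers(int numTaskManagers) {
        for (int i = 0; i < numTaskManagers; i++) {
            startTaskManager();
        }
    }

    /** Names the new entry after the current list size, then registers it. */
    void startTaskManager() {
        synchronized (lock) {
            String workingDirName = "tm_" + taskManagers.size();
            taskManagers.add(workingDirName);
        }
    }

    /** Returns a copy of the registered names. */
    List<String> started() {
        synchronized (lock) {
            return new ArrayList<>(taskManagers);
        }
    }

    public static void main(String[] args) {
        TaskManagerStartupSketch cluster = new TaskManagerStartupSketch();
        cluster.startTaskManagers(3);
        System.out.println(cluster.started());
    }
}
```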

TaskExecutor


The TaskExecutor is responsible for executing multiple Tasks.

Submitting a Task (submitTask)

@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {

    try {
        final JobID jobId = tdd.getJobId();
        final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();

        final JobTable.Connection jobManagerConnection =
                jobTable.getConnection(jobId)
                        .orElseThrow(
                                () -> {
                                    final String message =
                                            "Could not submit task because there is no JobManager "
                                                    + "associated for the job "
                                                    + jobId
                                                    + '.';

                                    log.debug(message);
                                    return new TaskSubmissionException(message);
                                });

        if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
            final String message =
                    "Rejecting the task submission because the job manager leader id "
                            + jobMasterId
                            + " does not match the expected job manager leader id "
                            + jobManagerConnection.getJobMasterId()
                            + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
            final String message =
                    "No task slot allocated for job ID "
                            + jobId
                            + " and allocation ID "
                            + tdd.getAllocationId()
                            + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        // re-integrate offloaded data:
        try {
            tdd.loadBigData(taskExecutorBlobService.getPermanentBlobService());
        } catch (IOException | ClassNotFoundException e) {
            throw new TaskSubmissionException(
                    "Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
        }

        // deserialize the pre-serialized information
        final JobInformation jobInformation;
        final TaskInformation taskInformation;
        try {
            jobInformation =
                    tdd.getSerializedJobInformation()
                            .deserializeValue(getClass().getClassLoader());
            taskInformation =
                    tdd.getSerializedTaskInformation()
                            .deserializeValue(getClass().getClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new TaskSubmissionException(
                    "Could not deserialize the job or task information.", e);
        }

        if (!jobId.equals(jobInformation.getJobId())) {
            throw new TaskSubmissionException(
                    "Inconsistent job ID information inside TaskDeploymentDescriptor ("
                            + tdd.getJobId()
                            + " vs. "
                            + jobInformation.getJobId()
                            + ")");
        }

        TaskManagerJobMetricGroup jobGroup =
                taskManagerMetricGroup.addJob(
                        jobInformation.getJobId(), jobInformation.getJobName());

        // note that a pre-existing job group can NOT be closed concurrently - this is done by
        // the same TM thread in removeJobMetricsGroup
        TaskMetricGroup taskMetricGroup =
                jobGroup.addTask(tdd.getExecutionAttemptId(), taskInformation.getTaskName());

        InputSplitProvider inputSplitProvider =
                new RpcInputSplitProvider(
                        jobManagerConnection.getJobManagerGateway(),
                        taskInformation.getJobVertexId(),
                        tdd.getExecutionAttemptId(),
                        taskManagerConfiguration.getRpcTimeout());

        final TaskOperatorEventGateway taskOperatorEventGateway =
                new RpcTaskOperatorEventGateway(
                        jobManagerConnection.getJobManagerGateway(),
                        executionAttemptID,
                        (t) -> runAsync(() -> failTask(executionAttemptID, t)));

        TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
        CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
        GlobalAggregateManager aggregateManager =
                jobManagerConnection.getGlobalAggregateManager();

        LibraryCacheManager.ClassLoaderHandle classLoaderHandle =
                jobManagerConnection.getClassLoaderHandle();
        PartitionProducerStateChecker partitionStateChecker =
                jobManagerConnection.getPartitionStateChecker();

        final TaskLocalStateStore localStateStore =
                localStateStoresManager.localStateStoreForSubtask(
                        jobId,
                        tdd.getAllocationId(),
                        taskInformation.getJobVertexId(),
                        tdd.getSubtaskIndex(),
                        taskManagerConfiguration.getConfiguration(),
                        jobInformation.getJobConfiguration());

        // TODO: Pass config value from user program and do overriding here.
        final StateChangelogStorage<?> changelogStorage;
        try {
            changelogStorage =
                    changelogStoragesManager.stateChangelogStorageForJob(
                            jobId,
                            taskManagerConfiguration.getConfiguration(),
                            jobGroup,
                            localStateStore.getLocalRecoveryConfig());
        } catch (IOException e) {
            throw new TaskSubmissionException(e);
        }

        final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

        final TaskStateManager taskStateManager =
                new TaskStateManagerImpl(
                        jobId,
                        tdd.getExecutionAttemptId(),
                        localStateStore,
                        changelogStorage,
                        changelogStoragesManager,
                        taskRestore,
                        checkpointResponder);

        MemoryManager memoryManager;
        try {
            memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
        } catch (SlotNotFoundException e) {
            throw new TaskSubmissionException("Could not submit task.", e);
        }

        Task task =
                new Task(
                        jobInformation,
                        taskInformation,
                        tdd.getExecutionAttemptId(),
                        tdd.getAllocationId(),
                        tdd.getProducedPartitions(),
                        tdd.getInputGates(),
                        memoryManager,
                        taskExecutorServices.getIOManager(),
                        taskExecutorServices.getShuffleEnvironment(),
                        taskExecutorServices.getKvStateService(),
                        taskExecutorServices.getBroadcastVariableManager(),
                        taskExecutorServices.getTaskEventDispatcher(),
                        externalResourceInfoProvider,
                        taskStateManager,
                        taskManagerActions,
                        inputSplitProvider,
                        checkpointResponder,
                        taskOperatorEventGateway,
                        aggregateManager,
                        classLoaderHandle,
                        fileCache,
                        taskManagerConfiguration,
                        taskMetricGroup,
                        partitionStateChecker,
                        getRpcService().getScheduledExecutor());

        taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured);

        log.info(
                "Received task {} ({}), deploy into slot with allocation id {}.",
                task.getTaskInfo().getTaskNameWithSubtasks(),
                tdd.getExecutionAttemptId(),
                tdd.getAllocationId());

        boolean taskAdded;

        try {
            taskAdded = taskSlotTable.addTask(task);
        } catch (SlotNotFoundException | SlotNotActiveException e) {
            throw new TaskSubmissionException("Could not submit task.", e);
        }

        if (taskAdded) {
            task.startTaskThread();

            setupResultPartitionBookkeeping(
                    tdd.getJobId(), tdd.getProducedPartitions(), task.getTerminationFuture());
            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message =
                    "TaskManager already contains a task for id " + task.getExecutionId() + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }
    } catch (TaskSubmissionException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
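
Read from top to bottom, submitTask first validates the request (a JobManager connection exists for the job, the leader id matches, the slot can be marked active), then assembles the Task, adds it to the slot table, and starts its thread. Every validation failure is thrown as a TaskSubmissionException and converted into an exceptionally completed future at the end, so the caller always receives a future. The sketch below models only that validate-then-acknowledge shape with the JDK; all class, method, and field names in it are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// JDK-only model of the validation flow in submitTask(...).
// All names are hypothetical; strings stand in for Flink's id types.
final class SubmitTaskSketch {

    /** Stand-in for TaskSubmissionException. */
    static final class SubmissionException extends Exception {
        SubmissionException(String message) {
            super(message);
        }
    }

    private final Map<String, String> leaderIdByJob = new HashMap<>();
    private final Set<String> allocatedSlots = new HashSet<>();

    void registerJob(String jobId, String leaderId) {
        leaderIdByJob.put(jobId, leaderId);
    }

    void allocateSlot(String allocationId) {
        allocatedSlots.add(allocationId);
    }

    /** Runs the checks in order; failures become a failed future, not a thrown exception. */
    CompletableFuture<String> submit(String jobId, String leaderId, String allocationId) {
        try {
            String expectedLeader = leaderIdByJob.get(jobId);
            if (expectedLeader == null) {
                throw new SubmissionException("No JobManager associated with job " + jobId);
            }
            if (!expectedLeader.equals(leaderId)) {
                throw new SubmissionException("Leader id " + leaderId + " does not match");
            }
            if (!allocatedSlots.contains(allocationId)) {
                throw new SubmissionException("No slot allocated for " + allocationId);
            }
            return CompletableFuture.completedFuture("ACK");
        } catch (SubmissionException e) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        SubmitTaskSketch taskExecutor = new SubmitTaskSketch();
        taskExecutor.registerJob("job-1", "leader-A");
        taskExecutor.allocateSlot("slot-1");
        System.out.println(taskExecutor.submit("job-1", "leader-A", "slot-1").join());
        System.out.println(
                taskExecutor.submit("job-1", "leader-B", "slot-1").isCompletedExceptionally());
    }
}
```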

StreamGraph

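The StreamGraph is the graph of the streaming topology. getStreamGraph() builds it from the registered transformations, and LocalExecutor then translates it into a JobGraph for submission.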
