Flink
A walkthrough of Flink's local execution path, based on version 1.16.0.
Contents
- Flink
- StreamExecutionEnvironment
- LocalExecutor
- MiniCluster
- Starting the MiniCluster
- TaskManager
- TaskExecutor
- Submitting a Task (submitTask)
- StreamGraph
StreamExecutionEnvironment
The stream execution environment: LocalStreamEnvironment is used for local execution, RemoteStreamEnvironment for remote execution.
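For orientation, a minimal job against this environment might look as follows; the pipeline itself is illustrative. Calling execute() is what triggers the translation steps described below.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRunSketch {
    public static void main(String[] args) throws Exception {
        // In an IDE this returns a LocalStreamEnvironment backed by a MiniCluster.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello", "flink")
                .map(String::toUpperCase)
                .print(); // print() registers the sink the pipeline needs
        // Collects the transformations, builds the StreamGraph, and executes it.
        env.execute("local-run-sketch");
    }
}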
1.1 Call execute(String jobName) on the StreamExecutionEnvironment.
1.1.1 Collect all registered transformations; the pipeline must contain at least one sink (output) operator.
1.1.2 Call getStreamGraph() to translate the transformations into a StreamGraph.
1.1.3 Call execute(StreamGraph streamGraph).
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }
    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        // Rethrow unless the failure was caused by a corrupted cluster dataset.
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }
        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}
1.2 Call execute(StreamGraph streamGraph) on the StreamExecutionEnvironment.
1.2.1 executeAsync(StreamGraph streamGraph) submits the stream graph asynchronously.
1.2.2 The returned JobClient is used to obtain the JobExecutionResult: blocking in attached mode, or wrapped as a DetachedJobExecutionResult otherwise.
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobClient jobClient = executeAsync(streamGraph);
try {
final JobExecutionResult jobExecutionResult;
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
jobExecutionResult = jobClient.getJobExecutionResult().get();
} else {
jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
jobListeners.forEach(
jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
Throwable strippedException = ExceptionUtils.stripExecutionException(t);
jobListeners.forEach(
jobListener -> {
jobListener.onJobExecuted(null, strippedException);
});
ExceptionUtils.rethrowException(strippedException);
return null;
}
}
1.3 Call executeAsync(StreamGraph streamGraph) on the StreamExecutionEnvironment.
1.3.1 getPipelineExecutor() returns the PipelineExecutor; for local execution this is a LocalExecutor.
1.3.2 The LocalExecutor executes the given pipeline and returns a JobClient, which allows interaction with the running job, for example cancelling it or taking a savepoint.
1.3.3 The registered JobListeners are notified of the submission with the JobClient.
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
final PipelineExecutor executor = getPipelineExecutor();
CompletableFuture<JobClient> jobClientFuture =
executor.execute(streamGraph, configuration, userClassloader);
try {
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
collectIterators.clear();
return jobClient;
} catch (ExecutionException executionException) {
final Throwable strippedException =
ExceptionUtils.stripExecutionException(executionException);
jobListeners.forEach(
jobListener -> jobListener.onJobSubmitted(null, strippedException));
throw new FlinkException(
String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
strippedException);
}
}
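Since executeAsync notifies every registered JobListener, a caller can observe submission and completion. A minimal sketch of registering one (the listener body is illustrative):
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerJobListener(
        new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // Invoked from executeAsync(); throwable is non-null if submission failed.
                System.out.println("submitted: " + (jobClient == null ? throwable : jobClient.getJobID()));
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // Invoked from execute(StreamGraph) once the job finishes (see 1.2).
                System.out.println("executed: " + (result == null ? throwable : result.getNetRuntime() + " ms"));
            }
        });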
LocalExecutor
Implements PipelineExecutor and is responsible for executing the StreamGraph, i.e. the pipeline submitted by the user.
- getJobGraph(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) translates the StreamGraph into a JobGraph.
- submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) creates a MiniCluster and submits the job to it.
public CompletableFuture<JobClient> execute(
Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);
Configuration effectiveConfig = new Configuration();
effectiveConfig.addAll(this.configuration);
effectiveConfig.addAll(configuration);
// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
.submitJob(jobGraph, userCodeClassloader);
}
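Note the checkState on DeploymentOptions.ATTACHED: the LocalExecutor only supports attached execution. The executor itself is selected through the execution.target option; a sketch of the relevant configuration, assuming the local defaults:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

Configuration config = new Configuration();
config.set(DeploymentOptions.TARGET, "local");       // selects the LocalExecutor
config.setBoolean(DeploymentOptions.ATTACHED, true); // false would trip the checkState above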
MiniCluster
Executes jobs locally, hosting all the cluster components in a single process.
Starting the MiniCluster
1.1 Create the workingDirectory. WorkingDirectory: manages the working directory for a process or instance; once instantiated, it guarantees that the given working directory exists.
1.2 Create the metricRegistry. MetricRegistry: keeps track of all registered metrics and is the connection between MetricGroups and MetricReporters.
1.3 Create the commonRpcService. RpcService: used to start and connect to RpcEndpoints; connecting to an RPC server returns an RpcGateway that can be used to invoke remote procedures.
1.4 taskManagerRpcServiceFactory: in shared mode this is a CommonRpcServiceFactory that reuses commonRpcService.
1.5 Create the metricQueryServiceRpcService (an RpcService).
1.6 Hand metricQueryServiceRpcService to the metricRegistry to initialize the MetricQueryService.
1.7 Create the processMetricGroup (ProcessMetricGroup), backed by the metricRegistry.
1.8 Create the ioExecutor (an ExecutorService), a thread pool for I/O tasks.
1.9 Create the haServicesFactory (HighAvailabilityServicesFactory), the factory for the high-availability services.
1.10 Create the haServices through that factory, passing in the ioExecutor. HighAvailabilityServices: provides access to all components (services) that support high availability, covering highly available storage and registries as well as distributed counters and leader election:
ResourceManager leader election and leader retrieval
JobManager leader election and leader retrieval
Persistence for checkpoint metadata
Registering the latest completed checkpoint(s)
Persistence for the BLOB store (binary large objects)
Registry that marks a job's status
Naming of RPC endpoints
1.11 Create and start the blobServer. BlobServer: listens for incoming requests and spawns threads to process them; it is also responsible for creating the directory structure that stores BLOBs or caches them temporarily.
1.12 Create the heartbeatServices. HeartbeatServices: provides all services needed for heartbeating, including the creation of heartbeat receivers and heartbeat senders.
1.13 Create the delegationTokenManager, passing in commonRpcService.getScheduledExecutor() and the ioExecutor. DelegationTokenManager: manages delegation tokens on behalf of the whole Flink cluster. Once started, it ensures that long-running applications can access secured services without interruption, contacting all configured security services to obtain the delegation tokens that are distributed to the rest of the application.
1.14 Create the blobCacheService, passing in workingDirectory.getBlobStorageDirectory(), haServices.createBlobStore(), and the InetSocketAddress of the BLOB server. BlobCacheService: stores permanent and transient BLOBs and provides access to the BLOB service.
1.15 startTaskManagers() starts the TaskManager(s).
1.16 resourceManagerLeaderRetriever: obtains the ResourceManager leader (election and retrieval) from the HA services; started later.
1.17 dispatcherLeaderRetriever: obtains the Dispatcher leader (election and retrieval) from the HA services; started later.
1.18 clusterRestEndpointLeaderRetrievalService: obtains the cluster REST endpoint leader (election and retrieval) from the HA services; started later.
public void start() throws Exception {
synchronized (lock) {
checkState(!running, "MiniCluster is already running");
LOG.info("Starting Flink Mini Cluster");
LOG.debug("Using configuration {}", miniClusterConfiguration);
final Configuration configuration = miniClusterConfiguration.getConfiguration();
final boolean useSingleRpcService =
miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
try {
// manages the working directory of the process or instance
workingDirectory =
WorkingDirectory.create(
ClusterEntrypointUtils.generateWorkingDirectoryFile(
configuration,
Optional.of(PROCESS_WORKING_DIR_BASE),
"minicluster_" + ResourceID.generate()));
initializeIOFormatClasses(configuration);
rpcSystem = rpcSystemSupplier.get();
LOG.info("Starting Metrics Registry");
// tracks all registered metrics; the connection between MetricGroups and MetricReporters
metricRegistry =
createMetricRegistry(
configuration,
rpcSystem.deref().getMaximumMessageSizeInBytes(configuration));
// bring up all the RPC services
LOG.info("Starting RPC Service(s)");
final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
final RpcService metricQueryServiceRpcService;
if (useSingleRpcService) {
// we always need the 'commonRpcService' for auxiliary calls
// the RPC service starts and connects to RpcEndpoints; connecting to an RPC server returns an RpcGateway for invoking remote procedures
commonRpcService = createLocalRpcService(configuration, rpcSystem.deref());
final CommonRpcServiceFactory commonRpcServiceFactory =
new CommonRpcServiceFactory(commonRpcService);
taskManagerRpcServiceFactory = commonRpcServiceFactory;
dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
metricQueryServiceRpcService =
MetricUtils.startLocalMetricsRpcService(
configuration, rpcSystem.deref());
} else {
// start a new service per component, possibly with custom bind addresses
final String jobManagerExternalAddress =
miniClusterConfiguration.getJobManagerExternalAddress();
final String taskManagerExternalAddress =
miniClusterConfiguration.getTaskManagerExternalAddress();
final String jobManagerExternalPortRange =
miniClusterConfiguration.getJobManagerExternalPortRange();
final String taskManagerExternalPortRange =
miniClusterConfiguration.getTaskManagerExternalPortRange();
final String jobManagerBindAddress =
miniClusterConfiguration.getJobManagerBindAddress();
final String taskManagerBindAddress =
miniClusterConfiguration.getTaskManagerBindAddress();
dispatcherResourceManagerComponentRpcServiceFactory =
new DedicatedRpcServiceFactory(
configuration,
jobManagerExternalAddress,
jobManagerExternalPortRange,
jobManagerBindAddress,
rpcSystem.deref());
taskManagerRpcServiceFactory =
new DedicatedRpcServiceFactory(
configuration,
taskManagerExternalAddress,
taskManagerExternalPortRange,
taskManagerBindAddress,
rpcSystem.deref());
// we always need the 'commonRpcService' for auxiliary calls
// bind to the JobManager address with port 0
commonRpcService =
createRemoteRpcService(
configuration, jobManagerBindAddress, 0, rpcSystem.deref());
metricQueryServiceRpcService =
MetricUtils.startRemoteMetricsRpcService(
configuration,
commonRpcService.getAddress(),
null,
rpcSystem.deref());
}
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
processMetricGroup =
MetricUtils.instantiateProcessMetricGroup(
metricRegistry,
RpcUtils.getHostname(commonRpcService),
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
configuration));
// create the thread pool that executes I/O tasks
ioExecutor =
Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("mini-cluster-io"));
// high-availability services factory
haServicesFactory = createHighAvailabilityServicesFactory(configuration);
// high-availability services
haServices = createHighAvailabilityServices(configuration, ioExecutor);
// create and start the BLOB (binary large object) storage server
blobServer =
BlobUtils.createBlobServer(
configuration,
Reference.borrowed(workingDirectory.getBlobStorageDirectory()),
haServices.createBlobStore());
blobServer.start();
// heartbeat services monitoring all components
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// delegation token manager
delegationTokenManager =
KerberosDelegationTokenManagerFactory.create(
getClass().getClassLoader(),
configuration,
commonRpcService.getScheduledExecutor(),
ioExecutor);
// BLOB cache service: permanent and transient storage, plus access to the BLOB server
blobCacheService =
BlobUtils.createBlobCacheService(
configuration,
Reference.borrowed(workingDirectory.getBlobStorageDirectory()),
haServices.createBlobStore(),
new InetSocketAddress(
InetAddress.getLocalHost(), blobServer.getPort()));
startTaskManagers();
MetricQueryServiceRetriever metricQueryServiceRetriever =
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService());
setupDispatcherResourceManagerComponents(
configuration,
dispatcherResourceManagerComponentRpcServiceFactory,
metricQueryServiceRetriever);
// retrieve the ResourceManager leader from the HA services (election and retrieval)
resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
// retrieve the Dispatcher leader from the HA services (election and retrieval)
dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
// retrieve the cluster REST endpoint leader from the HA services (election and retrieval)
clusterRestEndpointLeaderRetrievalService =
haServices.getClusterRestEndpointLeaderRetriever();
// create the Dispatcher gateway retriever
dispatcherGatewayRetriever =
new RpcGatewayRetriever<>(
commonRpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
new ExponentialBackoffRetryStrategy(
21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
// create the ResourceManager gateway retriever
resourceManagerGatewayRetriever =
new RpcGatewayRetriever<>(
commonRpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
new ExponentialBackoffRetryStrategy(
21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
webMonitorLeaderRetriever = new LeaderRetriever();
// start ResourceManager leader retrieval
resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
// start Dispatcher leader retrieval
dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
// start cluster REST endpoint leader retrieval
clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
} catch (Exception e) {
// cleanup everything
try {
close();
} catch (Exception ee) {
e.addSuppressed(ee);
}
throw e;
}
// create a new termination future
terminationFuture = new CompletableFuture<>();
// now officially mark this as running
running = true;
LOG.info("Flink Mini Cluster started successfully");
}
}
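The MiniCluster can also be driven directly; a minimal sketch, assuming a JobGraph named jobGraph is already at hand:
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

MiniClusterConfiguration config =
        new MiniClusterConfiguration.Builder()
                .setNumTaskManagers(1) // the default; see startTaskManagers() below
                .setNumSlotsPerTaskManager(1)
                .build();
try (MiniCluster miniCluster = new MiniCluster(config)) {
    miniCluster.start();                      // runs the start() sequence shown above
    miniCluster.executeJobBlocking(jobGraph); // submit the job and wait for it to finish
}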
TaskManager
MiniCluster's start() invokes the methods below.
- Starts as many TaskManagers as configured; the default is 1.
- Creates a TaskExecutor, injecting the RpcService (TaskManager RPC service), haServices (high-availability services), heartbeatServices (heartbeat services), metricRegistry (metric registry), blobCacheService (BLOB cache service), workingDirectory (working directory), and fatalErrorHandler (fatal error handling).
- Starts the TaskExecutor and adds it to the taskManagers list.
private void startTaskManagers() throws Exception {
final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
for (int i = 0; i < numTaskManagers; i++) {
startTaskManager();
}
}
public void startTaskManager() throws Exception {
synchronized (lock) {
final Configuration configuration = miniClusterConfiguration.getConfiguration();
final TaskExecutor taskExecutor =
TaskManagerRunner.startTaskManager(
configuration,
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServiceFactory.createRpcService(),
haServices,
heartbeatServices,
metricRegistry,
blobCacheService,
useLocalCommunication(),
ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
workingDirectory.createSubWorkingDirectory("tm_" + taskManagers.size()),
taskManagerTerminatingFatalErrorHandlerFactory.create(
taskManagers.size()));
taskExecutor.start();
taskManagers.add(taskExecutor);
}
}
TaskExecutor
The task executor, responsible for running multiple tasks.
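How many parallel subtasks a TaskExecutor can host is bounded by its number of task slots. A sketch of setting the slot count for a local run:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
config.set(TaskManagerOptions.NUM_TASK_SLOTS, 4); // each slot hosts one parallel subtask (or a slot-sharing group)
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(config);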
Submitting a Task (submitTask)
submitTask is the TaskManager-side entry point for deploying a task: it validates the JobManager connection and the slot allocation, re-integrates offloaded data from the BLOB service, deserializes the job and task information, builds the Task with its state, metric, and I/O services, registers it in the task slot table, and finally starts the task thread.
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
try {
final JobID jobId = tdd.getJobId();
final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
final JobTable.Connection jobManagerConnection =
jobTable.getConnection(jobId)
.orElseThrow(
() -> {
final String message =
"Could not submit task because there is no JobManager "
+ "associated for the job "
+ jobId
+ '.';
log.debug(message);
return new TaskSubmissionException(message);
});
if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
final String message =
"Rejecting the task submission because the job manager leader id "
+ jobMasterId
+ " does not match the expected job manager leader id "
+ jobManagerConnection.getJobMasterId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message =
"No task slot allocated for job ID "
+ jobId
+ " and allocation ID "
+ tdd.getAllocationId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// re-integrate offloaded data:
try {
tdd.loadBigData(taskExecutorBlobService.getPermanentBlobService());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
}
// deserialize the pre-serialized information
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
jobInformation =
tdd.getSerializedJobInformation()
.deserializeValue(getClass().getClassLoader());
taskInformation =
tdd.getSerializedTaskInformation()
.deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not deserialize the job or task information.", e);
}
if (!jobId.equals(jobInformation.getJobId())) {
throw new TaskSubmissionException(
"Inconsistent job ID information inside TaskDeploymentDescriptor ("
+ tdd.getJobId()
+ " vs. "
+ jobInformation.getJobId()
+ ")");
}
TaskManagerJobMetricGroup jobGroup =
taskManagerMetricGroup.addJob(
jobInformation.getJobId(), jobInformation.getJobName());
// note that a pre-existing job group can NOT be closed concurrently - this is done by
// the same TM thread in removeJobMetricsGroup
TaskMetricGroup taskMetricGroup =
jobGroup.addTask(tdd.getExecutionAttemptId(), taskInformation.getTaskName());
InputSplitProvider inputSplitProvider =
new RpcInputSplitProvider(
jobManagerConnection.getJobManagerGateway(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getRpcTimeout());
final TaskOperatorEventGateway taskOperatorEventGateway =
new RpcTaskOperatorEventGateway(
jobManagerConnection.getJobManagerGateway(),
executionAttemptID,
(t) -> runAsync(() -> failTask(executionAttemptID, t)));
TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
GlobalAggregateManager aggregateManager =
jobManagerConnection.getGlobalAggregateManager();
LibraryCacheManager.ClassLoaderHandle classLoaderHandle =
jobManagerConnection.getClassLoaderHandle();
PartitionProducerStateChecker partitionStateChecker =
jobManagerConnection.getPartitionStateChecker();
final TaskLocalStateStore localStateStore =
localStateStoresManager.localStateStoreForSubtask(
jobId,
tdd.getAllocationId(),
taskInformation.getJobVertexId(),
tdd.getSubtaskIndex(),
taskManagerConfiguration.getConfiguration(),
jobInformation.getJobConfiguration());
// TODO: Pass config value from user program and do overriding here.
final StateChangelogStorage<?> changelogStorage;
try {
changelogStorage =
changelogStoragesManager.stateChangelogStorageForJob(
jobId,
taskManagerConfiguration.getConfiguration(),
jobGroup,
localStateStore.getLocalRecoveryConfig());
} catch (IOException e) {
throw new TaskSubmissionException(e);
}
final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
final TaskStateManager taskStateManager =
new TaskStateManagerImpl(
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
changelogStorage,
changelogStoragesManager,
taskRestore,
checkpointResponder);
MemoryManager memoryManager;
try {
memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
} catch (SlotNotFoundException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
Task task =
new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
taskOperatorEventGateway,
aggregateManager,
classLoaderHandle,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
partitionStateChecker,
getRpcService().getScheduledExecutor());
taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured);
log.info(
"Received task {} ({}), deploy into slot with allocation id {}.",
task.getTaskInfo().getTaskNameWithSubtasks(),
tdd.getExecutionAttemptId(),
tdd.getAllocationId());
boolean taskAdded;
try {
taskAdded = taskSlotTable.addTask(task);
} catch (SlotNotFoundException | SlotNotActiveException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
if (taskAdded) {
task.startTaskThread();
setupResultPartitionBookkeeping(
tdd.getJobId(), tdd.getProducedPartitions(), task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message =
"TaskManager already contains a task for id " + task.getExecutionId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
} catch (TaskSubmissionException e) {
return FutureUtils.completedExceptionally(e);
}
}
StreamGraph
The StreamGraph describes the topology of the streaming job; it is built from the registered transformations and later translated into the JobGraph that is actually submitted for execution.
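The graph can be inspected without running the job; getExecutionPlan() returns a JSON description of the StreamGraph for the current pipeline (the pipeline here is illustrative):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3)
        .filter(x -> x > 1)
        .print();
// JSON view of the StreamGraph that execute() would build from these transformations
System.out.println(env.getExecutionPlan());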