文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Flink执行流程与源码分析

2024-12-14 00:11

关注

Flink主要组件

作业管理器(JobManager)

(1) 控制一个应用程序执行的主进程,也就是说,每个应用程序 都会被一个不同的Jobmanager所控制执行

(2) Jobmanager会先接收到要执行的应用程序,这个应用程序会包括:作业图( Job Graph)、逻辑数据流图( ogical dataflow graph)和打包了所有的类、库和其它资源的JAR包。

(3) Jobmanager会把 Jobgraph转换成一个物理层面的 数据流图,这个图被叫做 “执行图”(Executiongraph),包含了所有可以并发执行的任务。Job Manager会向资源管理器( Resourcemanager)请求执行任务必要的资源,也就是 任务管理器(Taskmanager)上的插槽slot。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 Taskmanager上。而在运行过程中Jobmanagera会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。

任务管理器(Taskmanager)

(1) Flink中的工作进程。通常在 Flink中会有多个 Taskmanageria运行, 每个 Taskmanageri都包含了一定数量的插槽( slots)。插槽的数量限制了Taskmanageri能够执行的任务数量。

(2) 启动之后, Taskmanager会向资源管理器注册它的插槽;收到资源管理器的指令后, Taskmanageri就会将一个或者多个插槽提供给Jobmanageri调用。Jobmanager就可以向插槽分配任务( tasks)来执行了。

(3) 在执行过程中, 一个 Taskmanagera可以跟其它运行同一应用程序的Taskmanager交换数据。

资源管理器(Resource Manager)

(1) 主要负责管理任务管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定义的处理资源单元。

(2) Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARNMesos、K8s,以及 standalone部署。

(3) 当 Jobmanager申请插槽资源时, Resourcemanager会将有空闲插槽的Taskmanager?分配给Jobmanager。如果 Resourcemanagery没有足够的插槽来满足 Jobmanager的请求, 它还可以向资源提供平台发起会话,以提供启动 Taskmanager进程的容器。

分发器(Dispatcher)

(1) 可以跨作业运行,它为应用提交提供了REST接口。

(2)当一个应用被提交执行时,分发器就会启动并将应用移交给Jobmanage

(3) Dispatcher他会启动一个 WebUi,用来方便地 展示和监控作业执行的信息。

任务提交流程

  1. 提交应用
  2. 启动并提交应用
  3. 请求slots
  4. 任务启动
  5. 注册slots
  6. 发出提供slot的指令
  7. 提供slots
  8. 提交要在slots中执行的任务
  9. 交换数据

任务提交流程(YARN)

a. Flink任务提交后,Client向HDFS上传Flink的Jar包和配置

b. 随后向 Yarn ResourceManager提交任务ResourceManager分配 Container资源并通知对应的NodeManager启动

c. ApplicationMaster,ApplicationMaster 启动后加载Flink的Jar包和配置构建环境

d. 然后启动JobManager , 之后ApplicationMaster 向ResourceManager 申请资源启动TaskManager

e. ResourceManager 分配 Container 资源后 , 由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager

f. NodeManager 加载 Flink 的 Jar 包和配置构建环境并启动 TaskManager

g. TaskManager 启动后向 JobManager 发送心跳包,并等待 JobManager 向其分配任务。

源码分析--集群启动 JobManager 启动分析

JobManager 的内部包含非常重要的三大组件

入口,启动主类:StandaloneSessionClusterEntrypoint

  1. // 入 口 
  2. StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint); 
  3. clusterEntrypoint.startCluster();  
  4. runCluster(configuration, pluginManager); 
  5.  
  6. // 第一步:初始化各种服务 
  7.   
  8. initializeServices(configuration, pluginManager); 
  9.  
  10. // 创建 DispatcherResourceManagerComponentFactory, 初始化各种组件的 
  11. 工厂实例 
  12. // 其实内部包含了三个重要的成员变量: 
  13. // 创建 ResourceManager 的工厂实例 
  14. // 创建 Dispatcher 的工厂实例 
  15. // 创建 WebMonitorEndpoint 的工厂实例 
  16. createDispatcherResourceManagerComponentFactory(configuration); 
  17.  
  18. // 创建 集群运行需要的一些组件:Dispatcher, ResourceManager 等 
  19. // 创 建 ResourceManager 
  20. // 创 建 Dispatcher 
  21. // 创 建 WebMonitorEndpoint 
  22. clusterComponent = dispatcherResourceManagerComponentFactory.create(...) 

1. initializeServices():初始化各种服务

  1. // 初 始 化 和 启 动 AkkaRpcService, 内 部 其 实 包 装 了 一 个 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...) 
  2.  
  3. // 初始化一个负责 IO 的线程池 
  4. ioExecutor = Executors.newFixedThreadPool(...) 
  5. // 初始化 HA 服务组件,负责 HA 服务的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor); 
  6.  
  7. // 初始化 BlobServer 服务端 
  8. blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); 
  9.  
  10. // 初始化心跳服务组件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration); 
  11.  
  12. // 初始化一个用来存储 ExecutionGraph 的 Store, 实现是: 
  13. FileArchivedExecutionGraphStore 
  14. archivedExecutionGraphStore = createSerializableExecutionGraphStore(...) 

2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多组件的工厂实例

  1. 1、DispatcherRunnerFactory,默认实现:DefaultDispatcherRunnerFactory  
  2.  
  3. 2、ResourceManagerFactory,默认实现:StandaloneResourceManagerFactory  
  4.  
  5. 3、RestEndpointFactory,默认实现:SessionRestEndpointFactory 
  6.  
  7. clusterComponent = dispatcherResourceManagerComponentFactory 
  8.     .create(configuration, ioExecutor, commonRpcService, haServices, 
  9.      blobServer, heartbeatServices, metricRegistry, 
  10.      archivedExecutionGraphStore, 
  11.      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), 
  12.      this); 

3. 创建 WebMonitorEndpoint

  1.  
  2.  webMonitorEndpoint = restEndpointFactory.createRestEndpoint( 
  3.   configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, 
  4.   blobServer, executor, metricFetcher, 
  5.   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), 
  6.   fatalErrorHandler 
  7.  ); 

4. 创建 resourceManager

  1.  
  2. resourceManager = resourceManagerFactory.createResourceManager( 
  3.  configuration, ResourceID.generate(), 
  4.  rpcService, highAvailabilityServices, heartbeatServices, 
  5.  fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), 
  6.  webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname 
  7. ); 
  1. protected ResourceManager createResourceManager( 
  2.   Configuration configuration, 
  3.   ResourceID resourceId, 
  4.   RpcService rpcService, 
  5.   HighAvailabilityServices highAvailabilityServices, 
  6.   HeartbeatServices heartbeatServices, 
  7.   FatalErrorHandler fatalErrorHandler, 
  8.   ClusterInformation clusterInformation, 
  9.   @Nullable String webInterfaceUrl, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, 
  11.   ResourceManagerRuntimeServices resourceManagerRuntimeServices) { 
  12.  
  13.  final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); 
  14.  
  15.   
  16.  return new StandaloneResourceManager( 
  17.   rpcService, 
  18.   resourceId, 
  19.   highAvailabilityServices, 
  20.   heartbeatServices, 
  21.   resourceManagerRuntimeServices.getSlotManager(), 
  22.   ResourceManagerPartitionTrackerImpl::new, 
  23.   resourceManagerRuntimeServices.getJobLeaderIdService(), 
  24.   clusterInformation, 
  25.   fatalErrorHandler, 
  26.   resourceManagerMetricGroup, 
  27.   standaloneClusterStartupPeriodTime, 
  28.   AkkaUtils.getTimeoutAsTime(configuration) 
  29.  ); 
  30.  
  31.  } 
  32.   
  1.  
  2. public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices, 
  3.   HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, 
  4.   JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, 
  5.   ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) { 
  6.  
  7.   
  8.  super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null); 
  1. protected RpcEndpoint(final RpcService rpcService, final String endpointId) { 
  2.  this.rpcService = checkNotNull(rpcService, "rpcService"); 
  3.  this.endpointId = checkNotNull(endpointId, "endpointId"); 
  4.  
  5.   
  6.  this.rpcServer = rpcService.startServer(this); 

5. 在创建resourceManager同级:启动任务接收器Starting Dispatcher

  1.  
  2. log.debug("Starting Dispatcher."); 
  3. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( 
  4.  highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, 
  5.  // TODO_ZYM 注释: 注意第三个参数 
  6.  new HaServicesJobGraphStoreFactory(highAvailabilityServices), 
  7.  ioExecutor, rpcService, partialDispatcherServices 
  8. ); 

Dispatcher 启动后,将会等待任务提交,如果有任务提交,则会经过submitJob(...)函数进入后续处理。

提交(一个Flink应用的提交必须经过三个graph的转换)

首先看下一些名词

StreamGraph

是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。可以用一个 DAG 来表示),DAG 的顶点是 StreamNode,边是 StreamEdge,边包含了由哪个 StreamNode 依赖哪个 StreamNode。

DataStream 上常见的 transformation 有 map、flatmap、filter等(见DataStream Transformation了解更多)。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph

以map方法为例,看看源码

  1. public  SingleOutputStreamOperator map(MapFunction mapper) { 
  2.   // 通过java reflection抽出mapper的返回值类型 
  3.   TypeInformation outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), 
  4.       Utils.getCallLocationName(), true); 
  5.  
  6.   // 返回一个新的DataStream,SteramMap 为 StreamOperator 的实现类 
  7.   return transform("Map", outType, new StreamMap<>(clean(mapper))); 
  8.  
  9. public  SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) { 
  10.   // read the output type of the input Transform to coax out errors about MissingTypeInfo 
  11.   transformation.getOutputType(); 
  12.  
  13.   // 新的transformation会连接上当前DataStream中的transformation,从而构建成一棵树 
  14.   OneInputTransformation resultTransform = new OneInputTransformation<>( 
  15.       this.transformation, 
  16.       operatorName, 
  17.       operator, 
  18.       outTypeInfo, 
  19.       environment.getParallelism()); 
  20.  
  21.   @SuppressWarnings({ "unchecked""rawtypes" }) 
  22.   SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, resultTransform); 
  23.  
  24.   // 所有的transformation都会存到 env 中,调用execute时遍历该list生成StreamGraph 
  25.   getExecutionEnvironment().addOperator(resultTransform); 
  26.  
  27.   return returnStream; 

map转换将用户自定义的函数MapFunction包装到StreamMap这个Operator中,再将StreamMap包装到OneInputTransformation,最后该transformation存到env中,当调用env.execute时,遍历其中的transformation集合构造出StreamGraph

JobGraph

(1) StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点。

(2) JobGraph 用来由 JobClient 提交给 JobManager,是由顶点(JobVertex)、中间结果(IntermediateDataSet)和边(JobEdge)组成的 DAG 图。

(3) JobGraph 定义作业级别的配置,而每个顶点和中间结果定义具体操作和中间数据的设置。

JobVertex

JobVertex 相当于是 JobGraph 的顶点。经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。

IntermediateDataSet

JobVertex的输出,即经过operator处理产生的数据集。

JobEdge

job graph中的一条数据传输通道。source 是IntermediateDataSet,sink 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。

(1) 首先是通过API会生成transformations,通过transformations会生成StreamGraph。

(2)将StreamGraph的某些StreamNode Chain在一起生成JobGraph,前两步转换都是在客户端完成。

(3)最后会将JobGraph转换为ExecutionGraph,相比JobGraph会增加并行度的概念,这一步是在Jobmanager里完成。

ExecutionJobVertex

ExecutionJobVertex一一对应JobGraph中的JobVertex

ExecutionVertex

一个ExecutionJobVertex对应n个ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任务的一个子任务

Execution

Execution 是对 ExecutionVertex 的一次执行,通过 ExecutionAttemptId 来唯一标识。

IntermediateResult

在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的对外输出,一个 JobGraph 可能有 n(n >=0) 个输出。在 ExecutionGraph 中,与此对应的就是 IntermediateResult。每一个 IntermediateResult 就有 numParallelProducers(并行度) 个生产者,每个生产者的在相应的 IntermediateResult 上的输出对应一个 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一个输出分区

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的输入,通过 ExecutionEdge 将 ExecutionVertex 和 IntermediateResultPartition 连接起来,进而在不同的 ExecutionVertex 之间建立联系。

ExecutionGraph的构建

  1. 构建JobInformation
  2. 构建ExecutionGraph
  3. 将JobGraph进行拓扑排序,获取sortedTopology顶点集合
  1. // ExecutionGraphBuilder 
  2.  public static ExecutionGraph buildGraph( 
  3.   @Nullable ExecutionGraph prior
  4.   JobGraph jobGraph, 
  5.   ...) throws JobExecutionException, JobException { 
  6.   // 构建JobInformation 
  7.    
  8.   // 构建ExecutionGraph 
  9.    
  10.   // 将JobGraph进行拓扑排序,获取sortedTopology顶点集合 
  11.   List sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); 
  12.    
  13.   executionGraph.attachJobGraph(sortedTopology); 
  14.  
  15.   return executionGraph; 
  16.  } 

构建ExecutionJobVertex,连接IntermediateResultPartition和ExecutionVertex

  1. //ExecutionGraph 
  2.  public void attachJobGraph(List topologiallySorted) throws JobException { 
  3.   for (JobVertex jobVertex : topologiallySorted) { 
  4.    // 构建ExecutionJobVertex 
  5.    ExecutionJobVertex ejv = new ExecutionJobVertex( 
  6.      this, 
  7.      jobVertex, 
  8.      1, 
  9.      maxPriorAttemptsHistoryLength, 
  10.      rpcTimeout, 
  11.      globalModVersion, 
  12.      createTimestamp); 
  13.    // 连接IntermediateResultPartition和ExecutionVertex 
  14.    ev.connectToPredecessors(this.intermediateResults); 
  15.  } 
  16.    
  17.    
  18.   // ExecutionJobVertex 
  19.  public void connectToPredecessors(Map intermediateDataSets) throws JobException { 
  20.   List inputs = jobVertex.getInputs(); 
  21.    
  22.   for (int num = 0; num < inputs.size(); num++) { 
  23.    JobEdge edge = inputs.get(num); 
  24.    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); 
  25.    this.inputs.add(ires); 
  26.    int consumerIndex = ires.registerConsumer(); 
  27.     
  28.    for (int i = 0; i < parallelism; i++) { 
  29.     ExecutionVertex ev = taskVertices[i]; 
  30.     ev.connectSource(num, ires, edge, consumerIndex); 
  31.    } 
  32.   } 
  33.  } 

拆分计划(可执行能力)

  1. // ExecutionVertex 
  2.  public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { 
  3.  
  4.   final DistributionPattern pattern = edge.getDistributionPattern(); 
  5.   final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); 
  6.  
  7.   ExecutionEdge[] edges; 
  8.  
  9.   switch (pattern) { 
  10.    // 下游 JobVertex 的输入 partition 算法,如果是 forward 或 rescale 的话为 POINTWISE 
  11.    case POINTWISE: 
  12.     edges = connectPointwise(sourcePartitions, inputNumber); 
  13.     break; 
  14.    // 每一个并行的ExecutionVertex节点都会链接到源节点产生的所有中间结果IntermediateResultPartition 
  15.    case ALL_TO_ALL: 
  16.     edges = connectAllToAll(sourcePartitions, inputNumber); 
  17.     break; 
  18.  
  19.    default
  20.     throw new RuntimeException("Unrecognized distribution pattern."); 
  21.  
  22.   } 
  23.  
  24.   inputEdges[inputNumber] = edges; 
  25.   for (ExecutionEdge ee : edges) { 
  26.    ee.getSource().addConsumer(ee, consumerNumber); 
  27.   } 
  28.  } 
  29.  
  30.  
  31.  private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  32.   final int numSources = sourcePartitions.length; 
  33.   final int parallelism = getTotalNumberOfParallelSubtasks(); 
  34.  
  35.   // 如果并发数等于partition数,则一对一进行连接 
  36.   if (numSources == parallelism) { 
  37.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; 
  38.   } 
  39.   //  如果并发数大于partition数,则一对多进行连接 
  40.   else if (numSources < parallelism) { 
  41.  
  42.    int sourcePartition; 
  43.  
  44.    if (parallelism % numSources == 0) { 
  45.     int factor = parallelism / numSources; 
  46.     sourcePartition = subTaskIndex / factor; 
  47.    } 
  48.    else { 
  49.     float factor = ((float) parallelism) / numSources; 
  50.     sourcePartition = (int) (subTaskIndex / factor); 
  51.    } 
  52.  
  53.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) }; 
  54.   } 
  55.   // 果并发数小于partition数,则多对一进行连接 
  56.   else { 
  57.    if (numSources % parallelism == 0) { 
  58.     int factor = numSources / parallelism; 
  59.     int startIndex = subTaskIndex * factor; 
  60.  
  61.     ExecutionEdge[] edges = new ExecutionEdge[factor]; 
  62.     for (int i = 0; i < factor; i++) { 
  63.      edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); 
  64.     } 
  65.     return edges; 
  66.    } 
  67.    else { 
  68.     float factor = ((float) numSources) / parallelism; 
  69.  
  70.     int start = (int) (subTaskIndex * factor); 
  71.     int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ? 
  72.       sourcePartitions.length : 
  73.       (int) ((subTaskIndex + 1) * factor); 
  74.  
  75.     ExecutionEdge[] edges = new ExecutionEdge[end - start]; 
  76.     for (int i = 0; i < edges.length; i++) { 
  77.      edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber); 
  78.     } 
  79.  
  80.     return edges; 
  81.    } 
  82.   } 
  83.  } 
  84.  
  85.  
  86.  private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  87.   ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; 
  88.  
  89.   for (int i = 0; i < sourcePartitions.length; i++) { 
  90.    IntermediateResultPartition irp = sourcePartitions[i]; 
  91.    edges[i] = new ExecutionEdge(irp, this, inputNumber); 
  92.   } 
  93.  
  94.   return edges; 
  95.  } 

返回ExecutionGraph

TaskManager

TaskManager启动

  1. public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { 
  2.         //主要初始化一堆的service,并新建一个org.apache.flink.runtime.taskexecutor.TaskExecutor 
  3.   final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId); 
  4.   //调用TaskExecutor的start()方法 
  5.         taskManagerRunner.start(); 

TaskExecutor :submitTask()

接着的重要函数是shumitTask()函数,该函数会通过AKKA机制,向TaskManager发出一个submitTask的消息请求,TaskManager收到消息请求后,会执行submitTask()方法。(省略了部分代码)。

  1. public CompletableFuture submitTask( 
  2.    TaskDeploymentDescriptor tdd, 
  3.    JobMasterId jobMasterId, 
  4.    Time timeout) { 
  5.  
  6.     jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); 
  7.     taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); 
  8.     
  9.    TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx); 
  10.  
  11.    InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx); 
  12.  
  13.    TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); 
  14.    CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); 
  15.  
  16.    LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); 
  17.    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); 
  18.    PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); 
  19.  
  20.    final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( 
  21.     jobId, 
  22.     tdd.getAllocationId(), 
  23.     taskInformation.getJobVertexId(), 
  24.     tdd.getSubtaskIndex()); 
  25.  
  26.    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); 
  27.  
  28.    final TaskStateManager taskStateManager = new TaskStateManagerImpl( 
  29.     jobId, 
  30.     tdd.getExecutionAttemptId(), 
  31.     localStateStore, 
  32.     taskRestore, 
  33.     checkpointResponder); 
  34.             //新建一个Task 
  35.    Task task = new Task(xxxx); 
  36.  
  37.    log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); 
  38.  
  39.    boolean taskAdded; 
  40.  
  41.    try { 
  42.     taskAdded = taskSlotTable.addTask(task); 
  43.    } catch (SlotNotFoundException | SlotNotActiveException e) { 
  44.     throw new TaskSubmissionException("Could not submit task.", e); 
  45.    } 
  46.  
  47.    if (taskAdded) { 
  48.        //启动任务 
  49.     task.startTaskThread(); 
  50.  
  51.     return CompletableFuture.completedFuture(Acknowledge.get()); 
  52.    }  

最后创建执行Task的线程,然后调用startTaskThread()来启动具体的执行线程,Task线程内部的run()方法承载了被执行的核心逻辑。

Task是执行在TaskExecutor进程里的一个线程,下面来看看其run方法

(1) 检测当前状态,正常情况为CREATED,如果是FAILED或CANCELING直接返回,其余状态将抛异常。

(2) 读取DistributedCache文件。

(3) 启动ResultPartitionWriter和InputGate。

(4) 向taskEventDispatcher注册partitionWriter。

(5) 根据nameOfInvokableClass加载对应的类并实例化。

(6) 将状态置为RUNNING并执行invoke方法。

  1. public void run() { 
  2.         while (true) { 
  3.             ExecutionState current = this.executionState; 
  4.             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 
  5.             network.registerTask(this); 
  6.             Environment env = new RuntimeEnvironment(. . . . ); 
  7.             invokable.setEnvironment(env); 
  8.             //  actual task core work 
  9.             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 
  10.             } 
  11.             // notify everyone that we switched to running 
  12.             notifyObservers(ExecutionState.RUNNING, null); 
  13.             executingThread.setContextClassLoader(userCodeClassLoader); 
  14.             // run the invokable 
  15.             invokable.invoke(); 
  16.  
  17.             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 
  18.                 notifyObservers(ExecutionState.FINISHED, null); 
  19.             } 
  20.             Finally{ 
  21.                 // free the network resources 
  22.                 network.unregisterTask(this); 
  23.                 // free memory resources 
  24.                 if (invokable != null) { 
  25.                     memoryManager.releaseAll(invokable); 
  26.                 } 
  27.                 libraryCache.unregisterTask(jobId, executionId); 
  28.                 removeCachedFiles(distributedCacheEntries, fileCache); 

总结

整体的流程与架构可能三两张图或者三言两语就可以勾勒出画面,但是背后源码的实现是艰辛的。源码的复杂度和当初设计框架的抓狂感,我们只有想象。现在我们只是站在巨人的肩膀上去学习。

本篇的主题是"Flink架构与执行流程",做下小结,Flink on Yarn的提交执行流程:

1 Flink任务提交后,Client向HDFS上传Flink的Jar包和配置。

2 向Yarn ResourceManager提交任务。

3 ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster。

4 ApplicationMaster启动后加载Flink的Jar包和配置构建环境。

5 启动JobManager之后ApplicationMaster向ResourceManager申请资源启动TaskManager。

6 ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager。

7 NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager。

8 TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。

 

来源:大数据左右手内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯