Flink源码分析之Flink on YARN – Per Job

问题导读

  1. 用户程序什么时候、在哪、如何被调用执行的?
  2. JobManager进程什么时候、在哪、如何被拉起来执行?启动后做了什么?
  3. TaskManager进程什么时候、在哪、如何被拉起来执行?
  4. 用户程序的Task什么时候、如何被分发到各个TaskManager进程中执行?

阅读说明:
源码版本:Flink release-1.14.4
先回顾一下YARN架构与YARN job 8步工作流程
Flink on yarn其实就是按照YARN job 8步工作流程走
以上述4个问题为导向,看Flink具体是如何实现的,8步中1、3、4、5、7、8在Flink代码哪里找到(2和6是YARN执行)

YARN架构

Flink-Yarn-PerJob-YARN.jpg

YARN集群介绍

YARN集群用来做资源的管理与用户应用程序的调度。

用户的应用程序是一个分布式程序,需要按照YARN的规范来写才能提交到YARN集群被调度运行起来。

ResourceManager(RM)

  • YARN集群中的Master。
  • 资源调度:根据容量、队列等限制条件将系统中的资源打包为Container对象分配给应用程序(AM)。
  • 应用程序管理:通知NM启动AM;监控AM运行状态并在失败时重启它。

NodeManager(NM)

  • YARN集群中的Slave。
  • 是应用程序运行的节点。
  • 接收来在ResourceManager的请求,划定一个Container描述的资源限制来启动用户应用程序的ApplicationMaster进程。
  • 接收来在用户应用程序ApplicationMaster的Container启动、停止请求。
  • 定时向ResourceManager汇报本节点上的资源使用情况和各个Container运行状态。

ApplicationMaster(AM)

当用户向YARN提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的Master,它负责向ResourceManager申请Container资源,并通知NodeManager启动Container来执行具体的任务,监控任务的状态以便在失败时重启Container或者任务完成时回收Container,这个就是ApplicationMaster。

Container

  • 是资源的抽象。
  • Container对象包含资源的限制信息:Vcores、Memory,也包含资源的位置信息:需要运行在哪个NodeManager节点上。

由NodeManager进程负责启动。

YARN job工作流程

  1. Client向ResourceManager提交应用程序(包含启动ApplicationMaster的命令)。
  2. ResourceManager为应用分配第一个Container并与对应的NodeManager通信要求它启动ApplicationMaster。
  3. ApplicationMaster向ResourceManager注册并与ResourceManager保持心跳。
  4. ApplicationMaster为任务的运行向ResourceManager申请若干Container资源。
  5. ApplicationMaster领取ResourceManager分配的Container并初始化相关运行信息,便与对应的NodeManager通信要求它启动Container。
  6. NodeManager为Container设置好运行环境(下载运行资源、设置环境变量、资源限制等),将启动命令写到脚本文件中,运行脚本启动Container。
  7. Container运行期间向ApplicationMaster汇报自己的状态和任务进度。
  8. 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销自己,释放相关Container资源。

用户程序什么时候、在哪、谁调用执行的?

入口示例程序

是一个Stream job

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

流程图

Flink-Yarn-PerJob-1.jpg

步骤说明

1. shell命令交由入口类CliFrontend执行。

2. CliFrontend加载配置和命令行参数,生成Configuration和PackagedProgram对象。

1) 找配置文件目录,优先级:system env(FLINK_CONF_DIR) > ../conf > conf。

2) 从配置文件目录中加载配置文件并解析命令行参数args融合生成Configuration,由Configuration构建出PackagedProgram(包含URL jarFile、Class mainClass、String[] args、List<URL> classpaths、URLClassLoader userCodeClassLoader、savepointSettings等信息)。

3) 通过ClientUtils设置用户程序的执行环境ContextEnvironment和StreamContextEnvironment,为两个Environment设置了PipelineExecutorServiceLoader(用于找到PipelineExecutorFactory)、Configuration和ClassLoader(用户程序PackagedProgram指定的URLClassLoader)等信息。

4) 设置当前线程ClassLoader为上述3中指定的用户程序ClassLoader,调用PackagedProgram执行用户程序,执行完用户程序后重置当前线程ClassLoader。

3. PackagedProgram通过反射的方式调用用户程序入口类的main方法执行用户程序。

4. 用户程序执行,完成StreamGraph的构建。

使用纯SQL API,转换过程SQL -> calcite(SqlNode -> RelNode) -> Operation -> Transformation -> Pipeline

使用Table API,转换过程Operation -> Transformation -> Pipeline

使用DataStream API,转换过程Transformation -> Pipeline

注:流模式Pipeline的实现类是StreamGraph。

5. 找到匹配的PipelineExecutor去执行Pipeline。

PipelineExecutor的实现有多种:

LocalExecutor:本地模式
RemoteExecutor:Standalone模式
YarnJobClusterExecutor:YARN per job模式
YarnSessionClusterExecutor:YARN session job模式
KubernetesSessionClusterExecutor:K8S session job模式
EmbeddedExecutor:Application模式用

这里采用的是YarnJobClusterExecutor,如何找?

  1. StreamExecutionEnvironment通过PipelineExecutorServiceLoader找到PipelineExecutorFactory。PipelineExecutorServiceLoader先以SPI的方式加载PipelineExecutorFactory,再过滤出 与Configuration配置兼容的Factory。
  2. PipelineExecutorFactory负责创建对应的PipelineExecutor,由PipelineExecutor去执行Pipeline。

6. 执行Pipeline:先构建JobGraph,再找到匹配的ClusterDescriptor来部署flink集群以执行JobGraph。

  1. StreamGraph -> JobGraph
  2. 由ClientFactory工厂类会创建对应的ClusterDescriptor,从Configuration中整理出ClusterSpecification(集群描述信息,包含JM和TM的内存大小以及slots个数)。
  3. 通过ClusterDescriptor部署集群:clusterDescriptor.deployJobCluster(ClusterSpecification, JobGraph, detached)。

注:
StreamGraph到JobGraph主要变化
node: ( List<StreamEdge> -> StreamNode -> List<StreamEdge> ) --> node --> node ...

node: ( List<JobEdge> -> JobVertex -> List<IntermediateDataSet> ) --> node --> node ...
的转换,另外完成Chaining,即多个StreamNode会合并为一个JobVertex

7. 通过ClusterDescriptor部署flink集群来执行JobGraph。

  1. 检查配置;
  2. 上传资源到HDFS;
  3. 提交yarn job让YARN启动AM;
  4. 循环等待提交结果ApplicationReport;
  5. 回设rpc、rest、ha信息;
  6. 返回集群Client对象RestClusterClient。

注:
HDFS目录由yarn.staging-directory参数指定
flink-dist, lib/, plugins/ 这些多个flink应用都用到的,预先上传到yarn.provided.lib.dirs参数指定的HDFS目录即可,NM会缓存避免频繁上传下载。

调用链

1. bin/flink run
  
2. CliFrontend.java
 	 	main(String[] args)
  		CliFrontend cli = new CliFrontend(configuration, customCommandLines)
  		retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args))
  			run(params)
  				executeProgram(Configuration, PackagedProgram)
  					ClientUtils.executeProgram
   ClientUtils.java
   	executeProgram
  		program.invokeInteractiveModeForExecution
3.  			callMainMethod(entryClass, args)
  					entryClass.getMethod("main", String[].class).invoke(null, (Object) args)
  
4. 用户程序入口类
   	main(args)
  
5. StreamExecutionEnvironment.java
   	executeAsync(StreamGraph)
  		executorFactory.getExecutor(configuration).execute(streamGraph, configuration, userClassloader)
  
6. AbstractJobClusterExecutor.java
   	execute
			{
				JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration)
        clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)
        clusterSpecification = clusterClientFactory.getClusterSpecification(configuration)
        clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, detached)
			}
    	
7. YarnClusterDescriptor.java
   	ClusterClientProvider<ApplicationId> deployJobCluster(ClusterSpecification, JobGraph, detached)
  		deployInternal
				{		
  				1) 检查用户权限(Kerberos证书)、vcores、hadoop环境变量、yarn queue等
            
  				2) ApplicationReport report = startAppMaster(...)
          	{
              1. 上传资源到HDFS(yarnShipFiles、yarn.ship-archives、pipeline.jars、配置文件等)
              2. 将JobGraph序列化到文件中,并上传到HDFS(per job模式才有)
              3. 设置集群HA信息
              4. 设置ApplicationSubmissionContext
                    setApplicationName
                    setApplicationType
                    setAMContainerSpec    // Application Master Container
                        setCommands       // 通过class参数指定了yarn AM container启动类为YarnJobClusterEntrypoint
                        setTokens
                        setLocalResources(job.graph、flink-conf.yaml、yarn-site.xml、krb5.conf、keytab、kerberos)
                        setEnvironment
                            flink-conf.yaml中用户配置的以containerized.master.env.为前缀的变量
                            _FLINK_CLASSPATH // Flink app class path
                            _FLINK_DIST_JAR  // local path of flink dist jar 
                            classpath from YARN configuration
                            ...
                    setResource(masterMemoryMB、yarn.appmaster.vcores)
                    setPriority
                    setQueue	// yarn queue
                    setApplicationNodeLabel
                    setApplicationTags
                5. 提交yarn application: yarnClient.submitApplication(appContext)
                   // RUNNING或FINISHED状态时,正常退出循环;FAILED或KILLED时抛异常退出;状态变化时打印日志,运行超60秒打印日志
                6. loop循环等待提交结果: ApplicationReport report = yarnClient.getApplicationReport(appId)
        			}
  					3) 从ApplicationReport中获取rpc和rest的地址和端口信息以及ApplicationId信息,设置回Configuration;设置ClusterId=ApplicationId信息到HA中。
            4) 返回RestClusterClient(Configuration, ApplicationId)
				}	

总结

  1. YARN per job模式下用户程序在Client端被执行,Client端即执行flink shell命令的执行节点。
  2. Client端主要工作就是将用户写的代码转换为JobGraph,向YARN提交应用以执行JobGraph。
  3. User Code(SQL API、Table API、DataStream API)-> StreamGraph
  4. PipelineExecutor(YarnJobClusterExecutor)将StreamGraph转换为JobGraph
  5. ClusterDescriptor(YarnClusterDescriptor)通过YARN部署flink集群以执行JobGraph

JobManager进程什么时候、在哪、如何被拉起来执行?启动后做了什么?

入口

JobManager进程就是YARN job中的ApplicationMaster(AM)。

YARN NodeManager接收到YARN RM发送的AM container启动请求后为其设置好运行环境(环境变量、JAR包、配置文件、Cgroup资源限制等),将启动命令写到脚本文件中,运行脚本启动Container(JobManager进程)。

后续操作如下图所示:

流程图

Flink-Yarn-PerJob-2.jpg

步骤说明

1. 启动RpcService,内部创建了ActorSystem。

Flink集群内RPC通信是封装了Akka Actor来实现。

ActorSystem会创建一个Supervisor Actor,用来创建并启动其他的Actor,比如ResourceManager、Dispatcher、JobMaster。

2. 创建并启动WebMonitorEndpoint

这是一个借助netty实现的Rest服务,用来响应web请求。

ApplicationMaster启动后向YARN RM注册,注册的appTrackingUrl就是这个Web服务的地址,这样就可从YARN资源管理页面跳转到Flink Web UI页面。

3. 创建并启动ResourceManager。

  • 这里指的是Flink的ResourceManager,要与上述中YARN ResourceManager区别开。Flink的RM通过YarnResourceManagerDriver对象内的AMRMClientAsync沟通YARN RM,通过NMClientAsync沟通YARN NM。
  • 内部创建SlotManager,用来管理Slot资源。
  • ResourceManager是一个RPC服务,可以接收RPC请求。内部是通过步骤1中创建的RpcService来启动ResourceManager RPC服务,实际上是由RpcService中的Supervisor Actor创建的一个ResourceManager Actor来处理RPC请求。

4. 创建并启动Dispatcher。

Dispatcher是一个RPC服务,可以接收RPC请求。RPC服务的创建与服务过程与ResourceManager一样,不再多述。

5. Dispatcher创建并启动JobMaster。

  1. Dispatcher的onStart方法被调用,方法内部会启动recovered jobs(JobGraph),per job模式下,recovered job不为空,是借助FileJobGraphRetriever类从job.graph文件中读取而来。
  2. JobMaster内部有SchedulerNG和SlotPoolService对象。
  3. JobMaster创建完成后,onStart方法被调用,会触发SchedulerNG的调度,SchedulerNG向Flink ResourceManager申请slot资源,Flink RM收到请求向YARN RM申请启动container运行TaskManager进程。TaskManager进程启动后向Flink RM注册slot资源,JobMaster中的SchedulerNG就能从Flink RM申请获取到slot资源,然后向TaskManager提交运行Task。

调用链

YarnJobClusterEntrypoint.java
  main(String[] args)
  	ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint)
  		clusterEntrypoint.startCluster()
  			runCluster(Configuration, PluginManager)
  				initializeServices
						{
1.            commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService
              JMXService
              IO线程池
              HA Service 
              BlobServer
              HeartbeatServices
              MetricRegistry
              MetricQuery	RpcService
              ProcessMetricGroup
              ExecutionGraphInfoStore
						}
  				clusterComponent = dispatcherResourceManagerComponentFactory.create
            
DefaultDispatcherResourceManagerComponentFactory.java
	create(...)
  	{
2.   	webMonitorEndpoint = restEndpointFactory.createRestEndpoint
      webMonitorEndpoint.start()
        
3.		resourceManagerService = ResourceManagerServiceImpl.create    
  
4.    dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner
				{
  					...
            Dispatcher dispatcher = dispatcherFactory.createDispatcher
            dispatcher.start() 
            	{
              	...
5.               startRecoveredJobs()
                	{
                  	for (JobGraph recoveredJob : recoveredJobs){
                      ...
                      JobMaster jobMaster = new JobMaster(...) 
                      jobMaster.start()
                    }
                	}
            	}
				}
      resourceManagerService.start()
      	{
        	...
           YarnResourceManagerDriver.java
            initializeInternal()
          		{
            		 ...
                 // 创建并启动AMRMClientAsync,联系YARN RM
              	 resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient
                 resourceManagerClient.init(yarnConfig)
                 resourceManagerClient.start()
                 
                 // AM启动后向YARN RM注册自己,这样可以通过YARN跳转到Flink web ui页面
                 resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl)               
                ...
                 // 创建并启动NMClientAsync,用于联系YARN NM
                 nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient
                 nodeManagerClient.init(yarnConfig)
                 nodeManagerClient.start() 
          		}
           	YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
          		{
							 	...
                 // 运行任务所需的Container都申请并领取完毕后,AM维护与YARN RM心跳
                 // resourceManagerClient = AMRMClientAsync
                 if (getNumRequestedNotAllocatedWorkers() <= 0) {
                 		resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
                 }
          		}
      	}  
  	}
  

总结

  1. Flink JobManager进程就是YARN job中的AM。
  2. Flink Client通过YarnClusterDescriptor中的YarnClient向YARN RM提交应用,YARN RM通过调度策略为其分配AM container资源,并通知container指定的YARN NM启动container。
  3. YARN NM接收到YARN RM发送的AM container启动请求后为其设置好运行环境(环境变量、JAR包、配置文件等),将启动命令写到脚本文件中,运行脚本启动Container(JobManager进程)。
  4. JobManager进程启动后运行WebMonitorEndpoint,并向YARN RM注册自己,注册的appTrackingUrl就是WebMonitorEndpoint中netty服务占用的地址和端口,即Flink web ui。
  5. 启动ResourceManager、Dispatcher等服务,并通过Dispatcher启动JobMaster来执行JobGraph。

TaskManager进程什么时候、在哪、如何被拉起来执行?

流程图

Flink-Yarn-PerJob-3.jpg

步骤说明

1. JobMaster通过SchedulerNG执行JobGraph的调度。

调度分两步走:

1) 获取资源(Task运行的容器,即Flink中的Slot,Slot需要从Container中划分)

2) 调度任务(Task)

注:这里仅说第1步,即TaskManager进程如何被调度起来为Task运行提供slots资源支持。第2步在下个问题描述。

2. SlotPoolService负责slots资源申请,先从缓存中(内存)检查是否有可用的slots资源,有的话直接分配,没的话会向Flink RM发送RPC请求。

AllocatedSlotPool中缓存有已经注册的slots资源:Map<AllocationID, AllocatedSlot> registeredSlots。

AllocatedSlot属性:AllocationID、TaskManagerLocation、TaskManagerGateway、ResourceProfile、physicalSlotNumber。

3. Flink RM接收到JobMaster发送的RPC资源请求,会将处理交SlotManager,SlotManager又通过ResourceManagerDriver来做具体的资源申请。

YarnResourceManagerDriver内部有YARN RM Client和YARN NM Client。

1) 通过YARN RM Client请求YARN RM分配containers。

2) 通过YARN NM Client向YARN NM发送请求,创建container运行TaskManager进程(指定了TaskManager进程入口类为YarnTaskExecutorRunner)。

调用链

JobMaster.java
	new JobMaster	
  	{
    	...
      // jobmanager.scheduler 默认值为Ng,因此创建的SchedulerNG为DefaultScheduler
      this.schedulerNG = createScheduler(...)  
      	{
        	// JobGraph -> ExecutionGraph
          this.executionGraph = createAndRestoreExecutionGraph(...)  
        }
    }
	jobMaster.start 
  ...
  onStart
  	startJobExecution
    	startJobMasterServices
      startScheduling
1.    	schedulerNG.startScheduling
    
DefaultScheduler.java
	// 方法体在父类SchedulerBase.java中  
  startScheduling		
  	startSchedulingInternal
    	schedulingStrategy.startScheduling
    
PipelinedRegionSchedulingStrategy.java
	startScheduling
  	maybeScheduleRegions(Set<SchedulingPipelinedRegion> regions) 
    	// 遍历regions,按region调度
      maybeScheduleRegion
      	schedulerOperations.allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption>)
    
DefaultScheduler.java  
	allocateSlotsAndDeploy
  	allocateSlots
    	executionSlotAllocator.allocateSlotsFor(List<ExecutionVertexID> executionVertexIds)

SlotSharingExecutionSlotAllocator.java
	allocateSlotsFor
  	getOrAllocateSharedSlot
    	slotProvider.allocatePhysicalSlot(PhysicalSlotRequest)

PhysicalSlotProviderImpl.java
	allocatePhysicalSlot
  	// 先看有可用的slot没,有的话直接分配
    tryAllocateFromAvailable
    // 没的话请求Flink RM获取
    orElseGet(requestNewSlot(...))
2.   	slotPool.requestNewAllocatedSlot(SlotRequestId, ResourceProfile, timeout)
    
DeclarativeSlotPoolBridge.java
	requestNewAllocatedSlot
  	internalRequestNewSlot
    	internalRequestNewAllocatedSlot
      	getDeclarativeSlotPool().increaseResourceRequirementsBy
    
DefaultDeclarativeSlotPool.java
	increaseResourceRequirementsBy
  	declareResourceRequirements
    	notifyNewResourceRequirements.accept(Collection<ResourceRequirement>)
    
DeclarativeSlotPoolService.java  
	declareResourceRequirements(Collection<ResourceRequirement>)
  	resourceRequirementServiceConnectionManager.declareResourceRequirements
    
DefaultDeclareResourceRequirementServiceConnectionManager.java
	declareResourceRequirements(ResourceRequirements)
  	triggerResourceRequirementsSubmission
    	sendResourceRequirements
      	service.declareResourceRequirements
    
DeclarativeSlotPoolService.java
	// 向Flink RM发送RPC请求,获取slots资源
  resourceManagerGateway.declareRequiredResources(JobMasterId, ResourceRequirements, Time timeout)
 

// Flink ResourceManager服务端处理资源请求
ResourceManager.java    
	declareRequiredResources   
3.	slotManager.processResourceRequirements

DeclarativeSlotManager.java 
	processResourceRequirements
  	resourceTracker.notifyResourceRequirements
    checkResourceRequirements()
    	tryFulfillRequirementsWithPendingSlots(JobID jobId, Collection<Map.Entry<ResourceProfile, Integer>> missingResources, ResourceCounter pendingSlots)
    		// 遍历missingResource
    		tryAllocateWorkerAndReserveSlot(ResourceProfile profile, ResourceCounter pendingSlots)
    			taskExecutorManager.allocateWorker(profile)
    
TaskExecutorManager.java
	allocateWorker
  	resourceActions.allocateResource(WorkerResourceSpec)

ResourceManager.java
	ResourceActionsImpl.allocateResource
  	startNewWorker

ActiveResourceManager.java
	startNewWorker
  	requestNewWorker        
    	resourceManagerDriver.requestResource(TaskExecutorProcessSpec)

YarnResourceManagerDriver.java
	requestResource
  		// 请求获取container资源
3.1 	resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority))

YarnResourceManagerDriver.java
	// container里有NM的地址信息
  YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
  	onContainersOfPriorityAllocated
    	// 遍历containers
      startTaskExecutorInContainerAsync
      	// 创建ContainerLaunchContext请求对象
        context = createTaskExecutorLaunchContext(ResourceID containerId, String host, TaskExecutorProcessSpec taskExecutorProcessSpec)
        // 通过YARN NM Client发送请求,启动container运行TaskManager进程
3.2     nodeManagerClient.startContainerAsync(container, context) 

总结

  1. YARN per job模式下,TaskManager进程不是根据配置事先就启动好的,而是需要有JobGraph的驱动。
  2. JobGraph被转为ExecutionGraph,后被进一步分解为一个个Task(可运行的Runnable对象),Task是需要在划定的slot资源里执行的,slot由TaskManager进程提供。
  3. JobMaster通过SlotPoolService向Flink RM申请获取资源,Flink RM通过SlotManager管理slot的申请与释放,SlotManager又通过ResourceManagerDriver来做具体的资源申请。YARN per job模式中是YarnResourceManagerDriver实现类,driver先向YARN RM申请分配container资源,然后driver联系container指定的YARN NM启动container,即运行TaskManager进程。

用户程序的Task什么时候、如何被分发到各个TaskManager进程中执行?

流程图

Flink-Yarn-PerJob-4.jpg

说明

  1. JobMaster中的SchedulerNG拿到slots资源后,开始进行Task的调度。
  2. Execution是可调度的最小单位,内有LogicalSlot,即这个Execution要被调度到哪个Slot中,通过LogicalSlot可获取其对应的TaskManager RPC客户端代理对象。
  3. 这样一个个Execution被转化为对应的TaskDeploymentDescriptor对象,通过RPC协议提交给对应的TaskManager执行。
  4. TaskManager接收到submitTask请求后将TaskDeploymentDescriptor转化为Task对象,将其放到对应的TaskSlot中,启动Thread执行Task。

提示:Task运行过程中,接收上游发过来的数据,处理完发往下游,由下游Task继续处理,这期间数据的存取由TaskSlot中的MemoryManager控制,相较于Java的堆来说能有效控制内存使用限额,缩减数据占用内存的大小,及时回收内存,这就是Flink的内存管理。

调用链

DefaultScheduler.java
  allocateSlotsAndDeploy
  	// 请求获取slots资源
  	allocateSlots
  	// 部署Task
  	waitForAllSlotsAndDeploy
  		deployAll(List<DeploymentHandle> deploymentHandles)
  			// 遍历deploymentHandles
  			deployOrHandleError(DeploymentHandle)
  				deployTaskSafe
  					executionVertexOperations.deploy(ExecutionVertex)
  
DefaultExecutionVertexOperations.java
  deploy(ExecutionVertex executionVertex)
  	executionVertex.deploy()
  
ExecutionVertex.java
  deploy()
  	currentExecution.deploy()

Execution.java
  deploy()
  	TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway()
  	taskManagerGateway.submitTask(TaskDeploymentDescriptor, rpcTimeout)
  
RpcTaskManagerGateway.java
  submitTask
  	taskExecutorGateway.submitTask(TaskDeploymentDescriptor, jobMasterId, timeout)
  
// RPC服务端响应  
TaskExecutor.java
  submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout)
		{
  		...
      // Task implements Runnable
  		Task task = new Task(...)
      taskSlotTable.addTask(task)
      task.startTaskThread()
        // executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask)
        executingThread.start()
		}

Task.java
	run()
    doRun()
			{
  			...
         TaskInvokable invokable = loadAndInstantiateInvokable(...)
         	{
						...
             invokableClass = Class.forName(className, true, classLoader).asSubclass(TaskInvokable.class)
             invokableClass.getConstructor(Environment.class).newInstance(environment)
        	}
  				restoreAndInvoke(invokable)
          ...
			}

附录

ResourceService创建与启动

ResourceManager创建与启动过程涉及Leader选举、代理类、Akka Actor,代码跳转比较绕,故这里把完整调用链描述一下,有兴趣可阅读。

整个过程总结下来就是

  1. 使用ResourceManagerService封装ResourceManager,ResourceManagerService启动后先做leader选举,成为leader后再创建并启动ResourceManager。
  2. 创建ResourceManager对象时,在其内部创建并启动了Akka Actor来做RPC服务。
  3. ResourceManager对象创建完毕调用start做初始化工作,启动相关服务。

start() -> onStart()

1) ResourceManager将start的处理交由代理对象RpcServer(AkkaInvocationHandler实例)的start方法处理

2) RpcServer invoke方法被调用,发现是非RPC消息,就调用自身start方法

3) start通过ResourceManager Actor的引用ActorRef向ResourceManager Actor发送start类型控制消息

4) Actor(AkkaRpcActor)收到消息,将处理交由RpcEndpoint,即ResourceManager处理

onStart() 方法内部启动相关服务。

调用链用如下

// 1) 启动leader选举服务,选举leader
// start(LeaderContender) -> leaderContender.grantLeadership
{
	// implements ResourceManagerService, LeaderContender
  ResourceManagerServiceImpl.java
	  start()
  		leaderElectionService.start(this)
    
	// HA leader选举  
  DefaultLeaderElectionService.java
  	start(LeaderContender)
    	leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver

  ZooKeeperLeaderElectionDriverFactory.java
  	createLeaderElectionDriver
    	new ZooKeeperLeaderElectionDriver
        	{
          	// latchPath节点用于leader选举
          	leaderLatch = new LeaderLatch(client, checkNotNull(latchPath))
            leaderLatch.addListener(this)
            leaderLatch.start()

            // leaderPath节点用于存储leader信息,监听该节点数据的变化
            cache = new NodeCache(client, leaderPath)
            cache.getListenable().addListener(this)
            cache.start()
          }

	ZooKeeperLeaderElectionDriver.java
  	// 成为leader时,latchPath上挂的监听会被回调,isLeader方法被执行
    isLeader()
    	leaderElectionEventHandler.onGrantLeadership

	DefaultLeaderElectionService.java
  	onGrantLeadership
    	leaderContender.grantLeadership   
}

// 2) 成为leader后,创建ResourceManager并启动RPC服务
{
  ResourceManagerServiceImpl.java
    grantLeadership
      startNewLeaderResourceManager
    		// resourceManagerFactory = ActiveResourceManagerFactory
				this.leaderResourceManager = resourceManagerFactory.createResourceManager 
  				{
            new ActiveResourceManager
              {
                ...
                // 启动RPC服务
                this.rpcServer = rpcService.startServer(this)
              }
  				}
    		startResourceManagerIfIsLeader
    			resourceManager.start()
}

// 3) start() -> onStart()
{
	// ResourceManager extends RpcEndpoint
  RpcEndpoint.java
  	start()
    	rpcServer.start()   

  // AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
  AkkaInvocationHandler.java(客户端)
  	// rpcServer是JDK Proxy生成的代理对象,实现了InvocationHandler接口的invoke方法,故start方法交由代理对象的invoke执行
    invoke(Object proxy, Method method, Object[] args) 
    	// if(非rpc方法) 调用对象自身相应的method处理
      result = method.invoke(this, args)
      	start()
        	// rpcEndpoint是Actor的引用:ActorRef,可以用来向Actor发消息,这里是向自身ResourceManager发送控制类消息
          rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender())

	// AkkaRpcActor extends AbstractActor,是一个Actor  
  AkkaRpcActor.java(服务端)  
  	handleControlMessage(ControlMessages)
    	// 初始状态为STOPPED,即state = AkkaRpcActor.StoppedState
      state = state.start(this)
      	akkaRpcActor.rpcEndpoint.internalCallOnStart()

  RpcEndpoint.java
  	internalCallOnStart()
    	onStart()  
}

// 4) 启动ResourceManager下相关服务:心跳管理服务、SlotManager等
{
	ResourceManager.java
    onStart()
    	startResourceManagerServices()
    		...
    		startHeartbeatServices()
    		slotManager.start
}

Dispatcher创建与启动

// 入口 dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner

// 1) 创建DispatcherRunner,启动HA leader选举
{
  DefaultDispatcherRunnerFactory.java
    createDispatcherRunner
      DefaultDispatcherRunner.create
        DispatcherRunnerLeaderElectionLifecycleManager.createFor
          new DispatcherRunnerLeaderElectionLifecycleManager
    				// 启动leader选举服务, 
    				// 后续 start(LeaderContender) -> leaderContender.grantLeadership过程同上述ResourceManager调用链一致,不再多述
            leaderElectionService.start(dispatcherRunner)  
}

// 2) 成为leader后,启动Dispatcher(内部启动了RPC服务)
{
	DefaultDispatcherRunner.java
  	grantLeadership
    	startNewDispatcherLeaderProcess
    		dispatcherLeaderProcess = createNewDispatcherLeaderProcess  			
    		dispatcherLeaderProcess.start

	AbstractDispatcherLeaderProcess.java
  	start
    	startInternal
      	onStart

	JobDispatcherLeaderProcess.java
  	onStart
    	dispatcherGatewayServiceFactory.create

	DefaultDispatcherGatewayServiceFactory.java
  	create(DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, JobGraphWriter jobGraphWriter)
    	{
      	Dispatcher dispatcher = dispatcherFactory.createDispatcher
        	// JobDispatcherFactory.java
          createDispatcher
          	// MiniDispatcher extends Dispatcher
            new MiniDispatcher
            	{
              	// 启动RPC服务,内部会创建Actor,返回代理对象AkkaInvocationHandler
                this.rpcServer = rpcService.startServer(this)
              }
          dispatcher.start         
      }
}

// 3) 启动后做什么?startRecoveredJobs(start JobMasters)  
{
	Dispatcher.java
  	start()
    .. // start() -> onStart() 过程,同上述ResourceManager调用链一致,不再多述
    onStart()
    	startDispatcherServices
      	startRecoveredJobs
        	// 遍历recoveredJobs
          runRecoveredJob(recoveredJob)
          	runJob
            	createJobManagerRunner
              	JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner
                runner.start()

  // implements LeaderContender  
	JobMasterServiceLeadershipRunner.java  
  	start
    	leaderElectionService.start(this)
    	... // 后续 start(LeaderContender) -> leaderContender.grantLeadership过程同上述ResourceManager调用链一致,不再多述
      grantLeadership
      	startJobMasterServiceProcessAsync
        	verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
          	createNewJobMasterServiceProcess
            	jobMasterServiceProcess = jobMasterServiceProcessFactory.create
    
	DefaultJobMasterServiceProcessFactory.java
  	create
    	new DefaultJobMasterServiceProcess
      	this.jobMasterServiceFuture = jobMasterServiceFactory.createJobMasterService
    
	DefaultJobMasterServiceFactory.java
  	createJobMasterService
    	internalCreateJobMasterService
      	JobMaster jobMaster = new JobMaster(...)
        jobMaster.start()

}

参考

  1. Flink on YARN(上):一张图轻松掌握基础架构与启动流程
  2. Apache Flink 进阶(四):Flink on Yarn/K8s 原理剖析及实践

版权声明:
作者:楞头青
链接:https://jkboy.com/archives/14695.html
来源:随风的博客
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
海报
Flink源码分析之Flink on YARN – Per Job
用户的应用程序是一个分布式程序,需要按照YARN的规范来写才能提交到YARN集群被调度运行起来。
<<上一篇
下一篇>>