Flink Yarn Per Job - JobManager Requesting Slots

When the JobMaster starts, it starts the SlotPool and registers with the ResourceManager.
Starting the SlotPool
JobMaster
private void startJobMasterServices() throws Exception {
    // Start the heartbeat services for the TaskManagers and the ResourceManager
    startHeartbeatServices();

    // start the slot pool make sure the slot pool now accepts messages for this leader
    // Start the SlotPool
    slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

    //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
    // try to reconnect to previously known leader
    // Reconnect to the previously known ResourceManager
    reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

    // job is ready to go, try to establish connection with resource manager
    //   - activate leader retrieval for the resource manager
    //   - on notification of the leader, the connection will be established and
    //     the slot pool will start requesting slots
    // Connect to the ResourceManager; the SlotPool then starts requesting resources
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
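- Start the heartbeat services for the TaskManagers and the ResourceManager.
- Start the SlotPool.
- Reconnect to the previously known ResourceManager.
- Start leader retrieval for the ResourceManager; once the connection is established, the SlotPool starts requesting resources.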
Registering with the RM
StandaloneLeaderRetrievalService
@Override
public void start(LeaderRetrievalListener listener) {
    checkNotNull(listener, "Listener must not be null.");

    synchronized (startStopLock) {
        checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
        started = true;

        // directly notify the listener, because we already know the leading JobManager's address
        listener.notifyLeaderAddress(leaderAddress, leaderId);
    }
}
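Because the standalone service already knows the leader address, start() notifies the listener synchronously instead of waiting for an election. The following is a minimal sketch of that pattern and not Flink's implementation: `FixedLeaderService` and `LeaderListener` are hypothetical names, and plain JDK exceptions stand in for `checkNotNull` and `checkState`.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for a leader retrieval listener. */
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderId);
}

/** Hypothetical service whose leader is fixed at construction time. */
final class FixedLeaderService {
    private final Object startStopLock = new Object();
    private final String leaderAddress;
    private final UUID leaderId;
    private boolean started;

    FixedLeaderService(String leaderAddress, UUID leaderId) {
        this.leaderAddress = Objects.requireNonNull(leaderAddress);
        this.leaderId = Objects.requireNonNull(leaderId);
    }

    void start(LeaderListener listener) {
        Objects.requireNonNull(listener, "Listener must not be null.");
        synchronized (startStopLock) {
            if (started) {
                throw new IllegalStateException("The service can only be started once.");
            }
            started = true;
            // The leader is known up front, so the listener is notified right away.
            listener.notifyLeaderAddress(leaderAddress, leaderId);
        }
    }
}
```

A caller would pass a listener that reconnects to the announced address; a second call to start() fails with an IllegalStateException.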
ResourceManagerLeaderListener in TaskExecutor
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
            () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }
}

private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
    resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}

private void reconnectToResourceManager(Exception cause) {
    closeResourceManagerConnection(cause);
    startRegistrationTimeout();
    tryConnectToResourceManager();
}

private void tryConnectToResourceManager() {
    if (resourceManagerAddress != null) {
        connectToResourceManager();
    }
}

private void connectToResourceManager() {
    ... ...
    resourceManagerConnection.start();
}
RegisteredRpcConnection
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");

    // Create the registration object
    final RetryingRegistration newRegistration = createNewRegistration();

    if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        // Start the registration; once it succeeds, onRegistrationSuccess() is called
        newRegistration.startRegistration();
    } else {
        // concurrent start operation
        newRegistration.cancel();
    }
}

private RetryingRegistration createNewRegistration() {
    RetryingRegistration newRegistration = checkNotNull(generateRegistration());
    ... ...
    return newRegistration;
}
- Create the registration object.
- Start the registration; once it succeeds, onRegistrationSuccess() is called.
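The compare-and-set in start() ensures that only one registration attempt is ever active: whichever caller installs its registration first starts it, and a concurrent caller cancels its own. A minimal sketch of that guard follows, assuming an `AtomicReference` in place of `REGISTRATION_UPDATER`; the class names are hypothetical, and the `checkState` preconditions are left out so that the losing branch can be observed.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical registration attempt that only records what happened to it. */
final class RegistrationAttempt {
    enum State { CREATED, STARTED, CANCELED }

    private volatile State state = State.CREATED;

    void startRegistration() { state = State.STARTED; }

    void cancel() { state = State.CANCELED; }

    State state() { return state; }
}

/** Hypothetical connection that allows at most one pending registration. */
final class SingleRegistrationConnection {
    private final AtomicReference<RegistrationAttempt> pendingRegistration = new AtomicReference<>();

    RegistrationAttempt start() {
        RegistrationAttempt newRegistration = new RegistrationAttempt();
        if (pendingRegistration.compareAndSet(null, newRegistration)) {
            // This caller won the race: its registration becomes the pending one.
            newRegistration.startRegistration();
        } else {
            // Another registration is already pending: discard this one.
            newRegistration.cancel();
        }
        return newRegistration;
    }

    RegistrationAttempt pending() { return pendingRegistration.get(); }
}
```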
SlotPool Requests Slots
ResourceManagerConnection in JobMaster
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(() -> {
        // filter out outdated connections
        //noinspection ObjectEquality
        if (this == resourceManagerConnection) {
            establishResourceManagerConnection(success);
        }
    });
}

private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    final ResourceManagerId resourceManagerId = success.getResourceManagerId();
    ... ...
    // The SlotPool connects to the ResourceManager and requests resources
    slotPool.connectToResourceManager(resourceManagerGateway);
    ... ...
}
The SlotPool connects to the ResourceManager and requests resources
SlotPoolImpl
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

    // work on all slots waiting for this connection
    for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }

    // all sent off
    waitingForResourceManager.clear();
}

private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {
    ... ...
    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
        jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
        rpcTimeout);
    ... ...
}
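connectToResourceManager() flushes every request that was parked in waitingForResourceManager while no ResourceManager was known. The sketch below illustrates that buffering under stated assumptions: `BufferingSlotPool` and `SlotRequestGateway` are hypothetical, requests are reduced to two strings, and the rule that a request is parked when no gateway is set is an assumption about the code that is not shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the ResourceManager gateway. */
interface SlotRequestGateway {
    void requestSlot(String allocationId, String resourceProfile);
}

/** Hypothetical pool that parks requests until a gateway is connected. */
final class BufferingSlotPool {
    // allocationId -> resource profile, kept in arrival order
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private SlotRequestGateway resourceManagerGateway;

    void requestNewSlot(String allocationId, String resourceProfile) {
        if (resourceManagerGateway == null) {
            // Assumption: without a connection the request is parked.
            waitingForResourceManager.put(allocationId, resourceProfile);
        } else {
            resourceManagerGateway.requestSlot(allocationId, resourceProfile);
        }
    }

    void connectToResourceManager(SlotRequestGateway gateway) {
        this.resourceManagerGateway = Objects.requireNonNull(gateway);
        // Work on all requests that were waiting for this connection.
        for (Map.Entry<String, String> entry : waitingForResourceManager.entrySet()) {
            gateway.requestSlot(entry.getKey(), entry.getValue());
        }
        // All sent off.
        waitingForResourceManager.clear();
    }

    int waitingCount() { return waitingForResourceManager.size(); }
}
```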
Requesting Slots from the RM
ResourceManager
@Override
public CompletableFuture<Acknowledge> requestSlot(
        JobMasterId jobMasterId,
        SlotRequest slotRequest,
        final Time timeout) {

    JobID jobId = slotRequest.getJobId();
    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

    if (null != jobManagerRegistration) {
        if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
            log.info("Request slot with profile {} for job {} with allocation id {}.",
                slotRequest.getResourceProfile(),
                slotRequest.getJobId(),
                slotRequest.getAllocationId());

            try {
                // The RM's internal SlotManager requests resources from Yarn's ResourceManager
                slotManager.registerSlotRequest(slotRequest);
            } catch (ResourceManagerException e) {
                return FutureUtils.completedExceptionally(e);
            }

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
                jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
        }
    } else {
        return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
    }
}
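Before forwarding anything to the SlotManager, requestSlot() checks that the job is registered and that the caller's JobMasterId matches the registered one, so a stale JobMaster cannot obtain slots. A minimal sketch of that check, with a hypothetical `FencedSlotRequestHandler`, string ids, and a plain "ACK" string standing in for `Acknowledge`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical handler that accepts slot requests only from the registered leader. */
final class FencedSlotRequestHandler {
    // jobId -> id of the JobMaster currently registered for that job
    private final Map<String, String> registeredJobMasters = new HashMap<>();
    private int acceptedRequests;

    void registerJobMaster(String jobId, String jobMasterId) {
        registeredJobMasters.put(jobId, jobMasterId);
    }

    CompletableFuture<String> requestSlot(String jobMasterId, String jobId) {
        String registered = registeredJobMasters.get(jobId);
        if (registered == null) {
            return failed("Could not find registered job manager for job " + jobId + '.');
        }
        if (!registered.equals(jobMasterId)) {
            return failed("The job leader's id " + registered
                + " does not match the received id " + jobMasterId + '.');
        }
        // Only here would the request be handed to the slot manager.
        acceptedRequests++;
        return CompletableFuture.completedFuture("ACK");
    }

    int acceptedRequests() { return acceptedRequests; }

    private static CompletableFuture<String> failed(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException(message));
        return future;
    }
}
```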
The RM's internal SlotManager requests resources from Yarn's ResourceManager
SlotManagerImpl
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
    checkInit();
    ... ...
        try {
            internalRequestSlot(pendingSlotRequest);
        } catch (ResourceManagerException e) {
            // requesting the slot failed --> remove pending slot request
            pendingSlotRequests.remove(slotRequest.getAllocationId());

            throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
        }

        return true;
    }
}

private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

    OptionalConsumer.of(findMatchingSlot(resourceProfile))
        .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
        .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}

private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
    Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);

    if (!pendingTaskManagerSlotOptional.isPresent()) {
        pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
    }
    ... ...
}
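Taken together, the methods above try three sources in order: a free slot on a registered TaskManager, then an unassigned pending slot, and only then a new worker. The sketch below models that order with plain counters. It is an illustration under assumptions, not the SlotManager's logic: resource-profile matching is ignored, `SlotRequestMatcher` and its `Outcome` values are hypothetical, and the assumption that a new worker contributes `slotsPerWorker` pending slots is not shown in the code above.

```java
/** Hypothetical counter-based model of the three-step slot lookup. */
final class SlotRequestMatcher {
    enum Outcome { ALLOCATED_FREE_SLOT, ASSIGNED_PENDING_SLOT, REQUESTED_NEW_WORKER, UNFULFILLABLE }

    private int freeSlots;
    private int unassignedPendingSlots;
    private int workersLeft;
    private final int slotsPerWorker;

    SlotRequestMatcher(int freeSlots, int workersLeft, int slotsPerWorker) {
        if (slotsPerWorker < 1) {
            throw new IllegalArgumentException("slotsPerWorker must be at least 1");
        }
        this.freeSlots = freeSlots;
        this.workersLeft = workersLeft;
        this.slotsPerWorker = slotsPerWorker;
    }

    Outcome request() {
        // 1. Prefer a free slot on an already registered TaskManager.
        if (freeSlots > 0) {
            freeSlots--;
            return Outcome.ALLOCATED_FREE_SLOT;
        }
        // 2. Otherwise reuse a pending slot of a worker that was already requested.
        if (unassignedPendingSlots > 0) {
            unassignedPendingSlots--;
            return Outcome.ASSIGNED_PENDING_SLOT;
        }
        // 3. Otherwise request a new worker; one of its slots serves this request.
        if (workersLeft > 0) {
            workersLeft--;
            unassignedPendingSlots += slotsPerWorker - 1;
            return Outcome.REQUESTED_NEW_WORKER;
        }
        return Outcome.UNFULFILLABLE;
    }
}
```

With one free slot, one worker left, and two slots per worker, four consecutive requests walk through all four outcomes in the order free slot, new worker, pending slot, unfulfillable.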
Flink's RM Requests Resources from Yarn's RM
SlotManagerImpl
private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
    ... ...
    if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) {
        // resource cannot be allocated
        return Optional.empty();
    }
    ... ...
}
ResourceActionsImpl in ResourceManager
public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
    validateRunsInMainThread();
    return startNewWorker(workerResourceSpec);
}
ActiveResourceManager
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
    requestNewWorker(workerResourceSpec);
    return true;
}

private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
    // Derive the TaskExecutor process spec from the configuration
    final TaskExecutorProcessSpec taskExecutorProcessSpec =
        TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
    final int pendingCount = pendingWorkerCounter.increaseAndGet(workerResourceSpec);

    log.info("Requesting new worker with resource spec {}, current pending count: {}.",
        workerResourceSpec,
        pendingCount);

    // Request the resource
    CompletableFuture<WorkerType> requestResourceFuture = resourceManagerDriver.requestResource(taskExecutorProcessSpec);
    FutureUtils.assertNoException(
        requestResourceFuture.handle((worker, exception) -> {
            if (exception != null) {
                final int count = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
                log.warn("Failed requesting worker with resource spec {}, current pending count: {}, exception: {}",
                    workerResourceSpec,
                    count,
                    exception);
                requestWorkerIfRequired();
            } else {
                final ResourceID resourceId = worker.getResourceID();
                workerNodeMap.put(resourceId, worker);
                currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
                log.info("Requested worker {} with resource spec {}.",
                    resourceId.getStringWithMetadata(),
                    workerResourceSpec);
            }
            return null;
        }));
}
- Derive the TaskExecutor process spec from the configuration.
- Request the resource.
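The bookkeeping in requestNewWorker() can be sketched on its own: the pending count goes up when the request is issued, comes back down if the driver's future fails, and on success the worker is recorded as requested but not yet registered. The sketch below assumes single-threaded use and uses a hypothetical `PendingWorkerTracker` with string specs and worker ids; the retry step (requestWorkerIfRequired) is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical tracker for requested but not yet registered workers. */
final class PendingWorkerTracker {
    private final Map<String, Integer> pendingWorkerCounter = new HashMap<>();
    // workerId -> spec of workers that were granted but have not registered yet
    private final Map<String, String> unregisteredWorkers = new HashMap<>();

    /** Counts the request and returns the pending count for the spec right after counting it. */
    int requestNewWorker(String spec, CompletableFuture<String> requestResourceFuture) {
        int pendingCount = pendingWorkerCounter.merge(spec, 1, Integer::sum);
        requestResourceFuture.handle((workerId, exception) -> {
            if (exception != null) {
                // The request failed, so it no longer counts as pending.
                pendingWorkerCounter.merge(spec, -1, Integer::sum);
            } else {
                // The worker was granted; it stays pending until it registers.
                unregisteredWorkers.put(workerId, spec);
            }
            return null;
        });
        return pendingCount;
    }

    int pendingCount(String spec) { return pendingWorkerCounter.getOrDefault(spec, 0); }

    boolean isTracked(String workerId) { return unregisteredWorkers.containsKey(workerId); }
}
```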
YarnResourceManagerDriver
public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
    checkInitialized();

    final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();

    final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
        taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);

    if (!priorityAndResourceOpt.isPresent()) {
        requestResourceFuture.completeExceptionally(
            new ResourceManagerException(
                String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s. " +
                        "This usually indicates the requested resource is larger than Yarn's max container resource limit.",
                    taskExecutorProcessSpec)));
    } else {
        final Priority priority = priorityAndResourceOpt.get().getPriority();
        final Resource resource = priorityAndResourceOpt.get().getResource();
        resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));

        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);

        requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new LinkedList<>()).add(requestResourceFuture);

        log.info("Requesting new TaskExecutor container with resource {}, priority {}.", taskExecutorProcessSpec, priority);
    }

    return requestResourceFuture;
}
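requestResource() either fails the future immediately, when the spec cannot be mapped to a Yarn container resource, or parks the future in a per-spec queue. The sketch below models both branches with a hypothetical `ContainerRequestBook` that keys requests by memory size. The `onContainerAllocated` method, which completes the oldest parked future, is an assumption about the code that is not shown above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical book of outstanding container requests, keyed by memory size in MB. */
final class ContainerRequestBook {
    private final int maxContainerMemoryMb;
    private final Map<Integer, Queue<CompletableFuture<String>>> requestResourceFutures = new HashMap<>();

    ContainerRequestBook(int maxContainerMemoryMb) {
        this.maxContainerMemoryMb = maxContainerMemoryMb;
    }

    CompletableFuture<String> requestResource(int memoryMb) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (memoryMb > maxContainerMemoryMb) {
            // The request can never be satisfied, so fail it right away.
            future.completeExceptionally(new IllegalArgumentException(
                "Requested " + memoryMb + " MB exceeds the container limit of " + maxContainerMemoryMb + " MB."));
        } else {
            // Park the future until a matching container is granted.
            requestResourceFutures.computeIfAbsent(memoryMb, ignore -> new ArrayDeque<>()).add(future);
        }
        return future;
    }

    /** Assumed callback: completes the oldest parked request of the same size, if any. */
    boolean onContainerAllocated(int memoryMb, String containerId) {
        Queue<CompletableFuture<String>> queue = requestResourceFutures.get(memoryMb);
        CompletableFuture<String> oldest = (queue == null) ? null : queue.poll();
        if (oldest == null) {
            return false;
        }
        oldest.complete(containerId);
        return true;
    }
}
```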

