365health-365体育投注网站官网-365bet官网投注

NodeManager详细组件及功能

NodeManager详细组件及功能

NodeManager

NodeManager是Yarn中单节点的代理,它管理Hadoop集群中单个计算节点,其需要与应用程序的ApplicationMaster和集群资源管理器RM交互,从ApplicationMaster上接收到相关Container的执行命令(启动,停止Container);并向RM汇报各个Container的运行状态和节点健康状态,并领取相关的Container的执行命令;其主要的功能包括与RM保持通信,管理Container的生命周期,监控每个Container的资源使用情况,追踪节点健康状况,管理日志以及不同应用程序用到的附属服务;

NodeManager基本功能与其协议接口

NodeManager通过两个RPC协议与RM和各个ApplicationMaster进行通信:

ResourceTrackerProtocol协议:NodeManager通过该RPC协议向RM注册,汇报节点的健康状态以及Container的运行状态,并领取RM下发的命令例如重新初始化Container,清理Container占用的资源等;在这个协议中NodeManager主动向RM发送请求,RM响应其NodeManager的请求;ResourceTrackerProtocol协议主要提供了以下两个RPC函数:

registerNodeManager:NodeManager启动时通过该RPC函数向ResourceManager注册,注册信息由RegisterNodeManagerRequest封装的,包括如下三部分内容:

httpPort:该NodeManager对外提供的HTTP端口号,ResourceManager会在界面上提供一个可直接访问NodeManager Web界面的超链接nodeId:该NodeManager所在的host和对外的RPC端口号totalResource:该NodeManager所在节点总的可分配资源,当前支持内存和虚拟CPU两种资源,管理员可通过参数yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb配置

ResourceManager将通过registerNodeManager函数向NodeManager返回一个RegisterNodeManagerResponse类型的对象,主要包含以下信息 :

MasterKey:新生成的Container Token和Node Token的Master KeyNodeAction:ResourceManager向该NodeManager返回的下一步操,主要包括NORMAL、RESYNC、SHUTDOWN三种,分别表示正常,重新同步信息和停止运行rmIdentifier:ResourceManager的标示符,NodeManager通过该标识符判断ApplicationMaster发送的Container来自原始的还是新启动的ResourceManagerdiagnosticsMessage:NodeManager注册失败时,将收到一段诊断信息,告知具体的失败原因nodeHeartbeat:NodeManager启动后,定期通过该RPC函数向ResourceManager汇报Container运行信息和节点健康状况,并领取新的命令,比如杀死一个ContainerContainerManagementProtocol协议:应用程序的ApplicationMaster通过该RPC协议向NodeManager发起针对Container的相关操作,包括启动Container,杀死Container和获取Container;ContainerManagementProtocol协议主要提供了以下三个RPC函数:

startContainer:ApplicationMaster通过该RPC要求NodeManager启动一个Container。该函数有一个StartContainerRequest类型的参数,封装了Container启动所需的本地资源、环境变量、执行命令、Token等信息。如果Container启动成功,则该函数返回一个StartContainerResponse对象stopContainer:ApplicationMaster通过该RPC要求NodeManager停止(杀死)一个Container。该函数有一个StopContainerRequest类型的参数,用于指定待杀死的Container ID,如果Container被成功杀死,则该函数返回一个StopContainerResponse对象getContainerStatus:ApplicationMaster通过该RPC获取一个Container的运行状态。该函数参数类型为GetContainerStatusRequest,封装了目标Container的ID,返回值为封装了Container当前运行状态的类型为GetContainerStatusResponse的对象接下来从源码的角度介绍一下NodeManager上启动一个Container的流程如下,主要就是container对应的状态机的转化流程:

public StartContainersResponse startContainers(StartContainersRequest requests) throws YarnException, IOException {

// 权限验证 ......

List succeededContainers = new ArrayList();

Map failedContainers =

new HashMap();

for (StartContainerRequest request : requests.getStartContainerRequests()) {

ContainerId containerId = null;

try {

// container Token获取验证 ......

// 启动container的具体流程函数

startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, request);

succeededContainers.add(containerId);

} catch (YarnException e) {

// ......

}

}

return StartContainersResponse.newInstance(getAuxServiceMetaData(),

succeededContainers, failedContainers);

}

private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,

ContainerTokenIdentifier containerTokenIdentifier,

StartContainerRequest request) throws YarnException, IOException {

// 权限token验证

ContainerId containerId = containerTokenIdentifier.getContainerID();

String containerIdStr = containerId.toString();

String user = containerTokenIdentifier.getApplicationSubmitter();

LOG.info("Start request for " + containerIdStr + " by user " + user);

// 从请求的参数中获取启动container的启动上下文信息,包括cmd、env、jar等等信息

ContainerLaunchContext launchContext = request.getContainerLaunchContext();

// 构造ContainerImpl状态机来维护一个Container的运行生命周期

Container container =

new ContainerImpl(getConfig(), this.dispatcher,

launchContext, credentials, metrics, containerTokenIdentifier,

context);

try {

if (!serviceStopped) {

// Create the application

// 会尝试构造一个ApplicationImpl状态机来描述该节点上对同一个应用程序的所有container的管理

Application application =

new ApplicationImpl(dispatcher, user, applicationID, credentials, context);

if (null == context.getApplications().putIfAbsent(applicationID,

application)) {

// 如果这个应用第一次在该nodemanager上启动运行,则调度ApplicationInitEvent事件

// 来初始化ApplicationImpl状态机

dispatcher.getEventHandler().handle(

new ApplicationInitEvent(applicationID, appAcls,

logAggregationContext));

}

// 调度ApplicationContainerInitEvent事件来初始化该ContainerImpl状态机

this.context.getNMStateStore().storeContainer(containerId, request);

dispatcher.getEventHandler().handle(

new ApplicationContainerInitEvent(container));

// ......

}

} finally {

this.readLock.unlock();

}

}

在ApplicationImpl状态机处理ApplicationContainerInitEvent事件的时候,其会使用调度器调度ContainerInitEvent事件来触发对应container的初始化(该ContainerInitEvent事件的响应者为ContainerManagerImpl#ContainerEventDispatcher,其会调用对应的ContainerImpl.handler(event)来进行事件的处理),其主要初始化RequestResourcesTransition包括:下载container所对应的运行任务的资源jar包,env环境变量等等。在ContainerImpl本地化资源完成后,其会收到来自本地资源服务LocalizedResource发送的Resource_localized事件标识本地资源已经异步下载完全。之后ContainerImpl将会调度处理该事件(调用hook状态转移函数LocalizedTransition来触发当前container的LaunchEven操作);

static class LocalizedTransition implements

MultipleArcTransition {

@Override

public ContainerState transition(ContainerImpl container,

ContainerEvent event) {

// 判断本地资源的下载状态

ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;

List syms =

container.pendingResources.remove(rsrcEvent.getResource());

container.localizedResources.put(rsrcEvent.getLocation(), syms);

if (!container.pendingResources.isEmpty()) {

return ContainerState.LOCALIZING;

}

// 当前资源已经下载完毕

container.dispatcher.getEventHandler().handle(

new ContainerLocalizationEvent(LocalizationEventType.

CONTAINER_RESOURCES_LOCALIZED, container));

// 触发container的LaunchEvent事件来启动container

container.sendLaunchEvent();

container.metrics.endInitingContainer();

return ContainerState.LOCALIZED;

}

}

private void sendLaunchEvent() {

ContainersLauncherEventType launcherEvent =

ContainersLauncherEventType.LAUNCH_CONTAINER;

if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {

// try to recover a container that was previously launched

launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;

}

dispatcher.getEventHandler().handle(

new ContainersLauncherEvent(this, launcherEvent));

}

在ContainerManagerImpl中定义了处理ContainersLauncherEventType事件对于的事件处理器,如下:dispatcher.register(ContainersLauncherEventType.class, containersLauncher);可以知道在containerImpl向调度器发送了ContainersLauncherEvent事件之后,对应的ContainersLauncher对象实例会对该LAUNCH_CONTAINER事件进行对应的处理:

// ContainersLauncher

public void handle(ContainersLauncherEvent event) {

// TODO: ContainersLauncher launches containers one by one!!

Container container = event.getContainer();

ContainerId containerId = container.getContainerId();

switch (event.getType()) {

case LAUNCH_CONTAINER: // container启动

Application app =

context.getApplications().get(

containerId.getApplicationAttemptId().getApplicationId());

// 封装成ContainerLaunch对象,并在线程池中异步执行启动container的命令

ContainerLaunch launch =

new ContainerLaunch(context, getConfig(), dispatcher, exec, app,

event.getContainer(), dirsHandler, containerManager);

containerLauncher.submit(launch);

running.put(containerId, launch);

break;

case RECOVER_CONTAINER: // container恢复

app = context.getApplications().get(

containerId.getApplicationAttemptId().getApplicationId());

launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,

exec, app, event.getContainer(), dirsHandler, containerManager);

containerLauncher.submit(launch);

running.put(containerId, launch);

break;

case CLEANUP_CONTAINER: // container清理

ContainerLaunch launcher = running.remove(containerId);

if (launcher == null) {

// Container not launched. So nothing needs to be done.

return;

}

// Cleanup a container whether it is running/killed/completed, so that

// no sub-processes are alive.

try {

launcher.cleanupContainer();

} catch (IOException e) {

LOG.warn("Got exception while cleaning container " + containerId

+ ". Ignoring.");

}

break;

}

}

// ContainerLaunch

@SuppressWarnings("unchecked") // dispatcher not typed

public Integer call() {

final ContainerLaunchContext launchContext = container.getLaunchContext();

// 资源、本地文件tmp、logdir设置等等

// 命令cmd设置 及 本地可执行环境env变量设置

// 可执行环境classpath、jar路径设置等等

// LaunchContainer is a blocking call. We are here almost means the

// container is launched, so send out the event.

dispatcher.getEventHandler().handle(new ContainerEvent(

containerID,

ContainerEventType.CONTAINER_LAUNCHED));

context.getNMStateStore().storeContainerLaunched(containerID);

// 调用ContainerExecutor来进行container的具体启动操作

// 其有两种基本的实现:DefaultContainerExecutor和LinuxContainerExecutor

// 最终使用shell命令来启动对应container运行任务的进程。

exec.activateContainer(containerID, pidFilePath);

ret = exec.launchContainer(container, nmPrivateContainerScriptPath,

nmPrivateTokensPath, user, appIdStr, containerWorkDir,

localDirs, logDirs);

LOG.info("Container " + containerIdStr + " succeeded ");

dispatcher.getEventHandler().handle(

new ContainerEvent(containerID,

ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));

return 0;

}

其container运行启动过程如下所示:

NodeManager内部架构

NodeManager的内部架构如下图所示:

NodeStatusUpdater:这个组件是NodeManager与RM通信的唯一通道,包括NM注册之后向RM的注册汇报资源,以及周期性的汇报节点信息和Container的运行状态,同时RM会返回给待清理的Container列表,待清理的应用程序,诊断信息等;ContainerManager:该组件是NodeManager最核心的组件之一,它内部有很多子组件组成,如上图所示:

RPC Server:该协议实现了ContainerManagementProtocol协议,实现AM与NM之前的通信通道,通过该协议接受来自各个AM的启动或者杀死Container的命令;ResourceLocalizationService:负责Container所需资源的本地化,按照资源描述将资源从HDFS上下载所需资源,并将这些资源均摊给各个磁盘以防数据热点;ContainerLauncher:维护一个线程池以并行的方式完成Container的操作,比如来自AM的启动Container,来自AM或者RM的kill Container;AuxServices:NM允许用户添加附属服务;ContainersMonitor:监控Container资源的使用状况,周期性的检查Container资源使用情况,一旦有Container超过它允许使用资源的上限则kill掉;Loghandler:可插拔的组件,控制Container日志保存的方式;ContainerEventDispatcher:Container的事件调度器,负责将ContainerEvent类型的事件调度给对应Container的状态机ContainerImpl;ApplicationEventDispatcher:Application的事件调度器,负责将ApplicationEvent类型的事件调度给Application的状态机ApplicationImpl;ContainerExecutor:它可与底层的操作系统交互,安全存放Container的文件和目录,进而安全的启动和清理Container;NodeHealthCheckerService:周期性的运行一个向磁盘写文件的脚本,检测节点的健康状态,并汇报给RM;DeletionService:NodeManager删除文件组件;Security:安全模块;WebServer:web页面展示该节点上应用程序的状态;NodeManager的事件与事件处理器

NodeManager主要组件也是通过事件进行交互的,这使得组件能够异步并发完成各种功能;如下图所示:

相关推荐