1. The utilization shown on the YARN ResourceManager page
Looking at org.apache.hadoop.yarn.server.resourcemanager.webapp.FairSchedulerPage:
FairSchedulerPage
public void render(Block html) {
  Collection<? extends FairSchedulerQueueInfo> subQueues = qinfo.getChildQueues();
  UL<Hamlet> ul = html.ul("#pq");
  for (FairSchedulerQueueInfo info : subQueues) {
    float used = info.getUsedMemoryFraction();
    // ... (rendering of the queue's usage bar elided)
    fsqinfo.qinfo = info;
    if (info instanceof FairSchedulerLeafQueueInfo) {
      li.ul("#lq").li()._(LeafQueueBlock.class)._()._();
    } else {
      li._(QueueBlock.class);
    }
    li._();
  }
  ul._();
}
Here you can see how used is defined:
float used = info.getUsedMemoryFraction();

public float getUsedMemoryFraction() {
  return fractionMemUsed;
}

fractionMemUsed = (float)usedResources.getMemory() /
    clusterResources.getMemory();
So the utilization shown on the page only reflects memory usage; it has nothing to do with vcores.
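The effect is easy to reproduce in isolation. A minimal sketch (plain Java, cluster numbers invented for illustration) showing that the fraction the page reports tracks memory but is blind to vcores:

```java
public class UsedFraction {
    // Mirrors fractionMemUsed: used memory over cluster memory; vcores
    // never enter the calculation.
    static float usedMemoryFraction(long usedMemMB, long clusterMemMB) {
        return (float) usedMemMB / clusterMemMB;
    }

    public static void main(String[] args) {
        long clusterMemMB = 102400;  // hypothetical 100 GB cluster
        long usedMemMB = 25600;      // 25 GB in use
        int clusterVcores = 64, usedVcores = 64;  // every vcore busy

        // The page would still show 25%, even though vcores are exhausted.
        System.out.println(usedMemoryFraction(usedMemMB, clusterMemMB)); // 0.25
        System.out.println((float) usedVcores / clusterVcores);          // 1.0
    }
}
```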
2. How YARN allocates vcores
First, a quick overview of YARN's basic workflow:
Step 1: the user submits an application to the ResourceManager;
Step 2: the ResourceManager allocates resources for the application's ApplicationMaster and contacts a NodeManager to launch the ApplicationMaster;
Step 3: the ApplicationMaster communicates with the ResourceManager to request resources for the tasks it needs to run; once resources are granted, it contacts the corresponding NodeManagers to launch those tasks;
Step 4: after all tasks finish, the ApplicationMaster unregisters from the ResourceManager and the application completes.
Steps 2-3 above involve resource allocation and use, and this is exactly where Containers come in.
(1) Allocating a container
One of the NodeManager's most important functions is to launch containers at the ApplicationMaster's request. Since the containers on each node are managed and allocated centrally by the ResourceManager, the ResourceManager normally hands a Container to the ApplicationMaster, and the ApplicationMaster in turn asks the corresponding NodeManager to launch it. To prevent an ApplicationMaster from asking a NodeManager to launch containers without authorization, the ResourceManager issues a token for each container. Before launching any container, the NodeManager validates this token; only after validation passes does it go through the normal startup flow for that container.
In the logs you can see the ResourceManager allocating a container to the ApplicationMaster:
2015-01-05 14:37:32,076 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=bipub OPERATION=AM Allocated Container TARGET=SchedulerApp
RESULT=SUCCESS APPID=application_1412850382932_1898199 CONTAINERID=container_1412850382932_1898199_01_000295
2015-01-05 14:37:32,076 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode: Assigned container container_1412850382932_1898199_01_000295 of capacity
allocateContainer
/**
* The Scheduler has allocated containers on this node to the
* given application.
*
* @param applicationId application
* @param rmContainer allocated container
*/
public synchronized void allocateContainer(ApplicationId applicationId,
    RMContainer rmContainer) {
  Container container = rmContainer.getContainer();
  deductAvailableResource(container.getResource());
  ++numContainers;
  launchedContainers.put(container.getId(), rmContainer);
  LOG.info("Assigned container " + container.getId() +
      " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
      ", which currently has " + numContainers + " containers, " +
      getUsedResource() + " used and " +
      getAvailableResource() + " available");
}
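The bookkeeping in allocateContainer can be illustrated with a stripped-down stand-in (a hypothetical SimpleNode class, not YARN's; only the deduct-and-count logic is kept):

```java
public class SimpleNode {
    // Remaining capacity on this node. Both dimensions are tracked,
    // mirroring deductAvailableResource(container.getResource()).
    private int availableMemMB;
    private int availableVcores;
    private int numContainers;

    public SimpleNode(int memMB, int vcores) {
        this.availableMemMB = memMB;
        this.availableVcores = vcores;
    }

    // Analogue of allocateContainer: deduct the container's resource
    // from the node's free capacity and bump the container count.
    public synchronized void allocate(int memMB, int vcores) {
        availableMemMB -= memMB;
        availableVcores -= vcores;
        numContainers++;
    }

    public int getAvailableMemMB() { return availableMemMB; }
    public int getAvailableVcores() { return availableVcores; }
    public int getNumContainers() { return numContainers; }
}
```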
(2) Launching the container
Launching a container is handled by ContainersLauncher. Its main job is to write the complete shell command for running the container into launch_container.sh under the container's private directory, and to write the token file into container_tokens.
The reason the container's command is written to launch_container.sh and run as a shell script, rather than executed directly, is that some special characters may not be handled correctly when the command is executed directly.
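The launch-script idea can be sketched in a few lines (paths and command are hypothetical; the real launch_container.sh also exports the container's environment and does much more):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class LaunchScript {
    // Write the container command into a script instead of exec'ing it
    // directly: shell syntax such as pipes and redirects is then
    // interpreted by the shell, not passed to the program as literal args.
    static Path writeLaunchScript(String command) {
        try {
            Path dir = Files.createTempDirectory("container");
            Path script = dir.resolve("launch_container.sh");
            Files.write(script, ("#!/bin/bash\n" + command + "\n").getBytes());
            return script;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> readScript(Path script) {
        try {
            return Files.readAllLines(script);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path script = writeLaunchScript("java -Xmx1024m MyTask 1>stdout 2>stderr");
        System.out.println(readScript(script).get(0)); // #!/bin/bash
    }
}
```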
(3) Running the container
Running the container is done by the pluggable ContainerExecutor component. YARN ships two ContainerExecutor implementations: DefaultContainerExecutor and LinuxContainerExecutor. DefaultContainerExecutor simply runs the launch_container.sh script as the user the NodeManager daemon runs as, while LinuxContainerExecutor runs the script as the user the container belongs to; it was added when Hadoop introduced its security mechanism. Since Hadoop 2.3.0, containers can have their CPU usage isolated with cgroups, and the related code also lives in LinuxContainerExecutor.
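As a rough sketch of how vcores map onto cgroups: CPU isolation works through the cpu.shares relative weight, conventionally scaled at 1024 shares per vcore (the constant below is an assumption based on that common convention; check the LinuxContainerExecutor/cgroups handler in your YARN version for the exact value):

```java
public class CpuShares {
    // cgroups' cpu.shares is a relative weight, not an absolute limit;
    // 1024 per vcore is the assumed scaling factor.
    static final int SHARES_PER_VCORE = 1024;

    static int cpuShares(int containerVcores) {
        return SHARES_PER_VCORE * containerVcores;
    }

    public static void main(String[] args) {
        // A 2-vcore container gets twice the CPU weight of a 1-vcore one
        // when the node's CPU is contended.
        System.out.println(cpuShares(1)); // 1024
        System.out.println(cpuShares(2)); // 2048
    }
}
```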
(4) Resource cleanup
Resource cleanup is performed by the ResourceLocalizationService. This process is the inverse of resource localization: it tears down the resources a container used while it was running.
Each map/reduce task attempt builds its resource request (memory and vcores) when it is created:
public TaskAttemptImpl(TaskId taskId, int i,
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock, AppContext appContext) {
  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);

  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();

  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;

  //TODO:create the resource reqt for this Task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));

  this.dataLocalHosts = dataLocalHosts;
  RackResolver.init(conf);

  // This "this leak" is okay because the retained pointer is in an
  // instance variable.
  stateMachine = stateMachineFactory.make(this);
}
If mapreduce.map.cpu.vcores is set in the configuration, each map task is allocated that many vcores; otherwise the default of 1 is used. The same applies to reduce tasks.
public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
public static final int DEFAULT_MAP_CPU_VCORES = 1;
public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
getCpuRequired
private int getCpuRequired(Configuration conf, TaskType taskType) {
  int vcores = 1;
  if (taskType == TaskType.MAP) {
    vcores = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
        MRJobConfig.DEFAULT_MAP_CPU_VCORES);
  } else if (taskType == TaskType.REDUCE) {
    vcores = conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
        MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
  }
  return vcores;
}
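The fallback behaviour of getCpuRequired can be mimicked with a tiny Map-backed stand-in for Configuration (a hypothetical StubConf, not Hadoop's class; only the MAP branch is shown):

```java
import java.util.HashMap;
import java.util.Map;

public class VcoreLookup {
    // Minimal stand-in for Configuration.getInt(key, defaultValue).
    static class StubConf {
        private final Map<String, String> props = new HashMap<>();
        void set(String key, String value) { props.put(key, value); }
        int getInt(String key, int defaultValue) {
            String v = props.get(key);
            return v == null ? defaultValue : Integer.parseInt(v);
        }
    }

    static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
    static final int DEFAULT_MAP_CPU_VCORES = 1;

    // Same shape as the MAP branch of getCpuRequired.
    static int mapVcores(StubConf conf) {
        return conf.getInt(MAP_CPU_VCORES, DEFAULT_MAP_CPU_VCORES);
    }

    public static void main(String[] args) {
        StubConf conf = new StubConf();
        System.out.println(mapVcores(conf)); // unset: falls back to 1
        conf.set(MAP_CPU_VCORES, "4");
        System.out.println(mapVcores(conf)); // configured: 4
    }
}
```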