原创

FairScheduler vcores分配调研

  1. YARN ResourceManager页面显示的使用率的问题

    fairscheduler.png

        经过查看org.apache.hadoop.yarn.server.resourcemanager.webapp.FairSchedulerPage

    FairSchedulerPage

    public void render(Block html) {
          Collection

            fsqinfo.qinfo = info;
            if (info instanceof FairSchedulerLeafQueueInfo) {
              li.ul("#lq").li()._(LeafQueueBlock.class)._()._();
            } else {
              li._(QueueBlock.class);
            }
            li._();
          }

          ul._();
        }
      }

     可以看到used的定义:

        float used = info.getUsedMemoryFraction();
     
        public float getUsedMemoryFraction() {
              return fractionMemUsed;
       }
      
       fractionMemUsed = (float)usedResources.getMemory() /
            clusterResources.getMemory();


       页面上显示的使用率其实只反映了memory的使用率和vcores无关。

 2. YARN vcores是如何实现分配的

        先介绍YARN的基本工作流程:

        步骤1:用户将应用程序提交到ResourceManager上;

        步骤2:ResourceManager为应用程序ApplicationMaster申请资源,并与某个NodeManager通信,以启动ApplicationMaster;

        步骤3:ApplicationMaster与ResourceManager通信,为内部要执行的任务申请资源,一旦得到资源后,将与NodeManager通信,以启动对应的任务。

        步骤4:所有任务运行完成后,ApplicationMaster向ResourceManager注销,整个应用程序运行结束。

        上述步骤中,步骤2-3涉及到资源申请与使用,而这正是Container出现的地方。

       (1)  分配container 

              NodeManager的一个最重要的功能是根据ApplicationMaster的要求启动container, 由于各个节点上的container由ResourceManager进行统一管理和分配的,通常ResourceManager将Container

       分配给ApplicationMaster, ApplicationMaster再进一步要求对应的NodeManager启动container。为防止ApplicationMaster未经授权随意要求NodeManager启动container, ResourceManager一般会为

       每个container分配一个令牌。 而NodeManager启动任何container之前均会对令牌的合法性进行验证,一旦通过验证后,NodeManager才会按照一定的流程启动该container。

       查看Log可以看到ResourceManager给ApplicationMaster分配一个container:

       2015-01-05 14:37:32,076 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=bipub    OPERATION=AM Allocated Container        TARGET=SchedulerApp   

       RESULT=SUCCESS  APPID=application_1412850382932_1898199 CONTAINERID=container_1412850382932_1898199_01_000295

       2015-01-05 14:37:32,076 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode: Assigned container container_1412850382932_1898199_01_000295 of capacity

      

allocateContainer

/**
* The Scheduler has allocated containers on this node to the 
* given application.

* @param applicationId application
* @param rmContainer allocated container
*/
public synchronized void allocateContainer(ApplicationId applicationId, 
RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;

launchedContainers.put(container.getId(), rmContainer);

LOG.info("Assigned container " + container.getId() + 
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
", which currently has " + numContainers + " containers, " + 
getUsedResource() + " used and " + 
getAvailableResource() + " available");
}

  (2) 启动container 

          启动Container是由ContainersLauncher完成的,该过程主要工作时将运行container对应的完整shell命令写到私有目录下的launch_container.sh中,并将token文件写到container_tokens中。

之所以要将container运行命令写到launch_container.sh中,然后通过运行shell脚本的形式运行container,主要是因为直接执行命令可能会有些特殊符号不识别。

(3) 运行container

          而运行container是由插拔式组件ContainerExecutor完成的,YARN提供了两种ContainerExecutor实现,一种是DefaultContainerExecutor, 另一种是LinuxContainerExecutor。

DefaultContainerExecutor只是简单的以管理员身份运行launch_container.sh脚本,而LinuxContainerExecutor则是以container所属用户身份运行该脚本,它是Hadoop引入安全机制后加入的, hadoop 2.3.0 版本后container

引入cgroups隔离cpu资源,相关的代码也放在LinuxContainerExecutor中。

(4) 资源回收

           资源回收由ResourceLocalizationService服务完成的,该过程与资源本地化正好相反,他负责撤销container运行过程中使用的资源。


 map/reduce task执行的时候都会去resource里拿资源。

 public TaskAttemptImpl(TaskId taskId, int i, 
      EventHandler eventHandler,
      TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
      JobConf conf, String[] dataLocalHosts,
      Token

     // Initialize reportedStatus
    reportedStatus = new TaskAttemptStatus();
    initTaskAttemptStatus(reportedStatus);

    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    readLock = readWriteLock.readLock();
    writeLock = readWriteLock.writeLock();

    this.credentials = credentials;
    this.jobToken = jobToken;
    this.eventHandler = eventHandler;
    this.jobFile = jobFile;
    this.partition = partition;

    //TODO:create the resource reqt for this Task attempt
    this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
    this.resourceCapability.setMemory(
        getMemoryRequired(conf, taskId.getTaskType()));
    this.resourceCapability.setVirtualCores(
        getCpuRequired(conf, taskId.getTaskType()));
    this.dataLocalHosts = dataLocalHosts;
    RackResolver.init(conf);

    // This "this leak" is okay because the retained pointer is in an
    //  instance variable.
    stateMachine = stateMachineFactory.make(this);
  }

 

  如果配置文件了里定义了mapreduce.map.cpu.vcores, 那么一个map task就分配相应的CPU, 如果没有那么就使用默认的1。reduce也是一样。

  public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
  public static final int DEFAULT_MAP_CPU_VCORES = 1;

  public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
  public static final int DEFAULT_REDUCE_CPU_VCORES = 1;

Resource

 private int getCpuRequired(Configuration conf, TaskType taskType) {
    int vcores = 1;
    if (taskType == TaskType.MAP)  {
      vcores =
          conf.getInt(MRJobConfig.MAP_CPU_VCORES,
              MRJobConfig.DEFAULT_MAP_CPU_VCORES);
    } else if (taskType == TaskType.REDUCE) {
      vcores =
          conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
              MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
    }
    
    return vcores;
  }

 


关注下方微信公众号“Java精选”(w_z90110),回复关键字领取资料:如HadoopDubboCAS源码等等,免费领取资料视频和项目。 

涵盖:程序人生、搞笑视频、算法与数据结构、黑客技术与网络安全、前端开发、Java、Python、Redis缓存、Spring源码、各大主流框架、Web开发、大数据技术、Storm、Hadoop、MapReduce、Spark、elasticsearch、单点登录统一认证、分布式框架、集群、安卓开发、iOS开发、C/C++、.NET、Linux、Mysql、Oracle、NoSQL非关系型数据库、运维等。

评论

分享:

支付宝

微信