MapReduce —— MapTask阶段源码分析(Output环节)

2021-06-11

Dream car 镇楼 ~ !

接上一节的 Input 环节,接下来分析 Output 环节。相关代码在 runNewMapper() 方法中:

// Trimmed excerpt of MapTask.runNewMapper (output side). Lines shown as "......."
// were elided by the article, and the NewOutputCollector constructor and its
// write() method are pasted inline between the begin/end marker comments for
// reading convenience, so this fragment does not compile as shown.
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,
  final TaskUmbilicalProtocol umbilical,TaskReporter reporter) {
                          .......
       // Per the article, this output object is also wrapped into the map
       // context, so records emitted inside map() end up in output.write(...).
       // (The context construction itself is elided here.)
      org.apache.hadoop.mapreduce.RecordWriter output = null;
       // Note this value: 0 reduce tasks selects a different collector.
    if (job.getNumReduceTasks() == 0) {  // check the number of reduce tasks
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {    // > 0 reduce tasks
        // Create the collector; its constructor (below) shows that output is
        // partitioned when there are reducers.
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
                                 
//  -----------new NewOutputCollector() begin ------------------
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
        // 1. Build the sorting collector (covered in the next section).
      collector = createSortingCollector(job, reporter);
        
        // 2. One partition per reduce task.
        // Recall: a partition may hold several groups; records sharing a key form one group.
      partitions = jobContext.getNumReduceTasks();
        
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
            // Usual pattern: instantiate the configured partitioner class reflectively;
            // a user-defined partitioner replaces the default one.
            // The default is presumably a simple hash-mod partitioner, which sends
            // every record with the same key to the same partition.
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
          
      } else {  // a single reduce task: every group goes into the one partition
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
           // Returns the partition number; here partitions == 1, so always 0.
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }
//  -----------new NewOutputCollector()  end ------------------
   
                                 
//  -----------write(K key, V value) begin ------------------
     // Each record is handed to the collector together with its partition
     // number, i.e. as a (key, value, partition) triple.
     // (The closing brace of write() is omitted in this excerpt.)
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
//  -----------write(K key, V value) end --------------------
        
                             ..............                          

  }

进入 createSortingCollector(job, reporter) 方法:

/**
 * Builds and initializes the collector that buffers, sorts and spills map output.
 *
 * <p>The implementation class is read from
 * {@code JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR}; when the job does not set it,
 * {@code MapOutputBuffer} is used.
 *
 * @param job      job configuration used to resolve and configure the collector
 * @param reporter task reporter handed to the collector through its context
 * @return the collector, already passed through {@code init}
 */
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
    createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
  // Resolve the implementation class first; MapOutputBuffer is the fallback.
  Class<? extends MapOutputCollector> collectorClass =
      job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                   MapOutputBuffer.class, MapOutputCollector.class);

  // Instantiate reflectively. The cast is unchecked because the configured
  // class is only known as a raw MapOutputCollector.
  @SuppressWarnings("unchecked")
  MapOutputCollector<KEY, VALUE> sortingCollector =
      (MapOutputCollector<KEY, VALUE>) ReflectionUtils.newInstance(collectorClass, job);

  // The collector has to be initialized before it is used.
  sortingCollector.init(new MapOutputCollector.Context(this, job, reporter));
  return sortingCollector;
}

重头戏来了,进入初始化环节:collector.init(context)。下面删除了非核心代码,清清爽爽开开心心读源码 ~

    public void init(MapOutputCollector.Context context)  {
      // Initializes the map-side sort buffer and starts the spill thread.
      // Trimmed excerpt: lines shown as "......" were removed by the article,
      // so this method does not compile as shown.
      //
      // 0. Basic wiring from the context: job conf, reporter, owning map task,
      //    its output-file helper and sort phase, the SPILLED_RECORDS counter,
      //    the partition count (= number of reduce tasks) and the raw
      //    filesystem underlying the local FileSystem.
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      // 1. Spill threshold: fraction of the buffer (MAP_SORT_SPILL_PERCENT,
      //    default 0.8) at which a spill is triggered; the remaining ~0.2 of
      //    the buffer can still accept map output while the spill runs.
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
        
        // 2. Sort buffer size in MB (IO_SORT_MB, default 100).
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
        
        // 3. Sorter: QuickSort unless "map.sort.class" names another IndexedSorter.
        // Sorting boils down to comparing keys (lexicographic or numeric order),
        // so the sorter depends on the key comparator obtained in step 4.
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
        
      //-------------------- The circular (ring) buffer --------------------
      // Convert the size from MB to bytes.
      int maxMemUsage = sortmb << 20;
      // Round down to a whole number of metadata records (METASIZE bytes each).
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      // The int metadata (index) records live in the SAME byte array,
      // accessed through an IntBuffer view in native byte order.
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      // Put the equator at offset 0; the data pointers start at the equator and
      // the metadata pointers at kvindex (presumably set by setEquator, which
      // is not shown in this excerpt).
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;

      // Maximum number of records the metadata view can describe
      // (NMETA ints per record).
      maxRec = kvmeta.capacity() / NMETA;
      // Number of bytes that may be filled before a spill is triggered.
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
       //--------------------------------------------------------------------

      // k/v serialization
      // 4. Key comparator used while sorting: a user-configured one if set,
      //    otherwise the default. Hadoop's serializable key types carry their own comparators.
      comparator = job.getOutputKeyComparator();
        .............

      // output counters
       .............

      // compression: optional compression of the spilled data
         ............
            
      // combiner: merges values sharing a key on the map side to reduce the data
      // reducers must pull; a tuning hook (a "mini reduce") that may run once
      // or several times on the map side.
        .............
            
      // 5. Spill thread.
      // When buffer usage reaches the soft limit (80% by default), a background
      // thread spills data to disk while the map thread keeps writing, so the
      // buffer is shared by two threads. Per the article, they avoid colliding
      // because new records are written in the opposite direction.
      spillInProgress = false;
      // NOTE(review): minimum number of spill files for the combiner to run when
      // spills are merged (MAP_COMBINE_MIN_SPILLS, default 3); its use is outside
      // this excerpt, so confirm.
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      // Daemon thread: it does not keep the JVM alive by itself.
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      // Start the spill thread and block until it reports that it is running
      // (SpillThread.run sets spillThreadRunning and signals spillDone).
      // The while loop guards against spurious wakeups.
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt is silently swallowed and the interrupt
        // flag is not restored; the catch body may have been trimmed by the
        // article (upstream likely rethrows); verify against the Hadoop source.
      } finally {
        spillLock.unlock();
      }
    }

后面的源码没必要一行行看了,下面直接用文字总结。

MapOutputBuffer:

map 输出的K-V会被序列化成字节数组,计算出分区号,最终是三元组<k,v,p>

buffer 是map过程使用到的环形缓冲区:

  • 本质是字节数组;
  • 赤道(equator):以赤道为分界,一侧存放 K-V 数据,另一侧存放索引(元数据),两者朝相反方向写入;
  • 索引:对K-V的索引,固定长度16B,4个int:分区号P,K的偏移量,V的偏移量,V的数据长度;
  • 数据填充到缓冲区的阈值(默认 80%)时,唤醒溢写线程开始溢写(溢写线程在 init 阶段就已启动,平时处于等待状态);
  • 快速排序 80%的数据,同时Map输出的线程向缓冲区的剩余部分写入;
  • 快速排序的过程,比较的是key,但是移动的是索引;
  • 溢写时只需按照排序后的索引依次读取 K-V,写出的数据自然就是有序的;

注意:排序是二次排序:

  • 分区有序:reduce拉取数据是按照分区拉取;
  • 分区内key 有序:因为reduce计算是按照分组计算;

调优:在溢写过程中会发生combiner

  • 其实就是一个 map 里的reduce,按照组进行统计;
  • 发生时间点:排序之后相同的key放在一起了,开始combiner,然后溢写;
  • minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3):map 结束输出时,buffer 可能已经溢写出多个小文件,这些溢写文件最终都会被合并成一个输出文件,避免文件的碎片化【小文件问题,后边还会提及】;只有当溢写文件的个数达到 minSpillsForCombine(默认 3)个时,合并过程中才会再执行一次 combiner,否则直接合并。

附 溢写线程相关源码:

/**
 * Background thread that performs the spills of the map output buffer.
 *
 * Loop: signal spillDone (so a waiter, e.g. init(), sees progress), sleep on
 * spillReady until spillInProgress is set, run sortAndSpill() with spillLock
 * released, then re-acquire the lock and mark the spilled region as consumed.
 */
protected class SpillThread extends Thread {
      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            // Wake a thread waiting on spillDone: on the first pass this tells
            // init() the thread is running, afterwards that a spill finished.
            spillDone.signal();
            // Sleep until a spill is requested; the loop guards against
            // spurious wakeups.
            while (!spillInProgress) {
              spillReady.await();
            }
            try {
              // Release the lock during the slow sort+spill so the map thread
              // can keep collecting into the free part of the buffer.
              spillLock.unlock();
              // Sort the pending records and write them to a spill file.
              sortAndSpill();
            } catch (Throwable t) {
              // Keep the thread alive; record the failure for the collecting
              // side (where it is checked is outside this excerpt).
              sortSpillException = t;
            } finally {
              spillLock.lock();
              // Presumably: the spilled data region wrapped past the end of the
              // array, so the usable end of the buffer becomes the full array again.
              if (bufend < bufstart) {
                bufvoid = kvbuffer.length;
              }
              // Advance the start markers past the region just spilled.
              kvstart = kvend;
              bufstart = bufend;
              spillInProgress = false;
            }
          }
        } catch (InterruptedException e) {
          // Restore the interrupt flag and let the thread exit.
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }

sortAndSpill()

private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      final long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        final int mstart = kvend / NMETA;
        final int mend = 1 + // kvend is a valid record
          (kvstart >= kvend
          ? kvstart
          : kvmeta.capacity() + kvstart) / NMETA;
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        int spindex = mstart;
        final IndexRecord rec = new IndexRecord();
        final InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
              // 会调用combiner
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                final int kvoff = offsetFor(spindex % maxRec);
                int keystart = kvmeta.get(kvoff + KEYSTART);
                int valstart = kvmeta.get(kvoff + VALSTART);
                key.reset(kvbuffer, keystart, valstart - keystart);
                getVBytesForOffset(kvoff, value);
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec)
                            + PARTITION) == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }