`
hongs_yang
  • 浏览: 59547 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

hadoop-mapreduce中maptask运行分析

阅读更多

MapTask运行通过执行.run方法:

 

1.生成TaskAttemptContextImpl实例,此实例中的Configuration就是job本身。

 

2.得到用户定义的Mapper实现类,也就是map函数的类。

 

3.得到InputFormat实现类。

 

4.得到当前task对应的InputSplit.

 

5.通过InputFormat,得到对应的RecordReader

 

6.生成RecordWriter实例,

 

如果reduce个数为0,生成为MapTask.NewDirectOutputCollector

 

如果reduce个数不为0,但肯定是一个大于0的数,生成MapTask.NewOutputCollector

 

如果是有reduce的情况,在collector中会生成一个buffercollector用来进行内存排序。

 

通过mapreduce.job.map.output.collector.class配置,默认为MapTask.MapOutputBuffer

 

MapOutputBuffer中:

 

通过mapreduce.map.sort.spill.percent配置内存flush的比值,默认为0.8

 

spill的中文意思是溢出。

 

通过mapreduce.task.io.sort.mb配置内存bufer的大小,默认是100mb

 

通过mapreduce.task.index.cache.limit.bytes配置(还不知道是做什么的),默认为1024*1024

 

提示,这个配置是用来cache进行spill操作的index的大小。当spillindex达到此值的时候,

 

需要写入spillindex的文件。

 

通过map.sort.class配置排序实现类,默认为QuickSort,快速排序

 

通过mapreduce.map.output.compress.codec配置map的输出的压缩处理程序。

 

通过mapreduce.map.output.compress配置map输出是否启用压缩。默认为false.

 

MapOutputBuffer实例生成部分结束。

 

 

 

在生成MapTask.NewOutputCollector同时,会

 

检查是否用户有定义的Partitioner,默认是HashPartitioner

 

如果生成的实例为MapTask.NewDirectOutputCollector,也就是没有Reduce的情况下,

 

不执行排序操作也不执行buffer的缓冲操作,直接写入到output的文件中。

 

通过OutputFormatRecordWriter

 

 

 

 

 

 

 

 

 

以下是mapper.run方法的执行代码:

 

publicvoid run(Context context) throws IOException, InterruptedException {

 

setup(context);

 

try {

 

while (context.nextKeyValue()) {

 

map(context.getCurrentKey(), context.getCurrentValue(), context);

 

}

 

} finally {

 

cleanup(context);

 

}

 

}

 

由上面的代码可以看出,map运行时,会执行一次setup函数,完成时会执行一次cleanup函数。

 

中间只要有值就会调用map函数。

 

其中run中传入的context生成由来:

 

if (job.getNumReduceTasks() == 0) {

 

output =

 

newNewDirectOutputCollector(taskContext, job, umbilical, reporter);

 

} else {

 

output = newNewOutputCollector(taskContext, job, umbilical, reporter);

 

}

 

MapContextImpl实例,包含input(RecordReader)output,也就是上面提到的collector.

 

org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>

 

mapContext =

 

new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),

 

input, output,

 

committer,

 

reporter, split);

 

WrappedMapper.Context实例。包含MapContextImpl实例。

 

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context

 

mapperContext =

 

new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(

 

mapContext);

 

 

 

 

 

接着看mapper.run中的context.nextKeyValue()函数:

 

调用WrappedMapper.Context.nextKeyValue()函数,-->

 

调用MapContextImpl.nextKeyValue函数,-->

 

调用RecordReader.nextKeyValue函数,RecordReader不在说明。

 

 

 

map函数对过程处理完成后,会通过context.write写入分析的数据,

 

context.write(word, one);

 

 

 

看看此部分是如何执行的:

 

调用WrappedMapper.Context.write-->

 

调用MapContextImpl.write-->TaskInputOutputContextImpl.write-->

 

MapTask.NewOutputCollector.write/MapTask.NewDirectOutputCollector.write

 

 

 

MapTask.NewDirectOutputCollector.write:

 

这个里面没什么可以说的,直接写入到输出文件中。

 

NewDirectOutputCollector(MRJobConfig jobContext,

 

JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)

 

throws IOException, ClassNotFoundException, InterruptedException {

 

............................................

 

out = outputFormat.getRecordWriter(taskContext);

 

............................................

 

}

 

 

 

写入函数的定义

 

publicvoid write(K key, V value)

 

throws IOException, InterruptedException {

 

reporter.progress();

 

long bytesOutPrev = getOutputBytes(fsStats);

 

直接写入文件。

 

out.write(key, value);

 

long bytesOutCurr = getOutputBytes(fsStats);

 

fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);

 

mapOutputRecordCounter.increment(1);

 

}

 

 

 

 

 

重点来看看MapTask.NewOutputCollector.write这部分的实现:

 

通过Partitioner来生成reducepartition值,调用MapOutputBuffer.collect函数。

 

也就是写入到buffer中。

 

publicvoid write(K key, V value) throws IOException, InterruptedException {

 

collector.collect(key, value,

 

partitioner.getPartition(key, value, partitions));

 

}

 

 

 

 

 

MapOutputBuffer.collector:

 

 

 

public synchronized void collect(K key, V value, final int partition

 

) throws IOException {

 

reporter.progress();

 

检查传入的key的类型是否是jobMapOutputKeyClass的值

 

if (key.getClass() != keyClass) {

 

thrownew IOException("Type mismatch in key from map: expected "

 

+ keyClass.getName() + ", received "

 

+ key.getClass().getName());

 

}

 

检查传入的value的类型是否是jobMapOutputValueClass的值。

 

if (value.getClass() != valClass) {

 

thrownew IOException("Type mismatch in value from map: expected "

 

+ valClass.getName() + ", received "

 

+ value.getClass().getName());

 

}

 

检查partition是否在指定的范围内。

 

if (partition < 0 || partition >= partitions) {

 

thrownew IOException("Illegal partition for " + key + " (" +

 

partition + ")");

 

}

 

检查sortSpillException的值是否为空,如果不为空,表示有spill错误,throw ioexception

 

checkSpillException();

 

把可写入的buffer的剩余部分减去一个固定的值,并检查可用的buffer是否达到了sortspill的值

 

默认是buffer0.8的大小,如果buffer0.8METASIZE取于不等于0时,

 

得到的值可能会比0.8METASIZE这么一点。

 

bufferRemaining -= METASIZE;

 

if (bufferRemaining <= 0) {

 

执行spill操作,这部分等下再进行分析

 

// start spill if the thread is not running and the soft limit has been

 

// reached

 

spillLock.lock();

 

try {

 

......................此部分代码先不看

 

} finally {

 

spillLock.unlock();

 

}

 

}

 

 

 

try {

 

第一次进入时,bufindex的值为0,以后的每一次是key.len+1+value.len+1的值增加。

 

// serialize key bytes into buffer

 

int keystart = bufindex;

 

key写入到此实例中的一个BlockingBuffer类型的属性bb中。这是一个buffer.

 

在写入时把bufferRemaining的值减去key.length的长度。这里面也会检查buffer是否够用

 

key写入到kvbuffer中,同时把bufindex的值加上key.lengthKvbuffer就是具体的buffer.

 

在执行写入key/value时,首先是先把bufferRemaining的值减去key.length/value.length的长度。

 

同时检查此时bufferRemaining的值是否会小于或等于0,如果是需要先做spill操作。

 

否则把数据写入kvbuffer中,并把bufindex的值加上key.length/value.length

 

具体的写入操作请查看MapTask.Buffer中的write函数。

 

keySerializer.serialize(key);

 

这个地方有可能会出现,为什么呢,因为buffer是不停在重复使用,当使用到后面时,

 

前面可能会已经执行了spill操作。因此到bufindex达到最后的时候,会回到开始位置接着写。

 

if (bufindex < keystart) {

 

// wrapped the key; must make contiguous

 

bb.shiftBufferedKey();

 

keystart = 0;

 

}

 

此时的valstart的值为key结束后的下一个下标值。按key同样的方式写入value

 

// serialize value bytes into buffer

 

finalint valstart = bufindex;

 

valSerializer.serialize(value);

 

下面这一行是一个长度为0byte array,不做操作。

 

// It's possible for records to have zero length, i.e. the serializer

 

// will perform no writes. To ensure that the boundary conditions are

 

// checked and that the kvindex invariant is maintained, perform a

 

// zero-length write into the buffer. The logic monitoring this could be

 

// moved into collect, but this is cleaner and inexpensive. For now, it

 

// is acceptable.

 

bb.write(b0, 0, 0);

 

通过bufmark属性标记下bufindex的值。并返回bufindex的值。此时bufindex的值是val结束的下标。

 

// the record must be marked after the preceding write, as the metadata

 

// for this record are not yet written

 

int valend = bb.markRecord();

 

 

 

mapOutputRecordCounter.increment(1);

 

mapOutputByteCounter.increment(

 

distanceTo(keystart, valend, bufvoid));

 

记录kvmeta信息,此处是一个IntBuffer的缓冲区,每次向kvmeta中写入4个下标的值,

 

第一次时,kvindex0,第二次是kvindex的值为kvmeta.capacity()-4的值。

 

也就是说第一次是从前面开始写,从第二次开始都是从后面向前面开始写。

 

partition的值写入到meta的第2个下标,把keystart写入到第一个下标,

 

valstart的值写入到meta的第0个下标,把value的长度写入到第三个下标。

 

Kvmetabuffer是如下图例的样子

 

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

VALSTART(0)

KEYSTART(1)

PARTITION(2)

VALLEN(3)

 

 

 

 

VALSTART(0)

KEYSTART(1)

PARTITION(2)

VALLEN(3)

 

 

 

// write accounting info

 

kvmeta.put(kvindex + PARTITION, partition);

 

kvmeta.put(kvindex + KEYSTART, keystart);

 

kvmeta.put(kvindex + VALSTART, valstart);

 

kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));

 

// advance kvindex

 

kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();

 

} catch (MapBufferTooSmallException e) {

 

LOG.info("Record too large for in-memory buffer: " + e.getMessage());

 

spillSingleRecord(key, value, partition);

 

mapOutputRecordCounter.increment(1);

 

return;

 

}

 

}

 

 

 

写入数据到buffer中的实现:

 

通过MapTask.MapOutputBuffer.Buffer.write方法

 

public void write(byte b[], int off, int len)

 

throws IOException {

 

// must always verify the invariant that at least METASIZE bytes are

 

// available beyond kvindex, even when len == 0

 

bufferRemaining -= len;

 

if (bufferRemaining <= 0) {

 

...............................................

 

}

 

此处检查bufindex(kvbuffer中现在的下标值)+len是否达到了bufvoid(默认是kvbuffer的最后)

 

如果执行过spill操作,buffer写入到下标的最后时,重新开始从0开始写入后,

 

bufvoid的值是上一次写入完成的bufmark的值(最后一次完成写入的下标)

 

也就是说现在写入已经达到buffer的最后位置,但是要写入的数据装不完,

 

如:要写入数据是5byte,但现在kvbuffer最后端只能写入3byte,

 

此时会把于下的2byte写入到kvbuffer的开始位置。这就是环行buffer

 

// here, we know that we have sufficient space to write

 

if (bufindex + len > bufvoid) {

 

finalint gaplen = bufvoid - bufindex;

 

System.arraycopy(b, off, kvbuffer, bufindex, gaplen);

 

len -= gaplen;

 

off += gaplen;

 

bufindex = 0;

 

}

 

System.arraycopy(b, off, kvbuffer, bufindex, len);

 

bufindex += len;

 

}

 

}

 

 

 

关于当bufindex的值小于keystart时,也就是环行部分重新开始写入时,执行的shiftBufferedKey

 

这个部分主要是把buffer中要写入的数据超过了buffer能在最后写入的值时:

 

write后示例值:

 

要写入的byte array [1,2,3,4,5]

 

执行写入后buffer的内容如下:最后只能存储3byte,这里把123写入到最后,

 

同时把45写入到最前面部分

 

4

5

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

1

2

3

 

 

 

执行shiftBufferedKey以后,此时buffer的内容变成如下:

 

1

2

3

4

5

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

protected void shiftBufferedKey() throws IOException {

 

此时的bufmark是上一次完成写入的bufindex的下标值,得到最后写入的几个byte的长度。

 

比如上面提到的,写5个,但最后只有3byte的长度,那么这里得到的就是3.

 

// spillLock unnecessary; both kvend and kvindex are current

 

int headbytelen = bufvoidbufmark;

 

bufvoid也就是可写入的最后下标的值修改成上一次完成写入的最后一个下标值。

 

bufvoid = bufmark;

 

finalint kvbidx = 4 * kvindex;

 

finalint kvbend = 4 * kvend;

 

finalint avail =

 

Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));

 

if (bufindex + headbytelen < avail) {

 

把环行部分,如上面最后从头开始写入的2byte向后移动3byte

 

System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);

 

bufer最后部分的3byte写入到开始部分。

 

System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);

 

bufindex向后移动几个byte,并重新计算可用的空间

 

bufindex += headbytelen;

 

bufferRemaining -= kvbuffer.length - bufvoid;

 

} else {

 

byte[] keytmp = newbyte[bufindex];

 

System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);

 

bufindex = 0;

 

out.write(kvbuffer, bufmark, headbytelen);

 

out.write(keytmp);

 

}

 

}

 

}

 

 

 

数据达到bufferlimit时,执行的spill操作:

 

if (bufferRemaining <= 0) {

 

// start spill if the thread is not running and the soft limit has been

 

// reached

 

spillLock.lock();

 

try {

 

do {

 

如果spillInProgress的值为true时,表示spill操作正在进行。

 

if (!spillInProgress) {

 

finalint kvbidx = 4 * kvindex;

 

第一次执行时kvend的值为0,第二次时是kvindex的上一个值(kvend)*4,

 

kvend表示已经完成的kvmeta的下标值,kvindex表示现在准备使用的下标值

 

finalint kvbend = 4 * kvend;

 

得到已经使用的字节数

 

// serialized, unspilled bytes always lie between kvindex and

 

// bufindex, crossing the equator. Note that any void space

 

// created by a reset must be included in "used" bytes

 

finalint bUsed = distanceTo(kvbidx, bufindex);

 

得到已经使用的字节数是否已经达到spill的配置大小,也就是buffer0.8默认。

 

finalboolean bufsoftlimit = bUsed >= softLimit;

 

这里表示spill完成,回收空间,

 

if ((kvbend + METASIZE) % kvbuffer.length !=

 

equator - (equator % METASIZE)) {

 

// spill finished, reclaim space

 

resetSpill();

 

bufferRemaining = Math.min(

 

distanceTo(bufindex, kvbidx) - 2 * METASIZE,

 

softLimit - bUsed) - METASIZE;

 

continue;

 

} elseif (bufsoftlimit && kvindex != kvend) {

 

发起spill操作

 

// spill records, if any collected; check latter, as it may

 

// be possible for metadata alignment to hit spill pcnt

 

startSpill();

 

finalint avgRec = (int)

 

(mapOutputByteCounter.getCounter() /

 

mapOutputRecordCounter.getCounter());

 

// leave at least half the split buffer for serialization data

 

// ensure that kvindex >= bufindex

 

finalint distkvi = distanceTo(bufindex, kvbidx);

 

finalint newPos = (bufindex +

 

Math.max(2 * METASIZE - 1,

 

Math.min(distkvi / 2,

 

distkvi / (METASIZE + avgRec) * METASIZE)))

 

% kvbuffer.length;

 

setEquator(newPos);

 

bufmark = bufindex = newPos;

 

finalint serBound = 4 * kvend;

 

// bytes remaining before the lock must be held and limits

 

// checked is the minimum of three arcs: the metadata space, the

 

// serialization space, and the soft limit

 

bufferRemaining = Math.min(

 

// metadata max

 

distanceTo(bufend, newPos),

 

Math.min(

 

// serialization max

 

distanceTo(newPos, serBound),

 

// soft limit

 

softLimit)) - 2 * METASIZE;

 

}

 

}

 

} while (false);

 

} finally {

 

spillLock.unlock();

 

}

 

}

 

 

 

发起startSpill操作

 

private void startSpill() {

 

assert !spillInProgress;

 

记录住最后一个完成的kvindex的下标。

 

kvend = (kvindex + NMETA) % kvmeta.capacity();

 

记录住标记住的最后一个完成的kv写入在kvbuffer中的下标

 

bufend = bufmark;

 

设置spill操作正在进行

 

spillInProgress = true;

 

LOG.info("Spilling map output");

 

LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +

 

"; bufvoid = " + bufvoid);

 

LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +

 

"); kvend = " + kvend + "(" + (kvend * 4) +

 

"); length = " + (distanceTo(kvend, kvstart,

 

kvmeta.capacity()) + 1) + "/" + maxRec);

 

此处使用了java新的线程通信的方法,notify线程的等待。

 

spillReady.signal();

 

}

 

 

 

此时,MapTask.MapOutputBuffer.SpillThread线程接收到signal命令:

 

public void run() {

 

spillLock.lock();

 

spillThreadRunning = true;

 

try {

 

while (true) {

 

spillDone.signal();

 

while (!spillInProgress) {

 

如果线程发现spillInProgress的值是false时,等待,

 

buffer中的数据达到sortlimit的值时,通过spillReady.signalnotify此线程。

 

spillReady.await();

 

}

 

try {

 

spillLock.unlock();

 

sortAndSpill();

 

} catch (Throwable t) {

 

sortSpillException = t;

 

} finally {

 

spillLock.lock();

 

if (bufend < bufstart) {

 

bufvoid = kvbuffer.length;

 

}

 

kvstart = kvend;

 

bufstart = bufend;

 

spillInProgress = false;

 

}

 

}

 

} catch (InterruptedException e) {

 

Thread.currentThread().interrupt();

 

} finally {

 

spillLock.unlock();

 

spillThreadRunning = false;

 

}

 

}

 

}

 

 

 

执行排序与spill操作:

 

调用MapTask.MapOutputBuffer.sortAndSpill函数:

 

 

 

private void sortAndSpill() throws IOException, ClassNotFoundException,

 

InterruptedException {

 

//approximate the length of the output file to be the length of the

 

//buffer + header lengths for the partitions

 

finallong size = (bufend >= bufstart

 

? bufend - bufstart

 

: (bufvoid - bufend) + bufstart) +

 

partitions * APPROX_HEADER_LENGTH;

 

FSDataOutputStream out = null;

 

try {

 

生成写入文件,路径通过在mapreduce.cluster.local.dir配置中写入localmr路径

 

在路径下生成(attempid)_spill_(numspills).out或者output/spill(numspills).out文件。

 

// create spill file

 

final SpillRecord spillRec = new SpillRecord(partitions);

 

final Path filename =

 

mapOutputFile.getSpillFileForWrite(numSpills, size);

 

out = rfs.create(filename);

 

 

 

finalint mstart = kvend / NMETA;

 

finalint mend = 1 + // kvend is a valid record

 

(kvstart >= kvend

 

? kvstart

 

: kvmeta.capacity() + kvstart) / NMETA;

 

执行排序操作。把buffer中的数据进行排序。排序的比较器通过MapOutputBuffer.compare,

 

默认是通过key进行排序。

 

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);

 

int spindex = mstart;

 

final IndexRecord rec = new IndexRecord();

 

final InMemValBytes value = new InMemValBytes();

 

for (int i = 0; i < partitions; ++i) {

 

IFile.Writer<K, V> writer = null;

 

try {

 

long segmentStart = out.getPos();

 

writer = new Writer<K, V>(job, out, keyClass, valClass, codec,

 

spilledRecordsCounter);

 

检查是否有combiner处理程序,如果没有,直接把buffer中排序后的数据写入到spill文件中。

 

注意,写入时,数据是按partition从小到大写入。

 

if (combinerRunner == null) {

 

// spill directly

 

DataInputBuffer key = new DataInputBuffer();

 

while (spindex < mend &&

 

kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {

 

finalint kvoff = offsetFor(spindex % maxRec);

 

int keystart = kvmeta.get(kvoff + KEYSTART);

 

int valstart = kvmeta.get(kvoff + VALSTART);

 

key.reset(kvbuffer, keystart, valstart - keystart);

 

getVBytesForOffset(kvoff, value);

 

writer.append(key, value);

 

++spindex;

 

}

 

} else {

 

此时表示配置有combiner处理程序,通过执行combiner中的reduce程序,把数据进行处理后写入。

 

int spstart = spindex;

 

while (spindex < mend &&

 

kvmeta.get(offsetFor(spindex % maxRec)

 

+ PARTITION) == i) {

 

++spindex;

 

}

 

// Note: we would like to avoid the combiner if we've fewer

 

// than some threshold of records for a partition

 

if (spstart != spindex) {

 

combineCollector.setWriter(writer);

 

RawKeyValueIterator kvIter =

 

new MRResultIterator(spstart, spindex);

 

combinerRunner.combine(kvIter, combineCollector);

 

}

 

}

 

 

 

// close the writer

 

writer.close();

 

此处每写入一个partition的数据后,

 

生成一个针对此partition在文件中的开始位置,写入此partition的长度。

 

并添加到spillindex中。

 

// record offsets

 

rec.startOffset = segmentStart;

 

rec.rawLength = writer.getRawLength();

 

rec.partLength = writer.getCompressedLength();

 

spillRec.putIndex(rec, i);

 

 

 

writer = null;

 

} finally {

 

if (null != writer) writer.close();

 

}

 

}

 

如果splitindex中的cache的数据大于了配置的值,把新生成的spillindex写入index文件。

 

如果spillindex没有达到配置的值时,所有的spillindex文件存储到内存中,

 

如果达到了配置的值以后生成的spillindex文件不进行cache,直接写入到文件中,

 

后期在读取时通过numSpills的值来从文件中读取,

 

示例代码:for (int i = indexCacheList.size(); i < numSpills; ++i)

 

如上代码就是从indexCacheList.size开始,因为此时超过了cachespillindex直接写入到了文件。

 

把于下的spillindex从文件中读取出来。

 

 

 

if (totalIndexCacheMemory >= indexCacheMemoryLimit) {

 

// create spill index file

 

Path indexFilename =

 

mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions

 

* MAP_OUTPUT_INDEX_RECORD_LENGTH);

 

spillRec.writeToFile(indexFilename, job);

 

} else {

 

否则把spillindex添加到index cache中,并把长度累加起来。

 

indexCacheList.add(spillRec);

 

totalIndexCacheMemory +=

 

spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;

 

}

 

LOG.info("Finished spill " + numSpills);

 

++numSpills;

 

} finally {

 

if (out != null) out.close();

 

}

 

}

 

 

 

map输出的IFile - spill文件格式:

 

partition1

partition2

keylen(4)

vallen(4)

key

value

keylen(4)

vallen(4)

key

value

 

 

 

 

 

Mapspill-index文件格式:

 

partition1

partition2

SegmentStart(

partition的开始位置)

rawlen(总长度)

CompressedLength(总长度)

SegmentStart(

partition的开始位置)

rawlen(总长度)

CompressedLength(总长度)

 

 

 

 

 

此时,mapper.run函数完成,执行如下操作:output.close操作。

 

try {

 

input.initialize(split, mapperContext);

 

mapper.run(mapperContext);

 

mapPhase.complete();

 

setPhase(TaskStatus.Phase.SORT);

 

statusUpdate(umbilical);

 

input.close();

 

input = null;

 

output.close(mapperContext);

 

output = null;

 

} finally {

 

closeQuietly(input);

 

closeQuietly(output, mapperContext);

 

}

 

 

 

此处分析output.close主要分析有reduce的情况,如果没有reduce是直接关闭输出文件。

 

MapTask.NewOutputCollector.close

 

调用MapTask.MapOutputBuffer.flush把于下的数据spill到文件,等待SpillThread线程完成。

 

执行mergeParts函数合并小的spill文件。

 

public void close(TaskAttemptContext context

 

) throws IOException,InterruptedException {

 

try {

 

collector.flush();

 

} catch (ClassNotFoundException cnf) {

 

thrownew IOException("can't find class ", cnf);

 

}

 

collector.close();

 

}

 

}

 

 

 

MapOutputBuffer.flush函数操作

 

public void flush() throws IOException, ClassNotFoundException,

 

InterruptedException {

 

先把buffer中的数据执行sortspill操作。

 

LOG.info("Starting flush of map output");

 

spillLock.lock();

 

try {

 

while (spillInProgress) {

 

reporter.progress();

 

spillDone.await();

 

}

 

checkSpillException();

 

 

 

finalint kvbend = 4 * kvend;

 

if ((kvbend + METASIZE) % kvbuffer.length !=

 

equator - (equator % METASIZE)) {

 

// spill finished

 

resetSpill();

 

}

 

if (kvindex != kvend) {

 

kvend = (kvindex + NMETA) % kvmeta.capacity();

 

bufend = bufmark;

 

LOG.info("Spilling map output");

 

LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +

 

"; bufvoid = " + bufvoid);

 

LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +

 

"); kvend = " + kvend + "(" + (kvend * 4) +

 

"); length = " + (distanceTo(kvend, kvstart,

 

kvmeta.capacity()) + 1) + "/" + maxRec);

 

sortAndSpill();

 

}

 

} catch (InterruptedException e) {

 

thrownew IOException("Interrupted while waiting for the writer", e);

 

} finally {

 

spillLock.unlock();

 

}

 

assert !spillLock.isHeldByCurrentThread();

 

// shut down spill thread and wait for it to exit. Since the preceding

 

// ensures that it is finished with its work (and sortAndSpill did not

 

// throw), we elect to use an interrupt instead of setting a flag.

 

// Spilling simultaneously from this thread while the spill thread

 

// finishes its work might be both a useful way to extend this and also

 

// sufficient motivation for the latter approach.

 

Try {

 

等待spillthread操作完成。

 

spillThread.interrupt();

 

spillThread.join();

 

} catch (InterruptedException e) {

 

thrownew IOException("Spill failed", e);

 

}

 

// release sort buffer before the merge

 

kvbuffer = null;

 

合并所有的spill小文件。

 

mergeParts();

 

Path outputPath = mapOutputFile.getOutputFile();

 

fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());

 

}

 

 

 

mergeParts函数:

 

 

 

private void mergeParts() throws IOException, InterruptedException,

 

ClassNotFoundException {

 

// get the approximate size of the final output/index files

 

long finalOutFileSize = 0;

 

long finalIndexFileSize = 0;

 

final Path[] filename = new Path[numSpills];

 

final TaskAttemptID mapId = getTaskID();

 

首先得到所有的spill的数据文件。

 

for(int i = 0; i < numSpills; i++) {

 

filename[i] = mapOutputFile.getSpillFile(i);

 

finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();

 

}

 

如果只有一个spill文件,直接把生成的spill数据文件与索引文件生成为map的输出文件

 

说得坦白点就是把spill文件直接rename到目标mapoutput路径下

 

if (numSpills == 1) { //the spill is the final output

 

sameVolRename(filename[0],

 

mapOutputFile.getOutputFileForWriteInVolume(filename[0]));

 

if (indexCacheList.size() == 0) {

 

sameVolRename(mapOutputFile.getSpillIndexFile(0),

 

mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));

 

} else {

 

indexCacheList.get(0).writeToFile(

 

mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);

 

}

 

sortPhase.complete();

 

return;

 

}

 

如果包含多个spill文件,先读取未被cache到内存部分的索引文件(spillindex)

 

// read in paged indices

 

for (int i = indexCacheList.size(); i < numSpills; ++i) {

 

Path indexFileName = mapOutputFile.getSpillIndexFile(i);

 

indexCacheList.add(new SpillRecord(indexFileName, job));

 

}

 

 

 

//make correction in the length to include the sequence file header

 

//lengths for each partition

 

finalOutFileSize += partitions * APPROX_HEADER_LENGTH;

 

finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;

 

生成对应的output文件与output index文件。Index中记录有partition的开始位置与长度

 

job中的attempid目录下生成一个file.out文件是数据文件的输出

 

job中的attempid目录下生成一个file.out.index文件是数据索引文件

 

Path finalOutputFile =

 

mapOutputFile.getOutputFileForWrite(finalOutFileSize);

 

Path finalIndexFile =

 

mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

 

 

 

//The output stream for the final single output file

 

FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

 

如果numSpills个数为0表示没有生成输出文件,此时生成一个空的数据文件,并生成一个索引文件,

 

此索引文件中每一个partition的索引都为0

 

if (numSpills == 0) {

 

//create dummy files

 

IndexRecord rec = new IndexRecord();

 

SpillRecord sr = new SpillRecord(partitions);

 

try {

 

for (int i = 0; i < partitions; i++) {

 

long segmentStart = finalOut.getPos();

 

Writer<K, V> writer =

 

new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);

 

writer.close();

 

rec.startOffset = segmentStart;

 

rec.rawLength = writer.getRawLength();

 

rec.partLength = writer.getCompressedLength();

 

sr.putIndex(rec, i);

 

}

 

sr.writeToFile(finalIndexFile, job);

 

} finally {

 

finalOut.close();

 

}

 

sortPhase.complete();

 

return;

 

}

 

{

 

sortPhase.addPhases(partitions); // Divide sort phase into sub-phases

 

 

 

IndexRecord rec = new IndexRecord();

 

final SpillRecord spillRec = new SpillRecord(partitions);

 

此时,从最小的partition开始合并所有的小的spill文件

 

for (int parts = 0; parts < partitions; parts++) {

 

//create the segments to be merged

 

List<Segment<K,V>> segmentList =

 

new ArrayList<Segment<K, V>>(numSpills);

 

此处开始迭代所有的spill数据文件,得到spill文件中对应的partitionsegment,

 

添加到一个集合容器中(此时通过每一个spill文件对应的index可以拿到segment在文件中的位置)

 

for(int i = 0; i < numSpills; i++) {

 

IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

 

 

 

Segment<K,V> s =

 

new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,

 

indexRecord.partLength, codec, true);

 

segmentList.add(i, s);

 

 

 

if (LOG.isDebugEnabled()) {

 

LOG.debug("MapId=" + mapId + " Reducer=" + parts +

 

"Spill =" + i + "(" + indexRecord.startOffset + "," +

 

indexRecord.rawLength + ", " + indexRecord.partLength + ")");

 

}

 

}

 

读取merge因子,通过mapreduce.task.io.sort.factor配置。默认为100

 

int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);

 

// sort the segments only if there are intermediate merges

 

boolean sortSegments = segmentList.size() > mergeFactor;

 

//merge

 

@SuppressWarnings("unchecked")

 

生成一个Merger.MergeQueue队列,根据所有此partition中的segments,

 

如果当前的spill文件个数超过了配置的merge因子的个数,把segment按文件大小从小到大排序。

 

RawKeyValueIterator kvIter = Merger.merge(job, rfs,

 

keyClass, valClass, codec,

 

segmentList, mergeFactor,

 

new Path(mapId.toString()),

 

job.getOutputKeyComparator(), reporter, sortSegments,

 

null, spilledRecordsCounter, sortPhase.phase(),

 

TaskType.MAP);

 

生成Writer实例,

 

//write merged output to disk

 

long segmentStart = finalOut.getPos();

 

Writer<K, V> writer =

 

new Writer<K, V>(job, finalOut, keyClass, valClass, codec,

 

spilledRecordsCounter);

 

如果combiner没有配置,

 

或者spill文件的个数还不达到mapreduce.map.combine.minspills配置的个数,默认为3

 

不执行combiner操作。直接写入文件。

 

if (combinerRunner == null || numSpills < minSpillsForCombine) {

 

Merger.writeFile(kvIter, writer, reporter, job);

 

} else {

 

否则执行combiner操作并写入文件。combiner其实可以理解为没有shufflereduce

 

combineCollector.setWriter(writer);

 

combinerRunner.combine(kvIter, combineCollector);

 

}

 

提示:Merger.MergeQueue队列中每next去读取一条记录,

 

就会从所有的segment中读取出最小的一个kv,并写入此kv的值,

 

去执行next操作把所有的segment都放到一个优先级堆中,通过优先堆排序取出最小的一个kv.

 

//close

 

writer.close();

 

 

 

sortPhase.startNextPhase();

 

记录当前partition的索引信息。

 

// record offsets

 

rec.startOffset = segmentStart;

 

rec.rawLength = writer.getRawLength();

 

rec.partLength = writer.getCompressedLength();

 

spillRec.putIndex(rec, parts);

 

}

 

所有partition合并完成后,写入索引文件。并删除spill的小数据文件。

 

spillRec.writeToFile(finalIndexFile, job);

 

finalOut.close();

 

for(int i = 0; i < numSpills; i++) {

 

rfs.delete(filename[i],true);

 

}

 

}

 

}

 

 

 

 

 

结束语:每个spill文件写入时会执行快速排序(内存中)combiner操作,

 

最后多个spill合并时使用外部排序(磁盘)来对文件进行比较并取出最小的kv,写入文件,

 

此时如果spill文件的个数超过配置的值时,会再做一次combiner操作。

 

 

 

 

 

0
0
分享到:
评论

相关推荐

    Optimizing Hadoop for MapReduce(PACKT,2014)

    MapReduce is the distribution system that the Hadoop MapReduce engine uses to distribute work around a cluster by working parallel on smaller data sets. It is useful in a wide range of applications, ...

    kafka-hadoop-loader-my:kafka0.8.2使用简单的消费者负载消息使用自定义mapreduce进入hdfs

    实际使用者及其内部提取程序线程都包装为KafkaInputContext,它是为每个Map Task的记录读取器对象创建的。 然后,映射器接收最不利的消息对,解析日期的内容并发出(date,message),然后由Output Format拾取并在...

    Hadoop从入门到上手企业开发

    060 MapReduce执行流程之Shuffle和排序流程以及Map端分析 061 MapReduce执行流程之Reduce端分析 062 MapReduce Shuffle过程讲解和Map Shuffle Phase讲解 063 Reduce Shuffle Phase讲解 064 源代码跟踪查看Map Task和...

    hadoop 1.2.1 api 最新chm 伪中文版

    Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 一个Map/Reduce 作业(job) 通常会把输入的...

    Hadoop中MapReduce基本案例及代码(三)

    分区操作是shuffle操作中的一个重要过程,作用就是将map的结果按照规则分发到不同reduce中进行处理,从而按照分区得到多个输出结果。 Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类...

    MapReduce详解包括配置文件

    MapReduce是Hadoop提供的一套用于进行分布式计算的模型,本身是Doug Cutting根据Google的&lt;MapReduce: Simplified Data Processing on Large Clusters&gt;仿照实现的。 MapReduce由两个阶段组成:Map(映射)阶段和Reduce...

    22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件

    本文的前提是hadoop环境正常。 本文最好和MapReduce操作常见的文件文章一起阅读,因为写文件与压缩往往是结合在一起的。 相关压缩算法介绍参考文章:HDFS文件类型与压缩算法介绍。 本文介绍写文件时使用的压缩算法,...

    大数据学习(九):mapreduce编程模型及具体框架实现

     hadoop中的mapreduce框架、spark。  hadoop中的mapreduce框架:  对编程模型阶段1实现就是:map task  对编程模型阶段2的实现就是reduce task。 map task:  读数据:InputFormat–&gt;TextInputFormat

    hadoop 权威指南(第三版)英文版

    hadoop权威指南第三版(英文版)。 Foreword . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . xiii Preface . . . . . . ....

    最新Hadoop的面试题总结

    (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。 (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台...

    java大数据作业_1云计算、大数据、hadoop

    课后作业 1.SAAS、PAAS、IAAS、XAAS都是什么意思?另外猜猜DAAS是什么意思? 2.大数据的4个特点是什么? 3.虚拟机与主机构成闭环局域网...Map Task Capacity Reduce Task Capacity Queue Name 10.如何启动一个datanode

    hadoop_the_definitive_guide_3nd_edition

    Hadoop definitive 第三版, 目录如下 1. Meet Hadoop . . . 1 Data! 1 Data Storage and Analysis 3 Comparison with Other Systems 4 RDBMS 4 Grid Computing 6 Volunteer Computing 8 A Brief History of Hadoop 9...

    forcombiner_reduce_java_mapReduce_markizj_yourselfarq_源码

    Map Reduce中的Combiner就是为了避免map任务和reduce任务之间的数据传输而设置的,Hadoop允许用户针对map task的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少...

    2018最新BAT大数据面试题.docx

    大数据这么火,但是从业人员也多啊,需要好好琢磨一下,充分准备。... 4)由于这些排序是 MapReduce 自动完成的,用户无法控制,因此,在hadoop 1.x 中无法避免,也不可以关闭,但 hadoop2.x 是可以关闭的。

    MapReduceV1:JobTracker端Job/Task数据结构

    在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。在编写MapReduce程序时,我们是以Job为单位进行编程处理,一...

    拥抱大数据——初识Hadoop,轻松应对海量数据存储与分析所带来的挑战

    4.1 Map+Reduce 4.2 MapReduce架构 4.3 MapReduce数据处理 4.3.1 job与task 4.3.2 MapReduce数据处理 五、YARN(资源管理系统) 5.1 YARN架构 六、手把手搭建Hadoop环境(Linux上) 6.1 安装jdk 6.2 安装hadoop 6.3

    HIVE查询优化

    1、设置合理的task数量(map task、reduce task) 这里有几个考虑的点,一方面Hadoop MR task的启动及初始化时间较长,如果task过多,可能会导致任务启动和初始化时间远超逻辑处理时间,这种情况白白浪费了计算资源...

    MapReduce实例浅析

    HadoopMap/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。一个Map/Reduce作业(job)通常会把输入的数据集...

Global site tag (gtag.js) - Google Analytics