Hadoop備忘:Reduce階段Iterablevalues中的每個(gè)值都共享一個(gè)對(duì)象
/**?
*?Iterate?through?the?values?for?the?current?key,?reusing?the?same?value??
*?object,?which?is?stored?in?the?context.?
*?@return?the?series?of?values?associated?with?the?current?key.?All?of?the??
*?objects?returned?directly?and?indirectly?from?this?method?are?reused.?
*/??
public???
Iterable?getValues()?throws?IOException,?InterruptedException?{??
return?iterable;??
} ?
在Reduce階段,具有相同key的的所有的value都會(huì)被組織到一起,形成一種key:values的形式。
一般情況下,我們會(huì)針對(duì)某個(gè)key下的所有的values進(jìn)行處理,這里需要注意一個(gè)問(wèn)題,當(dāng)我們寫(xiě)下如下代碼的時(shí)候:
protected?void?reduce(KEYIN?key,?Iterable?values,?Context?context??
)?throws?IOException,?InterruptedException?{??
for(VALUEIN?value:?values)?{??
context.write((KEYOUT)?key,?(VALUEOUT)?value);??
}??
} ?
我們?cè)谝粋€(gè)循環(huán)中,每次得到的value實(shí)際上都是指向的同一個(gè)對(duì)象,只是在每次迭代的時(shí)候,將新的值反序列化到這個(gè)對(duì)象中,以更新此對(duì)象的值:
/**?
*?Advance?to?the?next?key/value?pair.?
*/??
@Override??
public?boolean?nextKeyValue()?throws?IOException,?InterruptedException?{??
if?(!hasMore)?{??
key?=?null;??
value?=?null;??
return?false;??
}??
firstValue?=?!nextKeyIsSame;??
DataInputBuffer?nextKey?=?input.getKey();??
currentRawKey.set(nextKey.getData(),?nextKey.getPosition(),???
nextKey.getLength()?-?nextKey.getPosition());??
buffer.reset(currentRawKey.getBytes(),?0,?currentRawKey.getLength());??
key?=?keyDeserializer.deserialize(key);??
DataInputBuffer?nextVal?=?input.getValue();??
buffer.reset(nextVal.getData(),?nextVal.getPosition(),??
nextVal.getLength()?-?nextVal.getPosition());??
value?=?valueDeserializer.deserialize(value); ?
currentKeyLength?=?nextKey.getLength()?-?nextKey.getPosition();??
currentValueLength?=?nextVal.getLength()?-?nextVal.getPosition(); ?
hasMore?=?input.next();??
if?(hasMore)?{??
nextKey?=?input.getKey();??
nextKeyIsSame?=?comparator.compare(currentRawKey.getBytes(),?0,???
currentRawKey.getLength(),??
nextKey.getData(),??
nextKey.getPosition(),??
nextKey.getLength()?-?nextKey.getPosition()??
)?==?0;??
}?else?{??
nextKeyIsSame?=?false;??
}??
inputValueCounter.increment(1);??
return?true;??
} ?
為什么要注意這種情況呢?正如本文開(kāi)始引用的那段Hadoop源代碼中的注釋所指明的,這里主要涉及到引用。如果值的類(lèi)型是自己實(shí)現(xiàn)的某種Writable,比如說(shuō)AType,并且在其中持有對(duì)其它對(duì)象的引用,同時(shí),在Reduce階段還要對(duì)相鄰的兩個(gè)值(current_value,value)進(jìn)行同時(shí)進(jìn)行操作,這個(gè)時(shí)候,如果你僅僅是將value強(qiáng)制類(lèi)型轉(zhuǎn)換成相應(yīng)的AType,這個(gè)時(shí)候,current_value和value其實(shí)是指向的同一段內(nèi)存空間,也就是說(shuō),當(dāng)我們迭代完第一次的時(shí)候,current_value緩存了當(dāng)前的值,但是當(dāng)進(jìn)行下一次迭代,取到的value,其實(shí)就是將它們共同指向的那段內(nèi)存做了更新,換句話說(shuō),current_value所指向的內(nèi)存也會(huì)被更新成value的值,如果不了解Reduce階段values的迭代實(shí)現(xiàn),很可能就會(huì)造成莫名其妙的程序行為,而解決方案就是創(chuàng)建一個(gè)全新的對(duì)象來(lái)緩存“上一個(gè)”值,從而避免這種情況。
評(píng)論