2013-07-06 3 views
6

तो मेरी इनपुट डेटा दो क्षेत्रों/स्तंभ हैं: ID1 & आईडी 2, और मेरे कोड निम्नलिखित है:स्केलिंग: समूह के बाद, अन्य फ़ील्ड को कैसे बनाए रखें ('फ़ील्ड) {। Size}?

TextLine(args("input")) 
.read 
.mapTo('line->('id1,'id2)) {line: String => 
    val fields = line.split("\t") 
     (fields(0),fields(1)) 
} 
.groupBy('id2){.size} 
.write(Tsv(args("output"))) 

में उत्पादन परिणाम (मैं क्या मान) दो क्षेत्रों: आईडी 2 * आकार। मैं यह पता लगाने के लिए थोड़ा फंस गया हूं कि आईडी 1 मान को बनाए रखना संभव है जिसे आईडी 2 के साथ समूहीकृत किया गया था और इसे किसी अन्य क्षेत्र के रूप में जोड़ा गया था?

उत्तर

8

आप इसे डरते हुए एक अच्छा तरीके से नहीं कर सकते हैं। इस बारे में सोचें कि यह हुड के नीचे कैसे काम करता है - यह डेटा को हिस्सों में गिने जाने के लिए विभाजित करता है और इसे विभिन्न प्रक्रियाओं में भेजता है, प्रत्येक प्रक्रिया इसके हिस्से को गिना जाता है, फिर एक भी रेड्यूसर अंत में उन्हें जोड़ता है। जबकि प्रत्येक प्रक्रिया गिनती है, यह पूरे आकार को नहीं जानता है, इसलिए यह फ़ील्ड को जोड़ नहीं सकता है। एकमात्र तरीका वापस जाना है और पूरे आकार के जाने के बाद इसे डेटा में जोड़ना है (यानी एक जॉइन)।

प्रत्येक समूह स्मृति में फिट बैठता है (और आप स्मृति कॉन्फ़िगर कर सकते हैं), तो आप कर सकते हैं:

Tsv(args("input"), ('id1, 'id2)) 
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list)) 
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) { 
    case (list, size) => list.map(record => (record._1, record._2, size)) 
} 
.write(Tsv(args("output"))) 

लेकिन अगर आपके सिस्टम के लिए पर्याप्त स्मृति नहीं है, आप एक महंगी का उपयोग करना होगा शामिल हो।

टिप्पणी: आप टेक्स्टलाइन के बजाय एसएसवी का उपयोग नक्शे के बाद और विभाजन के बाद कर सकते हैं।

+0

कृपया देखें कि क्या यह समझ में आता है, मुझे एक ही दर्द महसूस होता है। http://stackoverflow.com/questions/25994879/scalding-flatten-fields-after-groupby – Sergey

संबंधित मुद्दे