2016-07-14 9 views
15

में अपने स्वयं के reduceByKey रोलिंग मैं RDDs के अलावा अधिक DataFrames और डेटासेट उपयोग करने के लिए सीखने के लिए कोशिश कर रहा हूँ। आरडीडी के लिए, मुझे पता है कि मैं someRDD.reduceByKey((x,y) => x + y) कर सकता हूं, लेकिन मुझे डेटासेट के लिए यह फ़ंक्शन दिखाई नहीं देता है। तो मैंने एक लिखने का फैसला किया।स्पार्क डेटासेट

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => { 
    val result = mutable.HashMap.empty[(Long,Long),Int] 
    val keys = mutable.HashSet.empty[(Long,Long)] 
    y.keys.foreach(z => keys += z) 
    x.keys.foreach(z => keys += z) 
    for (elem <- keys) { 
    val s1 = if(x.contains(elem)) x(elem) else 0 
    val s2 = if(y.contains(elem)) y(elem) else 0 
    result(elem) = s1 + s2 
    } 
    result 
}) 

हालांकि, यह सबकुछ ड्राइवर को देता है। Dataset वापस करने के लिए आप इसे कैसे लिखेंगे? शायद नक्शापार्टिशन और क्या वहां है?

नोट इस संकलित लेकिन नहीं चलता है क्योंकि यह अभी तक

+0

स्पार्क 2.0.0 के साथ, इसे आजमाएं, yourDataset.groupByKey (...)। समूह समूह (...) –

+6

क्या उत्प्रेरक अनुकूलक नोटिस होगा कि आप एक समूह कर रहे हैं और इसे और अधिक कुशल बनाते हैं? 'कुशल' से मेरा मतलब है कि एक आरडीडी कुंजी द्वारा कम करने के तरीके के माध्यम से समूह को करने से बेहतर है? –

उत्तर

18

Map के लिए एनकोडर नहीं है मुझे लगता है अपने लक्ष्य डेटासेट को यह मुहावरा अनुवाद करने के लिए है:

rdd.map(x => (x.someKey, x.someField)) 
    .reduceByKey(_ + _) 

// => returning an RDD of (KeyType, FieldType) 

वर्तमान में, सबसे करीब समाधान मैं डेटासेट एपीआई के साथ मिल गया है इस तरह दिखता है:

ds.map(x => (x.someKey, x.someField))   // [1] 
    .groupByKey(_._1)        
    .reduceGroups((a, b) => (a._1, a._2 + b._2)) 
    .map(_._2)         // [2] 

// => returning a Dataset of (KeyType, FieldType) 

// Comments: 
// [1] As far as I can see, having a map before groupByKey is required 
//  to end up with the proper type in reduceGroups. After all, we do 
//  not want to reduce over the original type, but the FieldType. 
// [2] required since reduceGroups converts back to Dataset[(K, V)] 
//  not knowing that our V's are already key-value pairs. 

बहुत ही सुंदर और नहीं लग रही है एक त्वरित बेंचमार्क के अनुसार यह भी बहुत एल है ईएसएस performant, तो शायद हम यहाँ कुछ याद कर रहे हैं ...

नोट: एक वैकल्पिक पहले कदम के रूप groupByKey(_.someKey) उपयोग करने के लिए हो सकता है। समस्या यह है कि groupByKey का उपयोग नियमित Dataset से KeyValueGroupedDataset में बदलता है। उत्तरार्द्ध में नियमित map फ़ंक्शन नहीं है। इसके बजाए यह mapGroups प्रदान करता है, जो बहुत सुविधाजनक प्रतीत नहीं होता है क्योंकि यह मानों को Iterator में लपेटता है और डॉकस्ट्रिंग के अनुसार एक शफल करता है।

+3

यह चाल है। हालांकि एक नोट हालांकि, कम करें ByKey अधिक कुशल है क्योंकि यह शफल होने से पहले प्रत्येक नोड पर कम कर देता है। ग्रुपबीकी करना पहले सभी तत्वों को shuffles तो कम करना शुरू होता है। यही कारण है कि यह बहुत कम प्रदर्शन करता है। क्या अजीब यह इससे पहले कि मैं के बारे में पता था, लेकिन मैं reduceByKey :-) –

+0

@CarlosBribiescas मैं interwebs कि डेटासेट स्पार्क्स 'उत्प्रेरक अनुकूलक का लाभ लेने पर पढ़ा है भूल गया था कि मैं क्या करने के लिए प्रयोग किया जाता है, और नीचे पुश करने के लिए सक्षम होना चाहिए वह यह है कि शफल होने से पहले समारोह को कम करें। यह समझा सकता है कि 'डेटासेट' एपीआई में कोई 'कमीबीकी' क्यों नहीं है। हालांकि, मेरे अनुभव में यह मामला नहीं है और 'groupByKey.reduceGroups' में काफी अधिक डेटा shuffles और' कम से कम 'की तुलना में काफी धीमी है। –

+4

लगता है कि समूह समूह प्रदर्शन 2.0.1 और 2.1.0 [स्पार्क -16391] से तय किया गया है (https://issues.apache.org/jira/browse/SPARK-16391) – Franzi

3

एक अधिक कुशल समाधान groupByKey से पहले mapPartitions का उपयोग करता फेरबदल की मात्रा को कम (ध्यान दें कि यह reduceByKey के रूप में ठीक उसी हस्ताक्षर नहीं है, लेकिन मुझे लगता है कि यह एक समारोह से की आवश्यकता होती है डाटासेट एक टपल से मिलकर बनता है पारित करने के लिए और अधिक लचीला है) करने के लिए।

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V) 
    (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = { 
    def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = { 
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator 
    } 
    ds.mapPartitions(h(f, g, _)) 
    .groupByKey(f)(encK) 
    .reduceGroups(g) 
} 

अपने डेटा के आकार/आकार पर निर्भर करता है, इस बारे में 2x के रूप में तेजी से एक के रूप में groupByKey(_._1).reduceGroupsreduceByKey के प्रदर्शन के 1 सेकंड के भीतर है, और। सुधार के लिए अभी भी कमरा है, इसलिए सुझावों का स्वागत किया जाएगा।

संबंधित मुद्दे