2016-08-05 7 views
6

मुझे RDD में reduce जैसे प्रदर्शन की आवश्यकता है, लेकिन ऑपरेटर को कम्यूटिव होने की आवश्यकता नहीं है। यानी मैं चाहता हूं कि result अनुवर्ती में हमेशा "123456789" होगा।क्या आरडीडी में कोई कार्रवाई ऑर्डर रखती है?

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24 

scala> val result = rdd.someAction{ _+_ } 

सबसे पहले, मुझे fold मिला। RDD#fold के दस्तावेज़ का कहना है:

डीईएफ़ गुना (zeroValue: टी) (सेशन: (टी, टी) ⇒ टी): टी सकल सभी विभाजनों के लिए के तत्वों प्रत्येक विभाजन, और उसके बाद परिणाम, का उपयोग करते हुए एक दिया साहचर्य समारोह और एक तटस्थ "शून्य मान"

नोट है कि वहाँ कोई विनिमेय दस्तावेज़ में की जरूरत है। हालांकि, परिणाम की उम्मीद नहीं है के रूप में:

scala> rdd.fold(""){ _+_ } 
res10: String = 312456879 

संपादित के रूप में @ dk14 ने उल्लेख मैं कोशिश की है, कोई भाग्य के साथ:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res22: String = 341276895 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res23: String = 914856273 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res24: String = 742539618 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res25: String = 271468359 
+0

आप दस्तावेज़ों के अगले खंड को याद करते हैं, जो आप देख रहे हैं: * "यह स्कैला जैसी कार्यात्मक भाषाओं में गैर-वितरित संग्रह के लिए लागू फ़ोल्ड ऑपरेशंस से कुछ अलग तरीके से व्यवहार करता है। यह फोल्ड ऑपरेशन लागू हो सकता है विभाजन अलग-अलग होते हैं, और फिर उन परिणामों को अंतिम परिणाम में फोल्ड करने के बजाय, प्रत्येक परिभाषित क्रम में अनुक्रमिक रूप से प्रत्येक तत्व को फोल्ड करने के बजाय फोल्ड करें। उन कार्यों के लिए जो कम्यूटिव नहीं हैं, परिणाम गैर-वितरित संग्रह पर लागू एक गुना से अलग हो सकता है "* –

उत्तर

2

कोई अंतर्निहित कार्रवाई कि स्काला में इस मानदंड को पूरा करता को कम करने, लेकिन आप आसानी mapPartitions, collect और स्थानीय कटौती के संयोजन के द्वारा अपने स्वयं के लागू कर सकते हैं :

import scala.reflect.ClassTag 

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = { 
    rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f) 
} 

बजाय मर्ज fold द्वारा प्रयोग किया जाता अतुल्यकालिक और अव्यवस्थित विधि के लिए collect और reduce के संयोजन का उपयोग करना सुनिश्चित करता है कि वैश्विक आदेश संरक्षित है।

निश्चित रूप से

यह सहित कुछ अतिरिक्त लागत के साथ आता है: ड्राइवर पर

  • थोड़ा अधिक स्मृति पदचिह्न।
  • काफी अधिक विलंबता - हम स्थानीय कमी शुरू करने से पहले सभी कार्यों को समाप्त करने के लिए स्पष्ट रूप से प्रतीक्षा करते हैं।
+0

आपकी मदद के लिए धन्यवाद, क्या इसका मतलब यह है कि प्रत्येक विभाजन ** संपूर्ण आरडीडी का हमेशा एक निरंतर उप अनुक्रम ** है? क्या कोई दस्तावेज उसमें उल्लेख किया गया है? – Eastsun

+0

दस्तावेज़ों के बारे में - कोई भी जिसे मैं जानता हूं। हालांकि कुछ आदेशित विधियों के मॉडल और अनुबंधों से यह कम या ज्यादा बाधित है। स्पार्क में वास्तविक समस्या यह है कि समग्र अनुक्रम कैसे निर्धारित करें। आम तौर पर जब आप ऑर्डर के बारे में तर्क देते हैं तो दो मामले होते हैं) जब आप स्पष्ट क्रम (अनुबंध द्वारा) का उपयोग करते हैं b) जब आपके पास इनपुट होता है जो निर्धारित आदेशित विभाजन उत्पन्न करता है और इनपुट और वर्तमान बिंदु के बीच कोई शफल और अन्य डेटा आंदोलन नहीं होता है। – zero323

1

रूप @YuvalItzchakov fold द्वारा बताया आदेश की रक्षा नहीं करता है परिणामों को मिलाते समय विभाजित RDD में। इस मूल RDD एक के लिए केवल विभाजन वालों पर विचार समझाने के लिए,

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1) 
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res4: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res5: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res6: String = 123456789 
+0

यह ध्यान दिया जाना चाहिए कि ऐसा करने से पूरी तरह से गणना की समांतरता क्षमताओं को खोने का नुकसान होगा। –

+0

@YuvalItzchakov definite; 'fold' के साथ, ऑर्डरिंग को विभाजित 'RDD' में संरक्षित नहीं किया जा सकता है। – elm

+0

हाँ, मैं समझता हूं। लेकिन ओपी को इसके बारे में पता होना चाहिए। –

संबंधित मुद्दे