2016-05-04 15 views
9

में आरडीडी कुल मिलाकर मैं एक अपाचे स्पार्क शिक्षार्थी हूं और RDD एक्शन aggregate पर आया हूं, जिसमें मुझे कोई फर्क नहीं पड़ता कि यह कैसे काम करता है। कुछ एक उल्लेख और कदम से विस्तार कदम में बताएं कि किस तरह की थी कि हम नीचे परिणाम पर कोड के लिए यहाँस्पार्क

RDD input = {1,2,3,3} 

RDD Aggregate function : 

rdd.aggregate((0, 0)) 
((x, y) => 
(x._1 + y, x._2 + 1), 
(x, y) => 
(x._1 + y._1, x._2 + y._2)) 

output : {9,4} 

धन्यवाद पहुंच सकते हैं

उत्तर

18

आप सुनिश्चित नहीं हैं कि क्या यह चल रहा है सबसे अच्छा पालन करने के लिए है प्रकार छोड़ना संक्षिप्तता के लिए अंतर्निहित ClassTag हम इस

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

की तरह कुछ के साथ शुरू आप सभी अतिरिक्त पैरामीटर आप देखेंगे कि aggregate एक समारोह जो RDD[T] से U का मानचित्रण है ध्यान नहीं देते हैं। इसका मतलब है कि इनपुट RDD में मानों का प्रकार आउटपुट मान के प्रकार के समान नहीं होना चाहिए। तो यह उदाहरण reduce के लिए की तुलना में स्पष्ट रूप से अलग है:

def reduce(func: (T, T) ⇒ T): T 

या fold:

def fold(zeroValue: T)(op: (T, T) => T): T 

ही fold के रूप में, aggregate एक zeroValue की आवश्यकता है। इसे कैसे चुनें? यह combOp के संबंध में एक पहचान (तटस्थ) तत्व होना चाहिए।

तुम भी दो कार्यों प्रदान करने के लिए है:

  • seqOp जो (U, T) से U
  • combOp जो (U, U) से U

बस आधारित इस हस्ताक्षर पर आप पहले से ही देखना चाहिए करने के लिए नक्शे के लिए नक्शे कि केवल seqOp कच्चे डेटा तक पहुंच सकता है। इसमें U प्रकार का कुछ मान T टाइप करता है और U प्रकार का मान देता है। आपके मामले में यह एक निम्न हस्ताक्षर

((Int, Int), Int) => (Int, Int) 

इस बिंदु पर आप शायद संदेह है कि यह गुना की तरह आपरेशन के कुछ प्रकार के लिए प्रयोग किया जाता है के साथ एक समारोह है।

दूसरा फ़ंक्शन U प्रकार के दो तर्क लेता है और U प्रकार का मान देता है। जैसा कि पहले बताया गया है कि यह स्पष्ट होना चाहिए कि यह मूल डेटा को स्पर्श नहीं करता है और केवल seqOp द्वारा संसाधित मूल्यों पर ही कार्य कर सकता है। आपके मामले में इस फ़ंक्शन में निम्न हस्ताक्षर हैं:

((Int, Int), (Int, Int)) => (Int, Int) 

तो हम सभी को एक साथ कैसे प्राप्त कर सकते हैं?

  1. पहले प्रत्येक विभाजन zeroValue, seqOp और combOp रूप z, seqop और combop respectivelly पारित कर दिया साथ मानक Iterator.aggregate का उपयोग कर एकत्रित किया गया है।चूंकि InterruptibleIterator आंतरिक रूप से उपयोग aggregate पर हावी नहीं होता यह के रूप में एक सरल foldLeft(zeroValue)(seqOp)

  2. अगला आंशिक प्रत्येक विभाजन से एकत्र परिणामों का उपयोग combOp

चलें कि इनपुट मान RDD निम्नलिखित के साथ तीन विभाजन है एकत्रित कर रहे हैं निष्पादित किया जाना चाहिए मानों का वितरण:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

आपको लगता है कि निष्पादन उम्मीद कर सकते हैं, पूर्ण आदेश की अनदेखी कर, कुछ इस तरह के बराबर होगी: एक ही विभाजन के लिए

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1) 
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2) 

Seq(Iterator(1, 2), Iterator(3, 3), Iterator()) 
    .map(_.foldLeft((0, 0))(seqOp)) 
    .reduce(combOp) 

foldLeft इस तरह दिख सकता:

Iterator(1, 2).foldLeft((0, 0))(seqOp) 
Iterator(2).foldLeft((1, 1))(seqOp) 
(3, 2) 

और सभी विभाजनों पर

Seq((3,2), (6,2), (0,0)) 

जो संयुक्त आप दे देंगे परिणाम मनाया:

(3 + 6 + 0, 2 + 2 + 0) 
(9, 4) 

सामान्य तौर पर यह एक आम पैटर्न आप सभी स्पार्क जहां तटस्थ मान पास अधिक मिलेगा, एक समारोह विभाजन और एक समारोह प्रति मूल्यों पर कार्रवाई करने के लिए प्रयोग किया जाता विभिन्न विभाजनों से आंशिक योग को मर्ज करने के लिए प्रयोग किया जाता है। कुछ अन्य उदाहरण में शामिल हैं:

  • aggregateByKey
  • उपयोगकर्ता निर्धारित सकल कार्य
  • Aggregators स्पार्क Datasets पर।
1

यहाँ आपके संदर्भ के लिए मेरी समझ है:

कल्पना कीजिए कि आप दो नोड्स है, एक पहले दो सूची तत्वों {1,2} के इनपुट लेते हैं, और एक और ले जाता है {3, 3}। (विभाजन यहाँ केवल सुविधाजनक के लिए है)

प्रथम नोड में: "(एक्स, वाई) => (x._1 + y, x._2 +1)" पहले एक्स है (0 , 0) दिए गए अनुसार, और y आपका पहला तत्व 1 है, और आपके पास आउटपुट (0 + 1, 0 + 1) होगा, फिर आपका दूसरा तत्व वाई = 2, और आउटपुट (1 + 2, 1 + 1) आता है, जो है (3, 2)

दूसरे नोड पर, समान प्रक्रिया समानांतर में होती है, और आपके पास (6, 2) होगा।

"(x, y) => (x._1 + y._1, x._2 + y._2)", आपको दो नोड्स मर्ज करने के लिए कहता है, और आपको (9,4)


एक बात लायक देख रही (0,0) वास्तव में परिणाम लंबाई (आरडीडी) में जोड़ा जाता है +1 बार है।

"स्कैला> rdd.aggregate ((1,1)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x। _1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9) "