आप सुनिश्चित नहीं हैं कि क्या यह चल रहा है सबसे अच्छा पालन करने के लिए है प्रकार छोड़ना संक्षिप्तता के लिए अंतर्निहित ClassTag
हम इस
abstract class RDD[T] extends Serializable with Logging
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
की तरह कुछ के साथ शुरू आप सभी अतिरिक्त पैरामीटर आप देखेंगे कि aggregate
एक समारोह जो RDD[T]
से U
का मानचित्रण है ध्यान नहीं देते हैं। इसका मतलब है कि इनपुट RDD
में मानों का प्रकार आउटपुट मान के प्रकार के समान नहीं होना चाहिए। तो यह उदाहरण reduce
के लिए की तुलना में स्पष्ट रूप से अलग है:
def reduce(func: (T, T) ⇒ T): T
या fold
:
def fold(zeroValue: T)(op: (T, T) => T): T
ही fold
के रूप में, aggregate
एक zeroValue
की आवश्यकता है। इसे कैसे चुनें? यह combOp
के संबंध में एक पहचान (तटस्थ) तत्व होना चाहिए।
तुम भी दो कार्यों प्रदान करने के लिए है:
seqOp
जो (U, T)
से U
combOp
जो (U, U)
से U
बस आधारित इस हस्ताक्षर पर आप पहले से ही देखना चाहिए करने के लिए नक्शे के लिए नक्शे कि केवल seqOp
कच्चे डेटा तक पहुंच सकता है। इसमें U
प्रकार का कुछ मान T
टाइप करता है और U
प्रकार का मान देता है। आपके मामले में यह एक निम्न हस्ताक्षर
((Int, Int), Int) => (Int, Int)
इस बिंदु पर आप शायद संदेह है कि यह गुना की तरह आपरेशन के कुछ प्रकार के लिए प्रयोग किया जाता है के साथ एक समारोह है।
दूसरा फ़ंक्शन U
प्रकार के दो तर्क लेता है और U
प्रकार का मान देता है। जैसा कि पहले बताया गया है कि यह स्पष्ट होना चाहिए कि यह मूल डेटा को स्पर्श नहीं करता है और केवल seqOp
द्वारा संसाधित मूल्यों पर ही कार्य कर सकता है। आपके मामले में इस फ़ंक्शन में निम्न हस्ताक्षर हैं:
((Int, Int), (Int, Int)) => (Int, Int)
तो हम सभी को एक साथ कैसे प्राप्त कर सकते हैं?
पहले प्रत्येक विभाजन zeroValue
, seqOp
और combOp
रूप z
, seqop
और combop
respectivelly पारित कर दिया साथ मानक Iterator.aggregate
का उपयोग कर एकत्रित किया गया है।चूंकि InterruptibleIterator
आंतरिक रूप से उपयोग aggregate
पर हावी नहीं होता यह के रूप में एक सरल foldLeft(zeroValue)(seqOp)
अगला आंशिक प्रत्येक विभाजन से एकत्र परिणामों का उपयोग combOp
चलें कि इनपुट मान RDD निम्नलिखित के साथ तीन विभाजन है एकत्रित कर रहे हैं निष्पादित किया जाना चाहिए मानों का वितरण:
Iterator(1, 2)
Iterator(2, 3)
Iterator()
आपको लगता है कि निष्पादन उम्मीद कर सकते हैं, पूर्ण आदेश की अनदेखी कर, कुछ इस तरह के बराबर होगी: एक ही विभाजन के लिए
val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
.map(_.foldLeft((0, 0))(seqOp))
.reduce(combOp)
foldLeft
इस तरह दिख सकता:
Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
और सभी विभाजनों पर
Seq((3,2), (6,2), (0,0))
जो संयुक्त आप दे देंगे परिणाम मनाया:
(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
सामान्य तौर पर यह एक आम पैटर्न आप सभी स्पार्क जहां तटस्थ मान पास अधिक मिलेगा, एक समारोह विभाजन और एक समारोह प्रति मूल्यों पर कार्रवाई करने के लिए प्रयोग किया जाता विभिन्न विभाजनों से आंशिक योग को मर्ज करने के लिए प्रयोग किया जाता है। कुछ अन्य उदाहरण में शामिल हैं:
aggregateByKey
- उपयोगकर्ता निर्धारित सकल कार्य
Aggregators
स्पार्क Datasets
पर।