प्राप्त करने के लिए कैश() का उपयोग करने के लिए, क्योंकि आपका प्रश्न अस्पष्ट है, चलो सामान्य रणनीतियों के बारे में सोचें जो इस समस्या से संपर्क करने के लिए उपयोग किए जा सकते हैं।
यहां एक मानक समाधान कैशिंग होगा, लेकिन चूंकि आप स्पष्ट रूप से इसे टालना चाहते हैं, इसलिए मुझे यहां कुछ अतिरिक्त सीमाएं मानती हैं। यह पता चलता है कि मेमोरी डेटा स्टोरेज (heenenee द्वारा प्रज्वलित suggested की तरह) में कुछ इसी तरह समाधान, जैसे
तरह भंडारण त्वरित या तो स्वीकार्य नहीं हैं। इसका मतलब है कि आपको कुछ पाइपलाइन में हेरफेर करने के लिए कुछ मिलना है।
हालांकि कई परिवर्तनों को एक साथ बढ़ाया जा सकता है हर परिवर्तन एक नया आरडीडी बनाता है। यह, कैशिंग के बारे में आपके बयान के साथ संयुक्त, संभावित समाधानों पर अपेक्षाकृत मजबूत बाधाओं को सेट करता है।
चलिए सबसे सरल संभावित मामले से शुरू करते हैं जहां सभी पाइपलाइनों को एक चरण की नौकरियां व्यक्त की जा सकती हैं। यह हमारे विकल्पों को केवल नौकरियों और सरल मानचित्र को मानचित्रित करने के लिए प्रतिबंधित करता है-नौकरियों को कम करता है (जैसे आपके प्रश्न में वर्णित)। इस तरह की पाइपलाइनों को स्थानीय iterators पर संचालन के अनुक्रम के रूप में आसानी से व्यक्त किया जा सकता है। तो निम्न
import org.apache.spark.util.StatCounter
def isEven(x: Long) = x % 2 == 0
def isOdd(x: Long) = !isEven(x)
def p1(rdd: RDD[Long]) = {
rdd
.filter(isEven _)
.aggregate(StatCounter())(_ merge _, _ merge _)
.mean
}
def p2(rdd: RDD[Long]) = {
rdd
.filter(isOdd _)
.reduce(_ + _)
}
व्यक्त किया जा सकता है:
def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V) = {
rdd.mapPartitions(iter => {
val items = iter.toList
Iterator((f(items.iterator), g(items.iterator)))
})
}
def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = {
rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2)))
}
def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2))
val rdd = sc.range(0L, 100L)
def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _)
evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity)
यहां सबसे बड़ी समस्या है कि बेसब्री से करने के लिए है हम है: इस प्रकार
def p1(rdd: RDD[Long]) = {
rdd
.mapPartitions(iter =>
Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)))
.collect
.reduce(_ merge _)
.mean
}
def p2(rdd: RDD[Long]) = {
rdd
.mapPartitions(iter =>
Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _)))
.collect
.reduce(_ + _)
// identity _
}
इस बिंदु पर हम अलग नौकरियों के पुनर्लेखन कर सकते हैं अलग-अलग पाइपलाइनों को लागू करने में सक्षम होने के लिए प्रत्येक विभाजन का मूल्यांकन करें। इसका मतलब है कि अलग-अलग मेमोरी आवश्यकताएं अलग-अलग लागू तर्क के मुकाबले काफी अधिक हो सकती हैं। कैशिंग के बिना * बहुस्तरीय नौकरियों के मामले में यह भी बेकार है।
def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = {
rdd.map{ case (ts, us) => (ts.map(f), us.map(g)) }
}
def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
f: T => Boolean, g: U => Boolean) = {
rdd.map{ case (ts, us) => (ts.filter(f), us.filter(g)) }
}
def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X)
(s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = {
rdd.mapPartitions(iter => {
var accT = zt
var accU = zu
iter.foreach { case (ts, us) => {
accT = ts.foldLeft(accT)(s1)
accU = us.foldLeft(accU)(s2)
}}
Iterator((accT, accU))
}).reduce { case ((v1, x1), (v2, x2)) => ((m1(v1, v2), m2(x1, x2))) }
}
इस तरह एपीआई के साथ हम के रूप में प्रारंभिक पाइपलाइनों व्यक्त कर सकते हैं:
val rddSeq = rdd.map(x => (Seq(x), Seq(x)))
aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
_ merge _, _ + _, _ merge _, _ + _
)
यह
एक वैकल्पिक समाधान डेटा तत्व के लिहाज से की प्रक्रिया लेकिन seqs के एक टपल के रूप में प्रत्येक आइटम के इलाज के लिए है दृष्टिकोण थोड़ा अधिक शक्तिशाली है तो पहले एक (यदि आप आवश्यक हो तो byKey
विधियों के कुछ सबसेट को आसानी से कार्यान्वित कर सकते हैं) और सामान्य पाइपलाइनों में मेमोरी आवश्यकता कोर एपीआई के बराबर होनी चाहिए लेकिन यह भी अधिक घुसपैठ कर रही है।
* आप जाँच कर सकते हैं an answer बहुसंकेतन उदाहरण के लिए eje द्वारा प्रदान की।
क्या यह पाइपलाइन सभी एक स्पार्क नौकरी के अंदर पढ़ी जाती है? या दो अलग नौकरियां? –
एक ही नौकरी (स्पार्ककॉन्टेक्स्ट) – IttayD
एक [IgniteRDD] (https://ignite.apache.org/features/igniterdd.html) का उपयोग आपके लिए काम करेगा? आप अपने डेटा को एक साझा आरडीडी में लोड कर सकते हैं, और उसके बाद दोनों पाइपलाइनें उस से काम कर सकती हैं। – heenenee