2016-05-24 11 views
10

E.g पर एकाधिक पाइपलाइनों के साथ अधिक आईओ को रोकें। यदि मैं संख्याओं के समान आरडीडी पर चलाता हूं जहां एक प्रवाह भी संख्याओं के लिए फ़िल्टर करता है और उन्हें और अन्य फ़िल्टरों को अजीब और उन्हें फ़िल्टर करता है। यदि मैं इसे एक ही आरडीडी पर दो पाइपलाइनों के रूप में लिखता हूं तो यह दो निष्पादन बनाएगा, जो आरडीडी को दो बार स्कैन करेगा, जो आईओ के मामले में महंगा हो सकता है।एक ही आरडीडी

इस आईओ को एक पाइपलाइन में तर्क को दोबारा लिखने के बिना केवल डेटा को पढ़ने के लिए कैसे कम किया जा सकता है? एक ढांचा जो दो पाइपलाइन लेता है और उन्हें एक विलय करता है, ठीक है, जब तक कि डेवलपर्स प्रत्येक पाइपलाइन पर स्वतंत्र रूप से काम करते रहें (वास्तविक मामले में, इन पाइपलाइनों को अलग मॉड्यूल से लोड किया जाता है)

बिंदु नहीं है इस

+0

क्या यह पाइपलाइन सभी एक स्पार्क नौकरी के अंदर पढ़ी जाती है? या दो अलग नौकरियां? –

+0

एक ही नौकरी (स्पार्ककॉन्टेक्स्ट) – IttayD

+0

एक [IgniteRDD] (https://ignite.apache.org/features/igniterdd.html) का उपयोग आपके लिए काम करेगा? आप अपने डेटा को एक साझा आरडीडी में लोड कर सकते हैं, और उसके बाद दोनों पाइपलाइनें उस से काम कर सकती हैं। – heenenee

उत्तर

3

प्राप्त करने के लिए कैश() का उपयोग करने के लिए, क्योंकि आपका प्रश्न अस्पष्ट है, चलो सामान्य रणनीतियों के बारे में सोचें जो इस समस्या से संपर्क करने के लिए उपयोग किए जा सकते हैं।

यहां एक मानक समाधान कैशिंग होगा, लेकिन चूंकि आप स्पष्ट रूप से इसे टालना चाहते हैं, इसलिए मुझे यहां कुछ अतिरिक्त सीमाएं मानती हैं। यह पता चलता है कि मेमोरी डेटा स्टोरेज (heenenee द्वारा प्रज्वलित suggested की तरह) में कुछ इसी तरह समाधान, जैसे

तरह भंडारण त्वरित या तो स्वीकार्य नहीं हैं। इसका मतलब है कि आपको कुछ पाइपलाइन में हेरफेर करने के लिए कुछ मिलना है।

हालांकि कई परिवर्तनों को एक साथ बढ़ाया जा सकता है हर परिवर्तन एक नया आरडीडी बनाता है। यह, कैशिंग के बारे में आपके बयान के साथ संयुक्त, संभावित समाधानों पर अपेक्षाकृत मजबूत बाधाओं को सेट करता है।

चलिए सबसे सरल संभावित मामले से शुरू करते हैं जहां सभी पाइपलाइनों को एक चरण की नौकरियां व्यक्त की जा सकती हैं। यह हमारे विकल्पों को केवल नौकरियों और सरल मानचित्र को मानचित्रित करने के लिए प्रतिबंधित करता है-नौकरियों को कम करता है (जैसे आपके प्रश्न में वर्णित)। इस तरह की पाइपलाइनों को स्थानीय iterators पर संचालन के अनुक्रम के रूप में आसानी से व्यक्त किया जा सकता है। तो निम्न

import org.apache.spark.util.StatCounter 

def isEven(x: Long) = x % 2 == 0 
def isOdd(x: Long) = !isEven(x) 

def p1(rdd: RDD[Long]) = { 
    rdd 
    .filter(isEven _) 
    .aggregate(StatCounter())(_ merge _, _ merge _) 
    .mean 
} 

def p2(rdd: RDD[Long]) = { 
    rdd 
    .filter(isOdd _) 
    .reduce(_ + _) 
} 

व्यक्त किया जा सकता है:

def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V) = { 
    rdd.mapPartitions(iter => { 
    val items = iter.toList 
    Iterator((f(items.iterator), g(items.iterator))) 
    }) 
} 

def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = { 
    rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2))) 
} 

def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2)) 

val rdd = sc.range(0L, 100L) 

def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _) 
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _) 


evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity) 

यहां सबसे बड़ी समस्या है कि बेसब्री से करने के लिए है हम है: इस प्रकार

def p1(rdd: RDD[Long]) = { 
    rdd 
    .mapPartitions(iter => 
     Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _))) 
    .collect 
    .reduce(_ merge _) 
    .mean 
} 


def p2(rdd: RDD[Long]) = { 
    rdd 
    .mapPartitions(iter => 
     Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _))) 
    .collect 
    .reduce(_ + _) 
    // identity _ 
} 

इस बिंदु पर हम अलग नौकरियों के पुनर्लेखन कर सकते हैं अलग-अलग पाइपलाइनों को लागू करने में सक्षम होने के लिए प्रत्येक विभाजन का मूल्यांकन करें। इसका मतलब है कि अलग-अलग मेमोरी आवश्यकताएं अलग-अलग लागू तर्क के मुकाबले काफी अधिक हो सकती हैं। कैशिंग के बिना * बहुस्तरीय नौकरियों के मामले में यह भी बेकार है।

def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = { 
    rdd.map{ case (ts, us) => (ts.map(f), us.map(g)) } 
} 


def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
    f: T => Boolean, g: U => Boolean) = { 
    rdd.map{ case (ts, us) => (ts.filter(f), us.filter(g)) } 
} 


def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X) 
    (s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = { 
    rdd.mapPartitions(iter => { 
    var accT = zt 
    var accU = zu 
    iter.foreach { case (ts, us) => { 
     accT = ts.foldLeft(accT)(s1) 
     accU = us.foldLeft(accU)(s2) 
    }} 

    Iterator((accT, accU)) 
    }).reduce { case ((v1, x1), (v2, x2)) => ((m1(v1, v2), m2(x1, x2))) } 
} 

इस तरह एपीआई के साथ हम के रूप में प्रारंभिक पाइपलाइनों व्यक्त कर सकते हैं:

val rddSeq = rdd.map(x => (Seq(x), Seq(x))) 


aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
    _ merge _, _ + _, _ merge _, _ + _ 
) 

यह

एक वैकल्पिक समाधान डेटा तत्व के लिहाज से की प्रक्रिया लेकिन seqs के एक टपल के रूप में प्रत्येक आइटम के इलाज के लिए है दृष्टिकोण थोड़ा अधिक शक्तिशाली है तो पहले एक (यदि आप आवश्यक हो तो byKey विधियों के कुछ सबसेट को आसानी से कार्यान्वित कर सकते हैं) और सामान्य पाइपलाइनों में मेमोरी आवश्यकता कोर एपीआई के बराबर होनी चाहिए लेकिन यह भी अधिक घुसपैठ कर रही है।


* आप जाँच कर सकते हैं an answer बहुसंकेतन उदाहरण के लिए eje द्वारा प्रदान की।

संबंधित मुद्दे