2014-11-05 8 views
11

हमारे पास एक बहुत ही मानक स्पार्क नौकरी है जो एस 3 से लॉग फाइलें पढ़ती है और फिर कुछ प्रसंस्करण करता है। बहुत ही बुनियादी स्पार्क सामान ...क्या मानचित्र के दौरान स्पार्क में रिकॉर्ड्स को छोड़ने/फेंकने/अनदेखा करने का कोई तरीका है?

val logs = sc.textFile(somePathTos3) 
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t"))) 
val validRows = mappedRows.filter(log => log._1._1 != "ERROR") 
...and continue processing 

कहाँ OurRowObject.parseLine कच्चे लॉग पंक्ति लेता है और कुछ (कुंजी, मूल्य) जोड़ी के लिए यह नक्शे (जैसे ((1,2,3,4), (5,6,7)) है कि हम तो पर प्रसंस्करण कर सकते हैं। अब, अगर parseLine एक का सामना करना पड़ता " समस्या "लॉग (विकृत, खाली, इत्यादि ...) यह कुछ सेंटीनेल मान लौटाएगा (उदाहरण के लिए (("ERROR", ...), (...)) जो फ़िल्टर चरण फिर फ़िल्टर करता है।

अब, मैं जो करने का प्रयास ढूंढ रहा हूं वह बस है मानचित्र के दौरान समस्या पंक्ति शामिल नहीं है ... स्पार्क को बताने का कोई तरीका "अरे यह एक खाली/विकृत पंक्ति है, इसे छोड़ दें और इसके लिए एक जोड़ी शामिल न करें", उस अतिरिक्त फ़िल्टर चरण के बजाय।

मैं अभी तक ऐसा करने का कोई तरीका नहीं ढूंढ पाया है, और यह बहुत दिलचस्प लगता है कि यह कार्यक्षमता (AFAICANFind) मौजूद नहीं है।

आप

उत्तर

9

आप पार्सर एक विकल्प [मान] के बजाय एक मूल्य वापसी कर सकता है धन्यवाद। इस तरह आप पंक्तियों को लाइनों को मैप करने और अमान्य लोगों को हटाने के लिए flatMap का उपयोग कर सकते हैं।

किसी न किसी तरह लाइनों में कुछ इस तरह:

def parseLog(line:String):Option[Array[String]] = { 
    val splitted = log.split("\t") 
    if (validate(splitted)) Some(splitted) else None 
} 

val validRows = logs.flatMap(OurRowObject.parseLog(_)) 
+0

दिलचस्प विचार कॉल कर सकते हैं, मैं इसे एक शॉट दूंगा। धन्यवाद! –

+0

जबकि मुझे वास्तव में यह समाधान पसंद है, ऐसा लगता है कि आपको 'संग्रह' के एक पैरामीटर ओवरलोड ('map' या' flatMap' के बजाय) और 'आंशिक फ़ंक्शन' का उपयोग करके समान प्रभाव प्राप्त करने में सक्षम होना चाहिए। मैं काम करने वाले एक छोटे से उदाहरण से ज्यादा कुछ हासिल नहीं कर पाया है, लेकिन आपको यह पता लगाना भी एक मजेदार संभावना है। यह 'विकल्प [मूल्य]' दृष्टिकोण का उपयोग करना बहुत आसान लगता है। (मुझे अपना कोड साझा करने में खुशी है लेकिन यह पोस्ट नहीं करेगा क्योंकि यह काफी काम नहीं करता है।) –

+0

@SpiroMichaylov यह सही है, 'संग्रह (पीएफ [ए, बी])' को इस तरह के फ़िल्टरिंग करने के लिए भी काम करना चाहिए । 'संग्रह 'के बारे में एक बात स्पार्क है जो मुझे पसंद नहीं है कि पैरामीटर के बिना' संग्रह 'ड्राइवर को सभी डेटा प्राप्त करना है, जबकि' संग्रह (पीएफ) 'एक परिवर्तन है, जो लोगों के लिए भ्रमित हो जाता है। क्या आप इसे उत्तर के रूप में जोड़ सकते हैं? यह एक बहुत ही वैध विकल्प है। – maasg

5

एक दृष्टिकोण है और एक PartialFunction (map या flatMap के बजाय) collect में से एक पैरामीटर अधिभार उपयोग करने के लिए है। यह थोड़ा मुश्किल है यदि आपको आंशिक कार्य की आवश्यकता पूरी तरह से तुच्छ नहीं है। असल में आपका शायद ऐसा नहीं होगा क्योंकि आपको और मान्य करने की आवश्यकता है, जिसे मैं नीचे दो आंशिक कार्यों के साथ मॉडल करूँगा (हालांकि पहले इनपुट को सभी इनपुट के लिए परिभाषित किया जाता है)।

// this doesn't really need to be a partial function but we'll 
// want to compose it with one and end up with a partial function 
val split: PartialFunction[String, Array[String]] = { 
    case log => log.split("\t") 
} 

// this really needs to be a partial function 
val validate: PartialFunction[Array[String], Array[String]] = { 
    case lines if lines.length > 2 => lines 
} 

val splitAndValidate = split andThen validate 

val logs = sc.parallelize(Seq("a\tb", "u\tv\tw", "a", "x\ty\tz"), 4) 

// only accept the logs with more than two entries 
val validRows = logs.collect(splitAndValidate) 

यह पूरी तरह से अच्छा है स्काला लेकिन यह काम नहीं करता क्योंकि splitAndValidate serializable नहीं है और हम स्पार्क का उपयोग कर रहे हैं। (ध्यान दें कि split और validate serializable हैं: समस्या रचना के साथ निहित है) तो, हम एक PartialFunction कि serializable है बनाने की जरूरत है:

class LogValidator extends PartialFunction[String, Array[String]] with Serializable { 

    private val validate: PartialFunction[Array[String], Array[String]] = { 
    case lines if lines.length > 2 => lines 
    } 

    override def apply(log: String) : Array[String] = { 
    validate(log.split("\t")) 
    } 

    override def isDefinedAt(log: String) : Boolean = { 
    validate.isDefinedAt(log.split("\t")) 
    } 

} 

फिर हम

val validRows = logs.collect(new LogValidator()) 
संबंधित मुद्दे

 संबंधित मुद्दे