2015-11-10 7 views
7

मैं एवरो फाइल वाले कुछ जेनरेट किए गए एस 3 पथों पर स्पार्क जॉब (स्पार्क v1.5.1) चलाने के लिए चाहता हूं। मैं उन्हें लोड कर रहा हूं:स्पार्क को गायब इनपुट फ़ाइलों को अनदेखा करने की अनुमति कैसे दें?

val avros = paths.map(p => sqlContext.read.avro(p)) 

हालांकि कुछ पथ मौजूद नहीं होंगे। मैं उन खाली पथों को अनदेखा करने के लिए स्पार्क कैसे प्राप्त कर सकता हूं? पहले मैंने this answer का उपयोग किया है, लेकिन मुझे यकीन नहीं है कि नए डेटाफ्रेम एपीआई के साथ इसका उपयोग कैसे करें।

नोट: मैं आदर्श रूप से लिंक किए गए उत्तर के समान दृष्टिकोण की तलाश कर रहा हूं जो केवल इनपुट पथ वैकल्पिक बनाता है। मैं विशेष रूप से एस 3 में पथों के अस्तित्व की जांच नहीं करना चाहता (क्योंकि यह बोझिल है और विकास को अजीब बना सकता है), लेकिन मुझे लगता है कि अगर यह अभी लागू करने के लिए कोई साफ तरीका नहीं है तो मेरा फॉलबैक है।

उत्तर

9

मैं avro फ़ाइलों की निर्देशिका पढ़ने में विफलता की संभावना को संभालने के लिए स्केल Try प्रकार का उपयोग करूंगा। साथ 'प्रयास करें' हम अपने कोड में स्पष्ट विफलता की संभावना कर सकते हैं और एक कार्यात्मक तरीके से इसे संभाल:

object Main extends App { 

    import scala.util.{Success, Try} 
    import org.apache.spark.{SparkConf, SparkContext} 
    import com.databricks.spark.avro._ 

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    //the first path exists, the second one doesn't 
    val paths = List("/data/1", "/data/2") 

    //Wrap the attempt to read the paths in a Try, then use collect to filter 
    //and map with a single partial function. 
    val avros = 
    paths 
     .map(p => Try(sqlContext.read.avro(p))) 
     .collect{ 
     case Success(df) => df 
     } 
    //Do whatever you want with your list of dataframes 
    avros.foreach{ df => 
    println(df.collect()) 
    } 
    sc.stop() 
} 
+0

चिंगारी डॉक्स से: 'इकट्ठा()' ड्राइवर स्मृति से बाहर चलाने के लिए पैदा कर सकता, हालांकि, क्योंकि 'संग्रह()' पूरे आरडीडी को एक मशीन में लाता है। क्या 'संग्रह() 'का उपयोग किए बिना कोई समाधान है? यह एक बहुत बड़े डेटासेट के लिए है। – jbrown

+2

यह सच है जब आरडीडी पर 'संग्रह() 'कहा जाता है। जहां मैं पहली बार 'संग्रह (...)' कहता हूं, जिसमें आंशिक कार्य होता है, यह आरडीडी की सूची में है, यह किसी भी आरडीडी पर नहीं, सूची में संग्रह समारोह है। यह 'मानचित्र' और 'फ़िल्टर' करने के बराबर है। मैं अंत में 'foreach' के अंदर अंत में 'संग्रह()' का उपयोग करता हूं लेकिन यह आरडीडी की सूची का संचालन करने का एक उदाहरण है, मुझे उम्मीद नहीं है कि आप अपने आवेदन में क्या करेंगे, लेकिन मैं यह देखने के लिए एक सरल अंत की आवश्यकता है कि दृष्टिकोण सही तरीके से काम करता था। – mattinbits

+0

ओह ठीक है। मैं इसे आज़मा दूंगा और देख सकता हूं कि यह तब काम करता है या नहीं। मैंने सोचा कि पहला 'संग्रह' आरडीडी का मूल्यांकन कर रहा था और सभी डेटा ड्राइवर नोड को भेज रहा था। – jbrown

संबंधित मुद्दे