2015-12-25 16 views
13

के कारण स्टैक ओवरफ्लो मेरे पास एचडीएफएस में हजारों छोटी फाइलें हैं। फ़ाइलों के एक छोटे से छोटे सबसेट को संसाधित करने की आवश्यकता है (जो हजारों में फिर से है), फ़ाइल सूची में फ़ाइलपैथ की सूची होती है जिसे संसाधित करने की आवश्यकता होती है।लंबे आरडीडी वंश

// fileList == list of filepaths in HDFS 

var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD 

for (i <- 0 to fileList.size() - 1) { 

val filePath = fileStatus.get(i) 
val fileRDD = sparkContext.textFile(filePath) 
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line)) 

masterRDD = masterRDD.union(sampleRDD) 

} 

masterRDD.first() 

// एक बार लूप के बाहर, RDD

Exception in thread "main" java.lang.StackOverflowError 
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    ===================================================================== 
    ===================================================================== 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 

उत्तर

27

की लंबी वंश सामान्य तौर पर की वजह से stackoverflow त्रुटि में किसी भी कार्रवाई के परिणाम प्रदर्शन कर आप लंबे प्रजातियों को तोड़ने के लिए चौकियों उपयोग कर सकते हैं। कुछ कम या ज्यादा समान यह करने के लिए काम करना चाहिए:

import org.apache.spark.rdd.RDD 
import scala.reflect.ClassTag 

val checkpointInterval: Int = ??? 

def loadAndFilter(path: String) = sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _)) 

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) 
    (acc: RDD[T], xi: (RDD[T], Int)) = { 
    if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint 
    else xi._1.union(acc) 
    } 

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] 
fileList.map(loadAndFilter).zipWithIndex 
    .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval)) 

इस विशेष स्थिति में एक बहुत सरल समाधान SparkContext.union विधि का उपयोग किया जाना चाहिए:

val masterRDD = sc.union(
    fileList.map(path => sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _))) 
) 

इन तरीकों के बीच एक अंतर स्पष्ट किया जाना चाहिए जब आप ले DAG पर एक नज़र पाश द्वारा उत्पन्न/reduce:

enter image description here

और एक एकल union:

enter image description here

बेशक

यदि फ़ाइलें छोटे आप flatMap साथ wholeTextFiles गठबंधन और एक बार में सभी फ़ाइलों को पढ़ सकता है:

sc.wholeTextFiles(fileList.mkString(",")) 
    .flatMap{case (path, text) => 
    text.split("\n").filter(_.startsWith("#####")).map((path, _))} 
+4

बेस्ट कभी sc.union का उपयोग करें() –

संबंधित मुद्दे