2015-05-25 10 views
6

RDD करने के लिए मैं बहुत नया हूँ और स्केला भाषा और संघ के रूप में नीचे एक सूची में सभी RDDs (List<RDD> to RDD) चाहते हैं:स्पार्क: चिंगारी कैसे एक सूची <RDD> संघ

val data = for (item <- paths) yield { 
     val ad_data_path = item._1 
     val ad_data = SparkCommon.sc.textFile(ad_data_path).map { 
      line => { 
       val ad_data = new AdData(line) 
       (ad_data.ad_id, ad_data) 
      } 
     }.distinct() 
    } 
val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _) 

मैं कोड को चलाने IntelliJ में हमेशा एक त्रुटि प्राप्त होती है:

ava.lang.NullPointerException 
at org.apache.spark.rdd.RDD.<init>(RDD.scala:125) 
at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59) 
at org.apache.spark.rdd.RDD.union(RDD.scala:438) 
at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) 
at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

किसी को भी त्रुटि के बारे में कोई जानकारी है? अग्रिम धन्यवाद :)

उत्तर

17

में यह कारण हो सकता है,

val listA = 1 to 10 
for(i <- listA; if i%2 == 0)yield {i} 

लौट वेक्टर (2,4,6,8,10), जबकि

for(i <- listA; if i%2 == 0)yield {val c = i} 

वेक्टर वापस आ जाएगी होगा ((),(),(),(),())

वही है जो आपके मामले में हो रहा है। आप ad_data प्रारंभ कर रहे हैं लेकिन इसे वापस उपज पर वापस नहीं कर रहे हैं।

जहां तक ​​आपके प्रश्न का संबंध है, यानी की सूची [RDD] RDD

यहाँ

का हल है:

val listA = sc.parallelize(1 to 10) 
val listB = sc.parallelize(10 to 1 by -1) 

2 RDDS

val listC = List(listA,listB) 
की सूची बनाने

परिवर्तित करें की सूची [RDD] RDD

val listD = listC.reduce(_ union _) 

आशा के लिए, यह आपके सवाल का जवाब।

+0

धन्यवाद एक बहुत, समस्या आपके समाधान के साथ हल हो जाती है। – juffun

+0

@juffun, अगर आप समाधान के लिए काम करते हैं, तो आप उत्तर स्वीकार कर सकते हैं :) – Akash

+0

यकीन है, पहले ही स्वीकार कर लिया गया है। – juffun

0

आरडीडी की आरडीडी की सूची से परिवर्तित करने के लिए एक और आसान तरीका। SparkContext दो अतिभारित संघ तरीकों है, एक दो RDDs स्वीकार करता है और अन्य

संघ (प्रथम, बाकी) RDDs की सूची को स्वीकार करता है संघ (rdds: Seq [RDD [टी]]))

संबंधित मुद्दे