2015-07-08 19 views
5

प्रसारण के माध्यम से साझा किए गए चर क्लस्टर में शून्य हैं।स्पार्क प्रसारित परिवर्तनीय रिटर्न NullPointerException जब अमेज़ॅन ईएमआर क्लस्टर

मेरा आवेदन काफी जटिल है, लेकिन मैं यह छोटा सा उदाहरण है कि दोषरहित काम करता है जब मैं इसे स्थानीय स्तर पर चलाने के लिए लिखा है, लेकिन यह क्लस्टर में विफल रहता है:

package com.gonzalopezzi.bigdata.bicing 

import org.apache.spark.broadcast.Broadcast 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkContext, SparkConf} 

object PruebaBroadcast2 extends App { 
    val conf = new SparkConf().setAppName("PruebaBroadcast2") 
    val sc = new SparkContext(conf) 

    val arr : Array[Int] = (6 to 9).toArray 
    val broadcasted = sc.broadcast(arr) 

    val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) // a small integer array [1, 2, 3, 4] is paralellized in two machines 
    rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println) // NullPointerException in the flatmap. broadcasted is null 

} 

मैं अगर समस्या एक है पता नहीं है कोडिंग त्रुटि या कॉन्फ़िगरेशन समस्या।

यह है स्टैकट्रेस मैं:

15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException 
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24) 
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Command exiting with ret '1' 

किसी को भी मदद कर सकते हैं मुझे इसे ठीक? कम से कम, क्या आप मुझे बता सकते हैं कि क्या आप कोड में कुछ अजीब देखते हैं? यदि आपको लगता है कि कोड ठीक है, तो कृपया मुझे बताएं, क्योंकि इसका मतलब यह होगा कि समस्या क्लस्टर की कॉन्फ़िगरेशन में है।

अग्रिम धन्यवाद।

उत्तर

5

अंततः मुझे यह काम मिल गया।

यह घोषित करने से काम नहीं करता इस तरह वस्तु:

object MyObject extends App { 

लेकिन यह, काम करता है अगर आप एक मुख्य कार्य के साथ एक वस्तु की घोषणा: में

object MyObject { 
    def main (args : Array[String]) { 
    /* ... */ 
    } 
} 

तो, कम उदाहरण प्रश्न काम करता है अगर मैं इसे इस तरह से फिर से लिखने:

object PruebaBroadcast2 { 

    def main (args: Array[String]) { 
    val conf = new SparkConf().setAppName("PruebaBroadcast2") 
    val sc = new SparkContext(conf) 

    val arr : Array[Int] = (6 to 9).toArray 
    val broadcasted = sc.broadcast(arr) 

    val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) 

    rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println) 
    } 
} 

यह समस्या इस बग से संबंधित लगता है: https://issues.apache.org/jira/browse/SPARK-4170

+1

बग "निश्चित" कहता है, लेकिन मुझे अभी भी एक ही समस्या (सीडीएच 5.5.2) – Havnar

+0

में भागना प्रतीत होता है। बग "निश्चित" कहता है लेकिन फिक्स सिर्फ एक चेतावनी प्रिंट करता है: "scala.App के सबक्लास सही ढंग से काम नहीं करते हैं। इसके बजाए एक मुख्य() विधि का प्रयोग करें। " –

+0

यह एक चाल है, लेकिन सौंदर्यशास्त्र से मुझे थोड़ा और अधिक प्रसन्नता है, आप ऑब्जेक्ट कर सकते हैं 'PruebaBroadcast2 ऐप {{/ * आपका कोड * /}} बढ़ाता है' – lyomi

संबंधित मुद्दे