2017-11-04 39 views
5

मुझे स्पार्क एप्लिकेशन को निष्पादित करने में कोई समस्या है।यह स्पार्क कोड NullPointerException क्यों बनाता है?

स्रोत कोड:

// Read table From HDFS 
val productInformation = spark.table("temp.temp_table1") 
val dict = spark.table("temp.temp_table2") 

// Custom UDF 
val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) => 
    dict.filter(
     (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7 
    ).count 
) 

val result = productInformation.withColumn("positive_count", countPositiveSimilarity($"title", $"internal_category")) 

// Error occurs! 
result.show 

त्रुटि संदेश:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times, most recent failure: Lost task 0.3 in stage 54.0 (TID 5887, ip-10-211-220-33.ap-northeast-2.compute.internal, executor 150): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.NullPointerException 
    at $anonfun$1.apply(<console>:45) 
    at $anonfun$1.apply(<console>:43) 
    ... 16 more 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2327) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:636) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:595) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:604) 
    ... 48 elided 
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    ... 3 more 
Caused by: java.lang.NullPointerException 
    at $anonfun$1.apply(<console>:45) 
    at $anonfun$1.apply(<console>:43) 
    ... 16 more 

मैं जाँच की है productInformation और dictColumns में शून्य मान है या नहीं। लेकिन कोई शून्य मूल्य नहीं हैं।

क्या कोई मेरी मदद कर सकता है? मैं आपको और अधिक जानकारी बताने के लिए उदाहरण कोड संलग्न:

case class Target(wordListOne: Seq[String], WordListTwo: Seq[String]) 
val targetData = Seq(Target(Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")), 
        Target(Seq("Java", "Scala"), Seq("Scala", "Banana")), 
        Target(Seq(""), Seq("Grape", "Banana")), 
        Target(Seq(""), Seq(""))) 
val targets = spark.createDataset(targetData) 

case class WordSimilarity(first: String, second: String, similarity: Double) 
val similarityData = Seq(WordSimilarity("Spark", "Java", 0.8), 
        WordSimilarity("Scala", "Spark", 0.9), 
        WordSimilarity("Java", "Scala", 0.9), 
        WordSimilarity("Apple", "Grape", 0.66), 
        WordSimilarity("Scala", "Apple", -0.1), 
        WordSimilarity("Gine", "Spark", 0.1)) 
val dict = spark.createDataset(similarityData) 

val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) => 
    dict.filter(
     (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7 
    ).count 
) 

val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo")) 

यह एक उदाहरण कोड है और अपने मूल कोड के समान है। उदाहरण कोड अच्छी तरह से संचालित है। मूल बिंदु और डेटा में मुझे किस बिंदु की जांच करनी चाहिए?

उत्तर

8

बहुत ही रोचक सवाल। मुझे कुछ खोज करना है, और यहां मेरा है। उम्मीद है कि इससे आपको थोड़ा सा मदद मिलेगी।

जब आप DatasetcreateDataset के माध्यम से बनाते हैं, तो स्पार्क LocalRelation लॉजिकल क्वेरी प्लान के साथ इस डेटासेट को असाइन करेगा।

def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = { 
    val enc = encoderFor[T] 
    val attributes = enc.schema.toAttributes 
    val encoded = data.map(d => enc.toRow(d).copy()) 
    val plan = new LocalRelation(attributes, encoded) 
    Dataset[T](self, plan) 
    } 

इस link का पालन करें: LocalRelation is a leaf logical plan that allow functions like collect or take to be executed locally, i.e. without using Spark executors.

और, यह सच है के रूप में isLocal विधि जाहिर

/** 
    * Returns true if the `collect` and `take` methods can be run locally 
    * (without any Spark executors). 
    * 
    * @group basic 
    * @since 1.6.0 
    */ 
    def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation] 

का कहना है, आप अपने 2 डेटासेट की जांच कर सकते स्थानीय है।

और, show विधि वास्तव में take आंतरिक रूप से कॉल करें।

private[sql] def showString(_numRows: Int, truncate: Int = 20): String = { 
    val numRows = _numRows.max(0) 
    val takeResult = toDF().take(numRows + 1) 
    val hasMoreData = takeResult.length > numRows 
    val data = takeResult.take(numRows) 

तो, उन envidences साथ, मुझे लगता है कि कॉल countDF.show निष्पादित किया जाता है, यह के रूप में जब आप ड्राइवर से dict डेटासेट पर count फोन simliar व्यवहार करेंगे, कॉल समय की संख्या targets के अभिलेखों की संख्या है। और, dict पाठ्यक्रम के डेटासेट को countDF काम पर शो के लिए स्थानीय होने की आवश्यकता नहीं है।

आप countDF को बचाने की कोशिश कर सकते हैं, तो यह आपको अपवाद पहले मामले के रूप में ही दे देंगे org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)

+0

में अपने उत्तर के लिए धन्यवाद। मेरे पास दो प्रश्न हैं। 1) आपका मुख्य बिंदु यह है कि, दूसरे शब्दों में, जब मैं सहेजता हूं या विधि दिखाता हूं तो यह उदाहरण कोड में एक त्रुटि होगी?मैं प्रवेश स्तर स्पार्क उपयोगकर्ता हूं, इसलिए मुझे लगता है कि यह एक मूर्ख सवाल हो सकता है। 2) इसके अलावा, udf में डेटाफ्रेम का उपयोग करने का कोई तरीका नहीं है? – Ashe

+0

1. आपका पहला मामला, दोनों असफल हो जाएंगे। आपका दूसरा मामला, बचाएगा असफल हो जाएगा। 2. डेटासेट udf में हो सकता है, यानी बस ठीक है। लेकिन यह आपके पहले मामले की तरह रनटाइम पर असफल हो जाएगा यदि कॉलिंग डेटासेट स्थानीय नहीं है और कार्रवाई 'ले' या 'संग्रह' नहीं है। – nabongs

3

आप udf के अंदर Dataframe का उपयोग नहीं कर सकते। आपको productInformation और dict में शामिल होने की आवश्यकता होगी, और शामिल होने के बाद udf तर्क करें।

+0

यह अपरिवर्तनीय मानचित्र यूडीएफ में dict (DataFrame) द्वारा परिवर्तित उपयोग करने के लिए संभव है? वर्तमान में, उत्पाद जानकारी और निर्देश में शामिल होना मुश्किल है। – Ashe

+1

यदि 'dict' df बहुत बड़ा नहीं है, तो आप इसे एकत्र कर सकते हैं, 'मानचित्र' में परिवर्तित कर सकते हैं, और उस मानचित्र का उपयोग udf – lev

संबंधित मुद्दे