2016-10-29 9 views
6

मैं स्पार्क 1.6.1 का उपयोग कर रहा हूं और एक अजीब व्यवहार का सामना कर रहा हूं: मैं कुछ भारी कंप्यूटेशंस (भौतिकी सिमुलेशन) के साथ एक डेटाफ्रेम पर यूडीएफ चला रहा हूं कुछ इनपुट डेटा, और एक परिणाम का निर्माण - डेटाफ्रेम जिसमें कई कॉलम होते हैं (~ 40)।स्पार्क यूडीएफ ने प्रति रिकॉर्ड एक से अधिक बार बुलाया जब डीएफ में बहुत से कॉलम

आश्चर्यजनक रूप से, मेरे यूडीएफ को इस मामले में मेरे इनपुट डेटाफ्रेम के प्रति रिकॉर्ड से अधिक बार कहा जाता है (1.6 गुना अधिक), जो मुझे अस्वीकार्य लगता है क्योंकि यह बहुत महंगा है। यदि मैं कॉलम की संख्या को कम करता हूं (उदाहरण के लिए 20), तो यह व्यवहार गायब हो जाता है।

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.functions.udf 


object Demo { 

    case class Result(a: Double) 

    def main(args: Array[String]): Unit = { 

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]")) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val numRuns = sc.accumulator(0) // to count the number of udf calls 

    val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)}) 

    val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id") 

    // get results of UDF 
    var results = data 
     .withColumn("tmp", myUdf($"id")) 
     .withColumn("result", $"tmp.a") 


    // add many columns to dataframe (must depend on the UDF's result) 
    for (i <- 1 to 42) { 
     results=results.withColumn(s"col_$i",$"result") 
    } 

    // trigger action 
    val res = results.collect() 
    println(res.size) // prints 100 

    println(numRuns.value) // prints 160 

    } 
} 

अब, वहाँ स्तंभों की संख्या कम करने के बिना इस समस्या के समाधान के लिए एक रास्ता है:

मैं नीचे एक छोटा सा स्क्रिप्ट जो इस दर्शाता है लिखने में सफल?

उत्तर

4

मैं वास्तव में इस व्यवहार को समझा नहीं सकता - लेकिन जाहिर है कि क्वेरी योजना किसी भी तरह से पथ चुनती है जहां कुछ रिकॉर्ड दो बार गणना की जाती हैं। इसका मतलब है कि अगर हम कैश इंटरमीडिएट परिणाम (यूडीएफ लागू करने के ठीक बाद) हम स्पार्क को "बल" करने में सक्षम हो सकते हैं ताकि यूडीएफ को दोबारा नहीं बदला जा सके। और वास्तव में, एक बार कैशिंग जोड़ा जाता है यह उम्मीद के रूप में व्यवहार - यूडीएफ वास्तव में 100 बार कहा जाता है:

// get results of UDF 
var results = data 
    .withColumn("tmp", myUdf($"id")) 
    .withColumn("result", $"tmp.a").cache() 
बेशक

, कैशिंग अपनी ही लागत (स्मृति ...) है, लेकिन यह आपके मामले में फायदेमंद खत्म हो सकता है अगर यह कई यूडीएफ कॉल बचाता है।

+0

यह वास्तव में काम करता है! मैं अभी भी जवाब स्वीकार करने के साथ इंतजार कर रहा हूं, शायद किसी के पास एक व्यापक उत्तर –

+0

हाँ, मैं भी उत्सुक हूं - आप स्वीकार करने के साथ बिल्कुल ठीक है :) –

4

हमें एक साल पहले भी यही समस्या थी और आखिरकार हमने यह पता लगाया कि समस्या क्या थी।

हमारे पास गणना करने के लिए एक बहुत महंगा यूडीएफ भी था और हमने पाया कि हर बार जब हम इसके कॉलम का संदर्भ लेते हैं तो यह बार-बार गणना की जाती है। इसका बस फिर कुछ दिन पहले हमारे लिए हुआ है, इसलिए मैं इस पर एक बग खोलने का निर्णय लिया: SPARK-18748

हम यह भी एक सवाल यहाँ तो खोल दिया, लेकिन अब मैं देख रहा हूँ शीर्षक इतना अच्छा नहीं था: Trying to turn a blob into multiple columns in Spark

मैं किसी भी तरह से यूडीएफ की गणना करने की योजना को "मजबूर" करने के बारे में Tzach से सहमत हूं। हम इसे भद्दा था, लेकिन हम किया था, क्योंकि हम कर सकते थे नहीं कैश() डेटा - यह बहुत बड़ा था:

val df = data.withColumn("tmp", myUdf($"id")) 
val results = sqlContext.createDataFrame(df.rdd, df.schema) 
      .withColumn("result", $"tmp.a") 

अद्यतन:

अब मुझे लगता है कि मेरी jira टिकट दूसरे करने के लिए जोड़ा गया था एक: SPARK-17728, जो अभी भी वास्तव में इस मुद्दे को सही तरीके से संभाल नहीं था, लेकिन यह एक और वैकल्पिक काम के आसपास देता है:

val results = data.withColumn("tmp", explode(array(myUdf($"id")))) 
        .withColumn("result", $"tmp.a") 
+0

साझा करने के लिए धन्यवाद! – twoface88

संबंधित मुद्दे