मैं स्पार्क 1.6.1 का उपयोग कर रहा हूं और एक अजीब व्यवहार का सामना कर रहा हूं: मैं कुछ भारी कंप्यूटेशंस (भौतिकी सिमुलेशन) के साथ एक डेटाफ्रेम पर यूडीएफ चला रहा हूं कुछ इनपुट डेटा, और एक परिणाम का निर्माण - डेटाफ्रेम जिसमें कई कॉलम होते हैं (~ 40)।स्पार्क यूडीएफ ने प्रति रिकॉर्ड एक से अधिक बार बुलाया जब डीएफ में बहुत से कॉलम
आश्चर्यजनक रूप से, मेरे यूडीएफ को इस मामले में मेरे इनपुट डेटाफ्रेम के प्रति रिकॉर्ड से अधिक बार कहा जाता है (1.6 गुना अधिक), जो मुझे अस्वीकार्य लगता है क्योंकि यह बहुत महंगा है। यदि मैं कॉलम की संख्या को कम करता हूं (उदाहरण के लिए 20), तो यह व्यवहार गायब हो जाता है।
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo {
case class Result(a: Double)
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val numRuns = sc.accumulator(0) // to count the number of udf calls
val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)})
val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")
// get results of UDF
var results = data
.withColumn("tmp", myUdf($"id"))
.withColumn("result", $"tmp.a")
// add many columns to dataframe (must depend on the UDF's result)
for (i <- 1 to 42) {
results=results.withColumn(s"col_$i",$"result")
}
// trigger action
val res = results.collect()
println(res.size) // prints 100
println(numRuns.value) // prints 160
}
}
अब, वहाँ स्तंभों की संख्या कम करने के बिना इस समस्या के समाधान के लिए एक रास्ता है:
मैं नीचे एक छोटा सा स्क्रिप्ट जो इस दर्शाता है लिखने में सफल?
यह वास्तव में काम करता है! मैं अभी भी जवाब स्वीकार करने के साथ इंतजार कर रहा हूं, शायद किसी के पास एक व्यापक उत्तर –
हाँ, मैं भी उत्सुक हूं - आप स्वीकार करने के साथ बिल्कुल ठीक है :) –