2017-03-08 10 views
5

में बदल रहे हैं मैं स्पार्क में udf में एक स्ट्रक्चर पास करने की कोशिश कर रहा हूं। यह फ़ील्ड नाम बदल रहा है और कॉलम स्थिति में नाम बदल रहा है। मैं इसे कैसे ठीक करूं?स्पार्क स्ट्रक्चर स्ट्रक्चरफील्ड नाम यूडीएफ

object TestCSV { 

      def main(args: Array[String]) { 

      val conf = new SparkConf().setAppName("localTest").setMaster("local") 
      val sc = new SparkContext(conf) 
      val sqlContext = new SQLContext(sc) 


      val inputData = sqlContext.read.format("com.databricks.spark.csv") 
        .option("delimiter","|") 
        .option("header", "true") 
        .load("test.csv") 


      inputData.printSchema() 

      inputData.show() 

      val groupedData = inputData.withColumn("name",struct(inputData("firstname"),inputData("lastname"))) 

      val udfApply = groupedData.withColumn("newName",processName(groupedData("name"))) 

      udfApply.show() 
      } 



      def processName = udf((input:Row) =>{ 

       println(input) 
       println(input.schema) 

       Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname")) 

       }) 

     } 

आउटपुट:

root 
|-- id: string (nullable = true) 
|-- firstname: string (nullable = true) 
|-- lastname: string (nullable = true) 

+---+---------+--------+ 
| id|firstname|lastname| 
+---+---------+--------+ 
| 1|  jack| reacher| 
| 2|  john|  Doe| 
+---+---------+--------+ 

त्रुटि:

[jack,reacher] StructType(StructField(i[1],StringType,true), > StructField(i[2],StringType,true)) 17/03/08 09:45:35 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) java.lang.IllegalArgumentException: Field "firstname" does not exist.

+0

आप दो स्ट्रिंग्स ('स्ट्रिंग्स 'के रूप में सीधे udf में क्यों नहीं पारित करते हैं? –

+0

यह संभव है लेकिन आप स्पार्क यूडीएफ में तर्क के रूप में 10 से अधिक फ़ील्ड पास नहीं कर सकते हैं। यहां मैंने जो एक प्रदान किया है वह एक सरल उपयोग केस है। कभी-कभी मुझे यूडीएफ में 20 से अधिक कॉलम पास करना पड़ता है। मैं इसे कैसे प्राप्त करूं? – hp2326

उत्तर

1

क्या आप सामना कर रहे हैं वास्तव में अजीब है। थोड़ा सा खेलने के बाद मुझे अंत में पता चला कि यह ऑप्टिमाइज़र इंजन के साथ किसी समस्या से संबंधित हो सकता है। ऐसा लगता है कि समस्या यूडीएफ नहीं है लेकिन struct फ़ंक्शन है।

मुझे लगता है मैं आपके द्वारा बताए गए अपवाद कैशिंग के बिना यह (स्पार्क 1.6.3) काम करने के लिए जब मैं groupedDatacache मिलता है,:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 


object Demo { 

    def main(args: Array[String]): Unit = { 

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[1]")) 
    val sqlContext = new HiveContext(sc) 
    import sqlContext.implicits._ 
    import org.apache.spark.sql.functions._ 


    def processName = udf((input: Row) => { 
     Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname")) 
    }) 


    val inputData = 
     sc.parallelize(
     Seq(("1", "Kevin", "Costner")) 
    ).toDF("id", "firstname", "lastname") 


    val groupedData = inputData.withColumn("name", struct(inputData("firstname"), inputData("lastname"))) 
     .cache() // does not work without cache 

    val udfApply = groupedData.withColumn("newName", processName(groupedData("name"))) 
    udfApply.show() 
    } 
} 

वैकल्पिक रूप से आप अपने struct बनाने के लिए RDD एपीआई का उपयोग कर सकते हैं, लेकिन यह वास्तव में अच्छा नहीं है:

case class Name(firstname:String,lastname:String) // define outside main 

val groupedData = inputData.rdd 
    .map{r => 
     (r.getAs[String]("id"), 
      Name(
      r.getAs[String]("firstname"), 
      r.getAs[String]("lastname") 
     ) 
     ) 
    } 
    .toDF("id","name") 
+0

धन्यवाद @ राफेल रोथ। यह अभी मेरे लिए काम किया है। मैं यह जवाब स्वीकार करूंगा। – hp2326

संबंधित मुद्दे