2016-07-28 16 views
5

मैंने स्पार्क में एक प्रलोभन में एक समय क्षेत्र से कुछ मूल्यों को परिवर्तित या निकालने के लिए एक सरल यूडीएफ बनाया। मैं फ़ंक्शन पंजीकृत करता हूं लेकिन जब मैं एसक्यूएल का उपयोग करके फ़ंक्शन को कॉल करता हूं तो यह एक NullPointerException फेंकता है। नीचे मेरा कार्य और इसे निष्पादित करने की प्रक्रिया है। मैं ज़ेपेल्लिन का उपयोग कर रहा हूं। आश्चर्यजनक रूप से यह कल काम कर रहा था लेकिन यह आज सुबह काम करना बंद कर दिया।स्कैला और स्पार्क यूडीएफ फ़ंक्शन

समारोह

def convert(time:String) : String = { 
    val sdf = new java.text.SimpleDateFormat("HH:mm") 
    val time1 = sdf.parse(time) 
    return sdf.format(time1) 
} 

एसक्यूएल के बिना समारोह

sqlContext.udf.register("convert",convert _) 

टेस्ट समारोह रजिस्टर - यह काम करता है

convert(12:12:12) -> returns 12:12 

टेस्ट टसेपेल्लिन में एसक्यूएल के साथ समारोह इस विफल रहता है। temptable

root 
|-- date: string (nullable = true) 
|-- time: string (nullable = true) 
|-- serverip: string (nullable = true) 
|-- request: string (nullable = true) 
|-- resource: string (nullable = true) 
|-- protocol: integer (nullable = true) 
|-- sourceip: string (nullable = true) 

स्टैकट्रेस है कि मैं हो रही है के भाग के

%sql 
select convert(time) from temptable limit 10 

संरचना।

java.lang.NullPointerException 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643) 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652) 
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54) 
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44) 

उत्तर

7

बजाय एक समारोह को परिभाषित की यूडीएफ उपयोग सीधे

import org.apache.spark.sql.functions._ 

val convert = udf[String, String](time => { 
     val sdf = new java.text.SimpleDateFormat("HH:mm") 
     val time1 = sdf.parse(time) 
     sdf.format(time1) 
    } 
) 

एक यूडीएफ के इनपुट पैरामीटर कॉलम (या कॉलम) है। और वापसी का प्रकार कॉलम है।

case class UserDefinedFunction protected[sql] (
    f: AnyRef, 
    dataType: DataType, 
    inputTypes: Option[Seq[DataType]]) { 

    def apply(exprs: Column*): Column = { 
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil))) 
    } 
} 
संबंधित मुद्दे