2016-09-23 9 views
7

युक्त कक्षाओं के मामले में स्पार्क डेटासेट या डेटाफ्रेम कैसे बनाएं I Enums वाले केस क्लास का उपयोग करके स्पार्क डेटासेट बनाने की कोशिश कर रहा हूं लेकिन मैं सक्षम नहीं हूं। मैं स्पार्क संस्करण 1.6.0 का उपयोग कर रहा हूँ। अपवाद इस बारे में शिकायत कर रहे हैं कि मेरे एनम के लिए कोई एन्कोडर नहीं मिला है। स्पार्क में डेटा में enums रखने के लिए यह संभव नहीं है?एनम्स

कोड:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

object MyEnum extends Enumeration { 
    type MyEnum = Value 
    val Hello, World = Value 
} 

case class MyData(field: String, other: MyEnum.Value) 

object EnumTest { 

    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 
    val sqlCtx = new SQLContext(sc) 

    import sqlCtx.implicits._ 

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS() 

    println(s"df: ${df.collect().mkString(",")}}") 
    } 

} 

त्रुटि:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value 
- field (class: "scala.Enumeration.Value", name: "other") 
- root class: "com.company.MyData" 
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597) 
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509) 
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502) 
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394) 
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54) 
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41) 
at com.company.EnumTest$.main(EnumTest.scala:22) 
at com.company.EnumTest.main(EnumTest.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 

उत्तर

4

आप अपनी खुद की एनकोडर बना सकते हैं:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

object MyEnum extends Enumeration { 
    type MyEnum = Value 
    val Hello, World = Value 
} 

case class MyData(field: String, other: MyEnum.Value) 

object MyDataEncoders { 
    implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] = 
    org.apache.spark.sql.Encoders.kryo[MyData] 
} 

object EnumTest { 
    import MyDataEncoders._ 

    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 
    val sqlCtx = new SQLContext(sc) 

    import sqlCtx.implicits._ 

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS() 

    println(s"df: ${df.collect().mkString(",")}}") 
    } 
} 
+0

धन्यवाद! अगर मैं डीडीएस() के बजाय डीडी() करना चाहता हूं तो क्या होगा? फिर मुझे निम्न त्रुटि मिलती है: थ्रेड में अपवाद "मुख्य" java.lang.UnsupportedOperationException: टाइप के लिए स्कीमा com.nordea.gpdw.dq.MyEnum.Value समर्थित नहीं है – ErikHabanero

+0

क्या आप मेरे उत्तर में उपयोग किए गए बिल्कुल वही कोड का उपयोग कर रहे हैं? मैंने 'toDS' को' toDF' में बदलने का प्रयास किया है और ऐसा लगता है कि यह काम करता है। –

+0

हां, क्या आप वाकई डीडी: माईडाटा (हैलो, वर्ल्ड)} स्टडआउट पर प्रिंट करते हैं? चूंकि बहुत सारे लॉग आउटपुट हैं। – ErikHabanero

संबंधित मुद्दे