2015-09-17 6 views
6

मैं एक स्पार्क डेटाफ्रेम को सहेजना चाहता हूं जिसमें एक क्लार्क के रूप में एक कस्टम क्लास है। यह वर्ग एक और कस्टम वर्ग के एक वर्ग द्वारा रचित है। ऐसा करने के लिए, मैं इन वर्गों में से प्रत्येक के लिए VectorUDT के समान एक UserDefinedType क्लास बना देता हूं। मैं डेटाफ्रेम के साथ काम कर सकता हूं जैसा कि मैंने इरादा किया है लेकिन इसे डिस्क पर एक लकड़ी (या जेसन) के रूप में सहेज नहीं सकता है, मैंने इसे एक बग के रूप में बताया, लेकिन शायद मेरे कोड में कोई समस्या है।नेस्टेड उपयोगकर्ता डेटा प्रकारों के साथ स्पार्क डेटाफ्रेम सहेजना

import org.apache.spark.sql.SaveMode 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow 
import org.apache.spark.sql.types._ 

@SQLUserDefinedType(udt = classOf[AUDT]) 
case class A(list:Seq[B]) 

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) 
    override def userClass: Class[A] = classOf[A] 
    override def serialize(obj: Any): Any = obj match { 
    case A(list) => 
     val row = new GenericMutableRow(1) 
     row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) 
    } 
    } 
} 

object AUDT extends AUDT 

@SQLUserDefinedType(udt = classOf[BUDT]) 
case class B(num:Int) 

class BUDT extends UserDefinedType[B] { 
    override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) 
    override def userClass: Class[B] = classOf[B] 
    override def serialize(obj: Any): Any = obj match { 
    case B(num) => 
     val row = new GenericMutableRow(1) 
     row.setInt(0, num) 
     row 
    } 

    override def deserialize(datum: Any): B = { 
    datum match { 
     case row: InternalRow => new B(row.getInt(0)) 
    } 
    } 
} 

object BUDT extends BUDT 

object TestNested { 
    def main(args:Array[String]) = { 
    val col = Seq(new A(Seq(new B(1), new B(2))), 
        new A(Seq(new B(3), new B(4)))) 

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(1 to 2 zip col).toDF() 
    df.show() 

    df.write.mode(SaveMode.Overwrite).save(...) 
    } 
} 

यह निम्न त्रुटि में परिणाम है: मैं इस समस्या को दिखाने के लिए एक सरल उदाहरण को क्रियान्वित किया है

15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42) at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.sql.types.StructType.map(StructType.scala:92) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58) at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):

एक पर कोई समस्या नहीं के बजाय बी के साथ एक dataframe सहेजते हैं नहीं के रूप में बी के बाद से मौजूद है नेस्टेड कस्टम क्लास। क्या मैं कुछ भूल रहा हूँ?

उत्तर

2

मुझे इसे बनाने के लिए अपने कोड में चार बदलाव करना पड़ा (लिनक्स पर स्पार्क 1.6.0 में परीक्षण किया गया) और मुझे लगता है कि मैं अधिकतर बता सकता हूं कि उनकी आवश्यकता क्यों है। हालांकि, मुझे लगता है कि क्या एक आसान समाधान है, मुझे आश्चर्य है।

  1. जब sqlType को परिभाषित है, यह BUDT.sqlType पर निर्भर करते हैं, न कि सिर्फ BUDT से बनाने: इस प्रकार सभी परिवर्तन, AUDT में हैं।
  2. serialize() में, प्रत्येक सूची तत्व पर BUDT.serialize() पर कॉल करें।
  3. deserialize() में:

    class AUDT extends UserDefinedType[A] { 
        override def sqlType: DataType = 
        StructType(
         Seq(StructField("list", 
             ArrayType(BUDT.sqlType, containsNull = false), 
             nullable = true))) 
    
        override def userClass: Class[A] = classOf[A] 
    
        override def serialize(obj: Any): Any = 
        obj match { 
         case A(list) => 
         val row = new GenericMutableRow(1) 
         val elements = 
          list.map(_.asInstanceOf[Any]) 
           .map(e => BUDT.serialize(e)) 
           .toArray 
         row.update(0, new GenericArrayData(elements)) 
         row 
        } 
    
        override def deserialize(datum: Any): A = { 
        datum match { 
         case row: InternalRow => 
         val first = row.getArray(0) 
         val bs:Array[InternalRow] = first.toArray(BUDT.sqlType) 
         val bseq = bs.toSeq.map(e => BUDT.deserialize(e)) 
         val a = new A(bseq) 
         a 
        } 
        } 
    
    } 
    

    सभी चार ch:

    • हर तत्व

यहाँ पर कॉल toArray(BUDT.sqlType) बजाय toArray(BUDT)

  • कॉल BUDT.deserialize() जिसके परिणामस्वरूप कोड है कोणों का एक ही चरित्र होता है: A एस के हैंडलिंग और B एस के हैंडलिंग के बीच संबंध अब बहुत स्पष्ट है: स्कीमा टाइपिंग के लिए, क्रमबद्ध करने के लिए और डी-सीरियलाइजेशन के लिए। मूल कोड इस धारणा पर आधारित प्रतीत होता है कि स्पार्क एसक्यूएल "इसे समझ लें", जो उचित हो सकता है, लेकिन स्पष्ट रूप से यह नहीं है।

  • +0

    यह काम किया। मैं डेटाफ्रेम में कॉलम के रूप में "जटिल" ऑब्जेक्ट्स का उपयोग कर रहा हूं और स्पार्क 1.6.0 में यह काम करना बंद कर दिया है। यह चाल है, इसलिए मैंने जो सबक सीखा वह सिर्फ सीरियलाइजेशन/डी-सीरियलाइजेशन के बारे में सबकुछ स्पष्ट है। चीयर्स! –

    संबंधित मुद्दे