2016-03-30 8 views
7

मेरे पास एक छोटा डेटासेट है जो स्पार्क नौकरी का परिणाम होगा। मैं इस डेटासेट को नौकरी के अंत में सुविधा के लिए डेटाफ्रेम में परिवर्तित करने के बारे में सोच रहा हूं, लेकिन स्कीमा को सही ढंग से परिभाषित करने के लिए संघर्ष कर रहा हूं। समस्या नीचे अंतिम फ़ील्ड है (topValues); यह tuples का एक ऐरेबफर है - चाबियाँ और मायने रखता है।स्पार्क: प्रोग्रामिंग रूप से स्कैला में डेटाफ्रेम स्कीमा बनाना

val innerSchema = 
    StructType(
     Array(
     StructField("value", StringType), 
     StructField("count", LongType) 
    ) 
    ) 
    val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", innerSchema) 
    ) 
    ) 

    val result = stats.columnStats.map{ c => 
    Row(c._2.name, c._1, c._2.count, c._2.empties, c._2.nulls, c._2.uniqueValues, c._2.mean, c._2.min, c._2.max, c._2.topValues.topN) 
    } 

    val rdd = sc.parallelize(result.toSeq) 

    val outputDf = sqlContext.createDataFrame(rdd, outputSchema) 

    outputDf.show() 

त्रुटि मैं हो रही है एक MatchError है: scala.MatchError: ArrayBuffer((10,2), (20,3), (8,1)) (of class scala.collection.mutable.ArrayBuffer)

जब मैं डिबग और मेरी वस्तुओं का निरीक्षण किया, मैं यह देख रहा हूँ:

rdd: ParallelCollectionRDD[2] 
rdd.data: "ArrayBuffer" size = 2 
rdd.data(0): [age,2,6,0,0,3,14.666666666666666,8.0,20.0,ArrayBuffer((10,2), (20,3), (8,1))] 
rdd.data(1): [gender,3,6,0,0,2,0.0,0.0,0.0,ArrayBuffer((M,4), (F,2))] 

ऐसा नहीं है कि मैं मुझे लगता है मैंने अपने आंतरिक श्केमा में टुपल्स के ऐरेबफर को सटीक रूप से वर्णित किया है, लेकिन स्पार्क असहमत हैं।

कोई विचार है कि मुझे स्कीमा को कैसे परिभाषित किया जाना चाहिए?

+0

यदि आप एक उदाहरण डेटा या कम से कम सटीक प्रकार के 'आरडीडी' प्रदान करते हैं तो यह उपयोगी होगा। – zero323

उत्तर

10
val rdd = sc.parallelize(Array(Row(ArrayBuffer(1,2,3,4)))) 
val df = sqlContext.createDataFrame(
    rdd, 
    StructType(Seq(StructField("arr", ArrayType(IntegerType, false), false) 
) 

df.printSchema 
root 
|-- arr: array (nullable = false) 
| |-- element: integer (containsNull = false) 

df.show 
+------------+ 
|   arr| 
+------------+ 
|[1, 2, 3, 4]| 
+------------+ 
+0

हां, ऐरे टाइप सही दृष्टिकोण है। धन्यवाद! मेरा अंतिम स्कीमा मेरे जवाब में है। – Stuart

4

जैसा कि डेविड ने बताया, मुझे एक ऐरेटाइप का उपयोग करने की आवश्यकता थी। स्पार्क इस से खुश है:

val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", ArrayType(StructType(Array(
      StructField("value", StringType), 
      StructField("count", LongType) 
     )))) 
    ) 
    ) 
संबंधित मुद्दे