2017-03-21 7 views
8

के लिए स्पार्क यूडीएफ मेरे पास स्पार्क डेटाफ्रेम में "स्ट्रक्चर टाइप" कॉलम है जिसमें उप-फ़ील्ड के रूप में एक सरणी और स्ट्रिंग है। मैं सरणी को संशोधित करना चाहता हूं और उसी प्रकार के नए कॉलम को वापस करना चाहता हूं। क्या मैं इसे यूडीएफ के साथ संसाधित कर सकता हूं? या विकल्प क्या हैं?स्ट्रक्चर टाइप/पंक्ति

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil) 
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil) 
val data = Seq(Row(Row(Array(1,2),"eb")), Row(Row(Array(3,2,1), "dsf"))) 
val rd = sc.parallelize(data) 
val df = spark.createDataFrame(rd, schema) 
df.printSchema 

root 
|-- subtable: struct (nullable = true) 
| |-- col1: array (nullable = true) 
| | |-- element: integer (containsNull = false) 
| |-- col2: string (nullable = true) 

ऐसा लगता है कि मैं प्रकार पंक्ति के एक यूडीएफ, जैसे

val u = udf((x:Row) => x) 
     >> Schema for type org.apache.spark.sql.Row is not supported 

यह समझ में आता है कुछ चाहिए, के बाद से स्पार्क वापसी प्रकार के लिए स्कीमा नहीं जानता है। दुर्भाग्य से, udf.register भी विफल रहता है:

spark.udf.register("foo", (x:Row)=> Row, sub_schema) 
    <console>:30: error: overloaded method value register with alternatives: ... 

उत्तर

3

हाँ आप यूडीएफ के साथ ऐसा कर सकते हैं। सादगी के लिए, मैं मामले वर्गों के साथ अपने उदाहरण ले लिया और मैं हर मूल्य के लिए 2 जोड़कर सरणी बदल दिया है:

case class Root(subtable: Subtable) 
case class Subtable(col1: Seq[Int], col2: String) 

val df = spark.createDataFrame(Seq(
    Root(Subtable(Seq(1, 2, 3), "toto")), 
    Root(Subtable(Seq(10, 20, 30), "tata")) 
)) 

val myUdf = udf((subtable: Row) => 
    Subtable(subtable.getSeq[Int](0).map(_ + 2), subtable.getString(1)) 
) 
val result = df.withColumn("subtable_new", myUdf(df("subtable"))) 
result.printSchema() 
result.show(false) 

प्रिंट होगा:

root 
|-- subtable: struct (nullable = true) 
| |-- col1: array (nullable = true) 
| | |-- element: integer (containsNull = false) 
| |-- col2: string (nullable = true) 
|-- subtable_new: struct (nullable = true) 
| |-- col1: array (nullable = true) 
| | |-- element: integer (containsNull = false) 
| |-- col2: string (nullable = true) 

+-------------------------------+-------------------------------+ 
|subtable      |subtable_new     | 
+-------------------------------+-------------------------------+ 
|[WrappedArray(1, 2, 3),toto] |[WrappedArray(3, 4, 5),toto] | 
|[WrappedArray(10, 20, 30),tata]|[WrappedArray(12, 22, 32),tata]| 
+-------------------------------+-------------------------------+ 
2

आप सही रास्ते पर हैं। इस परिदृश्य में यूडीएफ आपके जीवन को आसान बना देगा। जैसा कि आप पहले ही सामना कर चुके हैं, यूडीएफ उन प्रकारों को वापस नहीं कर सकता है जो स्पार्क को नहीं पता है। तो मूल रूप से आपको कुछ ऐसी चीज की आवश्यकता होगी जो स्पार्क आसानी से धारावाहिक हो सके। यह case class हो सकता है या आप (Seq[Int], String) जैसे टुपल लौट सकते हैं। तो यहाँ अपने कोड का एक संशोधित संस्करण है:

def main(args: Array[String]): Unit = { 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.types._ 
    val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil) 
    val schema = StructType(StructField("subtable", sub_schema, true) :: Nil) 
    val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf"))) 
    val rd = spark.sparkContext.parallelize(data) 
    val df = spark.createDataFrame(rd, schema) 

    df.printSchema() 
    df.show(false) 

    val mapArray = (subRows: Row) => { 
    // I prefer reading values from row by specifying column names, you may use index also 
    val col1 = subRows.getAs[Seq[Int]]("col1") 
    val mappedCol1 = col1.map(x => x * x) // Use map based on your requirements 
    (mappedCol1, subRows.getAs[String]("col2")) // now mapping is done for col2 
    } 
    val mapUdf = udf(mapArray) 

    val newDf = df.withColumn("col1_mapped", mapUdf(df("subtable"))) 
    newDf.show(false) 
    newDf.printSchema() 
} 

कृपया इन कड़ियों को देखो, इन आप अधिक जानकारी दे सकता है।

  1. जटिल स्कीमा के साथ काम करने पर सबसे व्यापक जवाब: https://stackoverflow.com/a/33850490/4046067
  2. स्पार्क समर्थित डेटा प्रकार: https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
6

पता चला है कि आप एक दूसरे यूडीएफ पैरामीटर के रूप में परिणाम स्कीमा पारित कर सकते हैं:

val u = udf((x:Row) => x, sub_schema) 
संबंधित मुद्दे