2016-04-27 8 views
5

मैं पूरे स्पार्क डेटाफ्रेम को स्कैला मानचित्र संग्रह में परिवर्तित करने का सबसे अच्छा समाधान खोजने का प्रयास कर रहा हूं।एक स्पार्क डेटाफ्रेम को स्कैला मानचित्र संग्रह में परिवर्तित करना

इस से जाने के लिए (स्पार्क उदाहरण में):: यह सबसे अच्छा के रूप में निम्नानुसार दर्शाया गया है

val people = Map(
Map("age" -> null, "name" -> "Michael"), 
Map("age" -> 30, "name" -> "Andy"), 
Map("age" -> 19, "name" -> "Justin") 
) 

उत्तर

6

मैं: एक स्काला संग्रह करने के लिए

val df = sqlContext.read.json("examples/src/main/resources/people.json") 

df.show 
+----+-------+ 
| age| name| 
+----+-------+ 
|null|Michael| 
| 30| Andy| 
| 19| Justin| 
+----+-------+ 

(मैप्स के मानचित्र) इस तरह का प्रतिनिधित्व किया ऐसा नहीं लगता कि आपका प्रश्न समझ में आता है - आपका बाहरीतम Map, मुझे लगता है कि आप इसमें मूल्यों को भरने की कोशिश कर रहे हैं - आपको अपने बाहरी Map में कुंजी/मूल्य जोड़े होने की आवश्यकता है। इसका अर्थ है कि:

val peopleArray = df.collect.map(r => Map(df.columns.zip(r.toSeq):_*)) 

आप देंगे:

Array(
    Map("age" -> null, "name" -> "Michael"), 
    Map("age" -> 30, "name" -> "Andy"), 
    Map("age" -> 19, "name" -> "Justin") 
) 

उस समय तुम कर सकते हो:

val people = Map(peopleArray.map(p => (p.getOrElse("name", null), p)):_*) 

जो तुम देना होगा:

Map(
    ("Michael" -> Map("age" -> null, "name" -> "Michael")), 
    ("Andy" -> Map("age" -> 30, "name" -> "Andy")), 
    ("Justin" -> Map("age" -> 19, "name" -> "Justin")) 
) 

मैं मैं अनुमान लगा रहा हूं कि आप वास्तव में और क्या चाहते हैं । आप एक मनमाना Long सूचकांक पर उन्हें प्रमुख करना चाहता था, तो आप कर सकते हैं:

val indexedPeople = Map(peopleArray.zipWithIndex.map(r => (r._2, r._1)):_*) 

जो तुम देता है:

Map(
    (0 -> Map("age" -> null, "name" -> "Michael")), 
    (1 -> Map("age" -> 30, "name" -> "Andy")), 
    (2 -> Map("age" -> 19, "name" -> "Justin")) 
) 
+0

वह काम किया। मैं वास्तव में misspoke। मुझे केवल मैप्स के संग्रह की आवश्यकता थी और पहली पंक्ति ने मुझे वही दिया जो मुझे चाहिए था। धन्यवाद –

+0

मीठे, तो मेरा जवाब स्वीकार करें? '' ;-) –

0

पहले Dataframe

val schemaList = dataframe.schema.map(_.name).zipWithIndex//get schema list from dataframe 

से RDD जाओ स्कीमा प्राप्त डेटाफ्रेम और मैपिंग से

dataframe.rdd.map(row => 
    //here rec._1 is column name and rce._2 index 
    schemaList.map(rec => (rec._1, row(rec._2))).toMap 
).collect.foreach(println) 
संबंधित मुद्दे