8

स्पार्क 2.1 और स्कैला 2.11 से कॉलम जोड़ना। मेरे पास एक बड़ा Map[String,Date] है जिसमें इसमें 10 के कुंजी/मूल्य जोड़े हैं। मैं भी 10K JSON स्पार्क के लिए एक फाइल सिस्टम है कि पहुँचा जा सकता है पर रहने वाले फ़ाइलें:स्पार्क डेटासेट में JSON फ़ाइलों को पढ़ना और एक अलग मानचित्र

mnt/ 
    some/ 
     path/ 
      data00001.json 
      data00002.json 
      data00003.json 
      ... 
      data10000.json 

नक्शे में प्रत्येक केवी जोड़ी अपने संबंधित JSON फ़ाइल से मेल खाती है (इसलिए 1 मानचित्र केवी जोड़ी, data00001.json से मेल खाती है आदि)

मैं इन सभी JSON फ़ाइलों को 1 बड़े स्पार्क Dataset में पढ़ना चाहता हूं और, जबकि मैं इसमें हूं, इस डेटासेट में दो नए कॉलम जोड़ें (जो JSON फ़ाइलों में मौजूद नहीं है)। जब मैं withColumn विधियों को हटाता हूं और allData.show() करता हूं तो JSON एक एकल डेटासेट में फ़ाइलें करता है। तो मैं वहां सब अच्छा हूँ।

मैं किसके साथ संघर्ष कर रहा हूं: मैं दो नए कॉलम कैसे जोड़ूं और फिर सभी कुंजी/मूल्य मानचित्र तत्वों को सही तरीके से निकालूं?

उत्तर

4

यदि मैं सही ढंग से समझ गया तो आप जेसन फाइलों से डेटाफ्रेम के साथ मानचित्र से केवी को सहसंबंधित करना चाहते हैं।

मैं समस्या को केवल 3 फाइलों और 3 मुख्य मानों को क्रमबद्ध करने की कोशिश करूंगा।

val kvs = Map("a" -> 1, "b" -> 2, "c" -> 3) 
val files = List("data0001.json", "data0002.json", "data0003.json") 

अधिक आसान फ़ाइलों से निपटने, कुंजी के लिए एक मामला वर्ग को परिभाषित करें, महत्व देता

case class FileWithKV(fileName: String, key: String, value: Int) 

फ़ाइलें और केवीएस

val filesWithKVs = files.zip(kvs) 
    .map(p => FileWithKV(p._1, p._2._1, p._2._2)) 

यह इस

filesWithKVs: List[FileWithKV] = List(FileWithKV(data0001.json,a,1), FileWithKV(data0002.json,b,2), FileWithKV(data0003.json,c,3)) 
तरह दिखेगा ज़िप करेंगे

हम तब से शुरू करते हैं प्रारंभिक dataframe, हमारे संग्रह के सिर से और फिर तह पूरे dataframe कि सभी स्तंभों को गतिशील केवी

val head = filesWithKVs.head 
val initialDf = spark 
.read.json(head.filename) 
.withColumn(s"new_col_1", lit(head.key)) 
.withColumn(s"new_col_2", lit(head.value)) 

अब तह भाग से उत्पन्न साथ सभी फाइलों का आयोजन करेगा, निर्माण के लिए छोड़ दिया शुरू कर देंगे

val dfAll = filesWithKVs.tail.foldLeft(initialDf)((df, fileWithKV) => { 
    val newDf = spark 
    .read.json(fileWithKV.filename) 
    .withColumn(s"new_col_1", lit(fileWithKV.key)) 
    .withColumn(s"new_col_2", lit(fileWithKV.value)) 
    // union the dataframes to capture file by file, key value with key value 
    df.union(newDf) 
}) 

dataframe इस तरह दिखेगा, यह सोचते हैं कि json फाइलों में 3 json फ़ाइलों में से प्रत्येक के लिए एक कॉलम नामित बार और एक मूल्य foo हो जाएगा,

+---+----------+----------+ 
|bar|new_col_1 |new_col_2 | 
+---+----------+----------+ 
|foo|   a|   1| 
|foo|   b|   2| 
|foo|   c|   3| 
+---+----------+----------+ 
+3

मैं '' 'Dataframe.union''' का उपयोग करने के लिए अपनी सामान्य टिप्पणी जोड़ूंगा: इसे कुछ हज़ार बार से अधिक न करें। यदि आप फ़ाइलों की संख्या बड़ी है, तो निष्पादन ग्राफ अव्यवहारिक रूप से बड़ा हो जाएगा। वर्तमान हैकी विकल्प अंतर्निहित आरडीडी पर '' 'स्पार्ककॉन्टेक्स्ट.ऑनियन (आरडीडी *)' '' का उपयोग करना है। यह डीएजी flattens। –

3

मुझे लगता है कि आपको इसके लिए अपना डेटासेट बनाना चाहिए। यह नया डेटासोर्स आपके विशेष फ़ोल्डर संरचना और सामग्री संरचना के बारे में जानेंगे।

संबंधित मुद्दे