मेरे पास स्पार्क 1.2.0 के साथ एक स्पार्क स्ट्रीमिंग वातावरण है जहां मैं स्थानीय फ़ोल्डर से डेटा पुनर्प्राप्त करता हूं और हर बार जब मैं फ़ोल्डर में एक नई फ़ाइल जोड़ता हूं तो मैं कुछ परिवर्तन करता हूं।स्पार्क-स्ट्रीमिंग संदर्भ में एचडीएफएस में एक आरडीडी लिखें
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
आदेश DStream डेटा पर अपने विश्लेषण प्रदर्शन करने के लिए मैं इसे किसी सरणी
var arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
तो मैं जानकारी मैं चाहता हूँ को निकालने के लिए और HDFS पर उन्हें बचाने के लिए प्राप्त डेटा का उपयोग में रूपांतरित करने के लिए है।
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
के बाद से मैं वास्तव में किसी सरणी यह (जो ठीक काम करेगा) DStream.saveAsTextFiles("...")
साथ HDFS पर डेटा सहेजने देती असंभव है के साथ डेटा में हेरफेर करने की जरूरत है और मैं RDD को बचाने के लिए है, लेकिन इस preocedure साथ मैं अंत में नामित खाली उत्पादन फ़ाइलें भाग -00000 आदि ...
arr.foreach(println)
के साथ मैं ट्रांसफ़ोमेशन के सही परिणाम देख पा रहा हूं।
मेरा संदेह यह है कि स्पार्क एक ही फाइल में डेटा लिखने के लिए हर बैच पर कोशिश करता है, जो पहले लिखा गया था उसे हटा देता है। मैंने myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())
जैसे गतिशील नामित फ़ोल्डर में सहेजने की कोशिश की लेकिन हमेशा केवल एक गुना बनाया गया है और आउटपुट फाइलें अभी भी खाली हैं।
मैं स्पार्क-स्ट्रीमिंग संदर्भ में एचडीएफएस में आरडीडी कैसे लिख सकता हूं?
मुझे लगता है कि समस्या यह है कि अपने आगमन सभी मजदूरों पर उपलब्ध नहीं है है: स्पार्क 1.5+ dataframes एपीआई इस सुविधा है। क्या आपने अपना एआर प्रसारण करने की कोशिश की और अंत में इसे एचडीएफएस में लिख दिया? –
क्योंकि मुझे किसी फ़ोल्डर की निगरानी करने और अपलोड की गई सभी नई फ़ाइल को अवरुद्ध करने की आवश्यकता है और एक अच्छा समाधान की तरह स्ट्रीमिंग स्ट्रीमिंग की आवश्यकता है। यह एक मशीन नहीं है लेकिन 2 मशीन-क्लस्टर है। अब मैं सिर्फ पाठ के रूप में फाइल लिख रहा हूं लेकिन भविष्य में मुझे लकड़ी की छत फाइलें लिखनी होंगी और यह स्पार्क – drstein
के साथ बहुत सरल है, क्या आप इसे आजमाएं? var arr = new ArrayBuffer [स्ट्रिंग](); वैल प्रसारित = sc.broadcast (आगमन) data.foreachRDD { प्रसारित ++ = _.collect() } वैल myRDD = sc.parallelize (प्रसारित) myRDD.saveAsTextFile ("HDFS निर्देशिका ....") –