2015-07-02 11 views
7

मेरे पास स्पार्क 1.2.0 के साथ एक स्पार्क स्ट्रीमिंग वातावरण है जहां मैं स्थानीय फ़ोल्डर से डेटा पुनर्प्राप्त करता हूं और हर बार जब मैं फ़ोल्डर में एक नई फ़ाइल जोड़ता हूं तो मैं कुछ परिवर्तन करता हूं।स्पार्क-स्ट्रीमिंग संदर्भ में एचडीएफएस में एक आरडीडी लिखें

val ssc = new StreamingContext(sc, Seconds(10)) 
val data = ssc.textFileStream(directory) 

आदेश DStream डेटा पर अपने विश्लेषण प्रदर्शन करने के लिए मैं इसे किसी सरणी

var arr = new ArrayBuffer[String](); 
    data.foreachRDD { 
    arr ++= _.collect() 
} 

तो मैं जानकारी मैं चाहता हूँ को निकालने के लिए और HDFS पर उन्हें बचाने के लिए प्राप्त डेटा का उपयोग में रूपांतरित करने के लिए है।

val myRDD = sc.parallelize(arr) 
myRDD.saveAsTextFile("hdfs directory....") 

के बाद से मैं वास्तव में किसी सरणी यह ​​(जो ठीक काम करेगा) DStream.saveAsTextFiles("...") साथ HDFS पर डेटा सहेजने देती असंभव है के साथ डेटा में हेरफेर करने की जरूरत है और मैं RDD को बचाने के लिए है, लेकिन इस preocedure साथ मैं अंत में नामित खाली उत्पादन फ़ाइलें भाग -00000 आदि ...

arr.foreach(println) के साथ मैं ट्रांसफ़ोमेशन के सही परिणाम देख पा रहा हूं।

मेरा संदेह यह है कि स्पार्क एक ही फाइल में डेटा लिखने के लिए हर बैच पर कोशिश करता है, जो पहले लिखा गया था उसे हटा देता है। मैंने myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()) जैसे गतिशील नामित फ़ोल्डर में सहेजने की कोशिश की लेकिन हमेशा केवल एक गुना बनाया गया है और आउटपुट फाइलें अभी भी खाली हैं।

मैं स्पार्क-स्ट्रीमिंग संदर्भ में एचडीएफएस में आरडीडी कैसे लिख सकता हूं?

+0

मुझे लगता है कि समस्या यह है कि अपने आगमन सभी मजदूरों पर उपलब्ध नहीं है है: स्पार्क 1.5+ dataframes एपीआई इस सुविधा है। क्या आपने अपना एआर प्रसारण करने की कोशिश की और अंत में इसे एचडीएफएस में लिख दिया? –

+0

क्योंकि मुझे किसी फ़ोल्डर की निगरानी करने और अपलोड की गई सभी नई फ़ाइल को अवरुद्ध करने की आवश्यकता है और एक अच्छा समाधान की तरह स्ट्रीमिंग स्ट्रीमिंग की आवश्यकता है। यह एक मशीन नहीं है लेकिन 2 मशीन-क्लस्टर है। अब मैं सिर्फ पाठ के रूप में फाइल लिख रहा हूं लेकिन भविष्य में मुझे लकड़ी की छत फाइलें लिखनी होंगी और यह स्पार्क – drstein

+0

के साथ बहुत सरल है, क्या आप इसे आजमाएं? var arr = new ArrayBuffer [स्ट्रिंग](); वैल प्रसारित = sc.broadcast (आगमन) data.foreachRDD { प्रसारित ++ = _.collect() } वैल myRDD = sc.parallelize (प्रसारित) myRDD.saveAsTextFile ("HDFS निर्देशिका ....") –

उत्तर

5

आप स्पार्क स्ट्रीमिंग का उपयोग ऐसे तरीके से कर रहे हैं जिस पर इसे डिज़ाइन नहीं किया गया था। मैं या तो आपके उपयोग के मामले के लिए स्पार्क का उपयोग करके ड्रॉप की सिफारिश करता हूं, या अपना कोड अनुकूलित करता हूं ताकि यह स्पार्क तरीके से काम कर सके। ड्राइवर को सरणी एकत्र करना एक वितरित इंजन का उपयोग करने के उद्देश्य को हरा देता है और आपके ऐप को प्रभावी ढंग से सिंगल-मशीन बनाता है (दो मशीनें एक मशीन पर डेटा को प्रोसेस करने की तुलना में अधिक ओवरहेड का कारण बनती हैं)।

सब कुछ आप एक सरणी के साथ कर सकते हैं, आप स्पार्क के साथ कर सकते हैं। तो बस स्ट्रीम के अंदर अपनी गणनाएं चलाएं, श्रमिकों पर वितरित करें, और DStream.saveAsTextFiles() का उपयोग करके अपना आउटपुट लिखें। आप एक फ़ाइल को लिखने के लिए foreachRDD + saveAsParquet(path, overwrite = true) का उपयोग कर सकते हैं।

+0

का उपयोग करना है, धन्यवाद, मैं पूरी तरह से अपना मुद्दा प्राप्त करता हूं, मैं डीस्ट्रीम का उपयोग करने के लिए trasform तर्क को बदलने की कोशिश करूंगा। क्या आपको पता है कि एक ही फाइल में रिकॉर्ड सहेजने के लिए प्रत्येक बैच में स्पार्क-स्ट्रीमिंग के लिए संभव है? अभी मुझे हर बैच अंतराल की नई फाइलों के साथ एक नया फ़ोल्डर मिलता है। – drstein

+1

हां, foreachRDD + saveAsParquet के साथ ओवरराइट करने का विकल्प है। –

+0

@MariusSoutier क्या आप कृपया मुझे इस http: // stackoverflow.com/प्रश्न/39363586/समस्या-समय-भंडारण-डेटा-स्पार्क-स्ट्रीमिंग-टू-कैसानाड्रा से सहायता कर सकते हैं। – Naresh

2

@vzamboni:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path); 
संबंधित मुद्दे