मैं सेटअप S3 से पाठ फ़ाइलों को स्ट्रीम करने के लिए एक साधारण परीक्षण और यह काम करने के लिए मिल गया जब मैं की तरहस्पार्क स्ट्रीमिंग textFileStream वाइल्डकार्ड का समर्थन नहीं
val input = ssc.textFileStream("s3n://mybucket/2015/04/03/")
कुछ करने की कोशिश की और बाल्टी में मैं होता लॉग फाइल वहाँ में जाने के लिए और सबकुछ ठीक काम करेगा।
लेकिन अगर उनके एक सबफ़ोल्डर था, यह किसी भी फाइल कि सबफ़ोल्डर में डाल दिया गया नहीं मिलेगा (और हाँ, मुझे पता है कि HDFS वास्तव में एक फ़ोल्डर संरचना का उपयोग नहीं करता हूँ)
val input = ssc.textFileStream("s3n://mybucket/2015/04/")
तो, मैं जैसे मैं एक मानक चिंगारी आवेदन के साथ पहले किया है बस वाइल्डकार्ड करने की कोशिश की
val input = ssc.textFileStream("s3n://mybucket/2015/04/*")
लेकिन जब मैं यह कोशिश यह एक त्रुटि फेंकता
java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist.
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
.....
मुझे एक तथ्य के बारे में पता है कि आप मानक स्पार्क अनुप्रयोगों के लिए फ़ाइल इनपुट पढ़ने के दौरान वाइल्डकार्ड का उपयोग कर सकते हैं, लेकिन ऐसा लगता है कि स्ट्रीमिंग इनपुट करते समय, यह ऐसा नहीं करता है और न ही यह सबफॉल्डर में फ़ाइलों को स्वचालित रूप से संसाधित करता है। क्या मैं यहाँ कुछ याद कर रहा हूँ ??
अंत में मैं क्या जरूरत है कि एक S3 बाल्टी है उस तिथि तक यह में रखा लॉग निगरानी की जाएगी एक स्ट्रीमिंग का काम 24/7 चल रहा हो
तो
s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName>
की तरह कुछ वहाँ किसी भी है है इसे सबसे अधिक फ़ोल्डर को सौंपने का तरीका है और यह स्वचालित रूप से किसी भी फ़ोल्डर में दिखाई देने वाली फ़ाइलों को पढ़ता है (जाहिर है कि तिथि हर दिन बढ़ेगी)?
संपादित
तो http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources पर दस्तावेज़ की खुदाई पर यह कहा गया है कि नेस्टेड निर्देशिका समर्थित नहीं हैं।
क्या कोई इस मामले में कुछ प्रकाश डाल सकता है कि ऐसा क्यों है?
इसके अलावा, क्योंकि मेरी फ़ाइलों को उनकी तिथि के आधार पर घोंसला दिया जाएगा, मेरे स्ट्रीमिंग एप्लिकेशन में इस समस्या को हल करने का एक अच्छा तरीका क्या होगा? यह थोड़ा जटिल है क्योंकि लॉग को S3 पर लिखने में कुछ मिनट लगते हैं और इसलिए दिन के लिए लिखी गई अंतिम फ़ाइल पिछले दिन के फ़ोल्डर में लिखी जा सकती है, भले ही हम नए दिन में कुछ मिनट हों।
वास्तव में मुझे यकीन नहीं है कि एस 3 समर्थन वाइल्डकार्ड ... – eliasah
यह निश्चित रूप से करता है। मेरी नौकरियां पिछले 8 महीनों से वाइल्डकार्ड का उपयोग कर रही हैं। इसके अलावा, सिर्फ एक सैनिटी चेक के लिए मैंने अभी वाइल्डकार्ड इनपुट के साथ नौकरी की, ठीक काम किया। मैं नोटिस किया है कि यह आवश्यकता होती है कि आप की तरह s3n कुछ करना नहीं है के बारे में थोड़ा picky है: // mybucket/2015/04 * कहते हैं कि सूत्र में अपवाद "मुख्य" java.io.IOException : नहीं एक फ़ाइल: s3n: // mybucket/2015/04/01 कौन समझ में आता है के रूप में यह एक फ़ाइल नहीं है लेकिन अगर आप s3n: // mybucket/2015/04/* यह दिन उप फ़ोल्डर में सभी फ़ाइलों को सही ढंग से पारदर्शी करता है .... इस तरह का मुझे एक बग जैसा लगता है। –
मैं सवाल उठाने वाला हूं। मुझे याद है कि एक ही समस्या है लेकिन मुझे याद नहीं है कि मैंने इसे कैसे हल किया है। – eliasah