2015-04-03 17 views
11

मैं सेटअप S3 से पाठ फ़ाइलों को स्ट्रीम करने के लिए एक साधारण परीक्षण और यह काम करने के लिए मिल गया जब मैं की तरहस्पार्क स्ट्रीमिंग textFileStream वाइल्डकार्ड का समर्थन नहीं

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/") 

कुछ करने की कोशिश की और बाल्टी में मैं होता लॉग फाइल वहाँ में जाने के लिए और सबकुछ ठीक काम करेगा।

लेकिन अगर उनके एक सबफ़ोल्डर था, यह किसी भी फाइल कि सबफ़ोल्डर में डाल दिया गया नहीं मिलेगा (और हाँ, मुझे पता है कि HDFS वास्तव में एक फ़ोल्डर संरचना का उपयोग नहीं करता हूँ)

val input = ssc.textFileStream("s3n://mybucket/2015/04/") 

तो, मैं जैसे मैं एक मानक चिंगारी आवेदन के साथ पहले किया है बस वाइल्डकार्ड करने की कोशिश की

val input = ssc.textFileStream("s3n://mybucket/2015/04/*") 

लेकिन जब मैं यह कोशिश यह एक त्रुटि फेंकता

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist. 
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523) 
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176) 
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) 
at scala.Option.orElse(Option.scala:257) 
..... 

मुझे एक तथ्य के बारे में पता है कि आप मानक स्पार्क अनुप्रयोगों के लिए फ़ाइल इनपुट पढ़ने के दौरान वाइल्डकार्ड का उपयोग कर सकते हैं, लेकिन ऐसा लगता है कि स्ट्रीमिंग इनपुट करते समय, यह ऐसा नहीं करता है और न ही यह सबफॉल्डर में फ़ाइलों को स्वचालित रूप से संसाधित करता है। क्या मैं यहाँ कुछ याद कर रहा हूँ ??

अंत में मैं क्या जरूरत है कि एक S3 बाल्टी है उस तिथि तक यह में रखा लॉग निगरानी की जाएगी एक स्ट्रीमिंग का काम 24/7 चल रहा हो

तो

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName> 

की तरह कुछ वहाँ किसी भी है है इसे सबसे अधिक फ़ोल्डर को सौंपने का तरीका है और यह स्वचालित रूप से किसी भी फ़ोल्डर में दिखाई देने वाली फ़ाइलों को पढ़ता है (जाहिर है कि तिथि हर दिन बढ़ेगी)?

संपादित

तो http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources पर दस्तावेज़ की खुदाई पर यह कहा गया है कि नेस्टेड निर्देशिका समर्थित नहीं हैं।

क्या कोई इस मामले में कुछ प्रकाश डाल सकता है कि ऐसा क्यों है?

इसके अलावा, क्योंकि मेरी फ़ाइलों को उनकी तिथि के आधार पर घोंसला दिया जाएगा, मेरे स्ट्रीमिंग एप्लिकेशन में इस समस्या को हल करने का एक अच्छा तरीका क्या होगा? यह थोड़ा जटिल है क्योंकि लॉग को S3 पर लिखने में कुछ मिनट लगते हैं और इसलिए दिन के लिए लिखी गई अंतिम फ़ाइल पिछले दिन के फ़ोल्डर में लिखी जा सकती है, भले ही हम नए दिन में कुछ मिनट हों।

+0

वास्तव में मुझे यकीन नहीं है कि एस 3 समर्थन वाइल्डकार्ड ... – eliasah

+1

यह निश्चित रूप से करता है। मेरी नौकरियां पिछले 8 महीनों से वाइल्डकार्ड का उपयोग कर रही हैं। इसके अलावा, सिर्फ एक सैनिटी चेक के लिए मैंने अभी वाइल्डकार्ड इनपुट के साथ नौकरी की, ठीक काम किया। मैं नोटिस किया है कि यह आवश्यकता होती है कि आप की तरह s3n कुछ करना नहीं है के बारे में थोड़ा picky है: // mybucket/2015/04 * कहते हैं कि सूत्र में अपवाद "मुख्य" java.io.IOException : नहीं एक फ़ाइल: s3n: // mybucket/2015/04/01 कौन समझ में आता है के रूप में यह एक फ़ाइल नहीं है लेकिन अगर आप s3n: // mybucket/2015/04/* यह दिन उप फ़ोल्डर में सभी फ़ाइलों को सही ढंग से पारदर्शी करता है .... इस तरह का मुझे एक बग जैसा लगता है। –

+1

मैं सवाल उठाने वाला हूं। मुझे याद है कि एक ही समस्या है लेकिन मुझे याद नहीं है कि मैंने इसे कैसे हल किया है। – eliasah

उत्तर

0

हमें एक ही समस्या थी। हम अल्पविराम के साथ उप फ़ोल्डर नाम में शामिल हो गए।

List<String> paths = new ArrayList<>(); 
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd"); 

try {   
    Date start = sdf.parse("2015/02/01"); 
    Date end = sdf.parse("2015/04/01"); 

    Calendar calendar = Calendar.getInstance(); 
    calendar.setTime(start);   

    while (calendar.getTime().before(end)) { 
     paths.add("s3n://mybucket/" + sdf.format(calendar.getTime())); 
     calendar.add(Calendar.DATE, 1); 
    }     
} catch (ParseException e) { 
    e.printStackTrace(); 
} 

String joinedPaths = StringUtils.join(",", paths.toArray(new String[paths.size()])); 
val input = ssc.textFileStream(joinedPaths); 

मुझे आशा है कि इस तरह आपकी समस्या हल हो जाएगी।

+0

कूल। आप बड़ी समाप्ति तिथियों से कैसे निपटते हैं? प्रोग्राम संकलित और पुन: लॉन्च करके? या क्या मैं कुछ न कुछ भूल रहा हूं? –

6

कुछ "बदसूरत लेकिन कामकाजी समाधान" FileInputDStream को विस्तारित करके बनाया जा सकता है। लेखन sc.textFileStream(d)

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

के बराबर आप CustomFileInputDStream कि FileInputDStream का विस्तार होगा बना सकते हैं। कस्टम क्लास FileInputDStream क्लास से गणना विधि की प्रतिलिपि बनायेगा और आपकी ज़रूरतों के लिए findNewFiles विधि को समायोजित करेगा।

बदलते findNewFiles विधि से:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

    // Calculate ignore threshold 
    val modTimeIgnoreThreshold = math.max(
    initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
    currentTime - durationToRemember.milliseconds // trailing end of the remember window 
) 
    logDebug(s"Getting new files for time $currentTime, " + 
    s"ignoring files older than $modTimeIgnoreThreshold") 
    val filter = new PathFilter { 
    def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
    } 
    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) 
    val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
    logInfo("Finding new files took " + timeTaken + " ms") 
    logDebug("# cached file times = " + fileToModTime.size) 
    if (timeTaken > slideDuration.milliseconds) { 
    logWarning(
     "Time taken to find new files exceeds the batch size. " + 
     "Consider increasing the batch size or reducing the number of " + 
     "files in the monitored directory." 
    ) 
    } 
    newFiles 
} catch { 
    case e: Exception => 
    logWarning("Error finding new files", e) 
    reset() 
    Array.empty 
} 

}

रहे हैं:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

     // Calculate ignore threshold 
     val modTimeIgnoreThreshold = math.max(
     initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
     currentTime - durationToRemember.milliseconds // trailing end of the remember window 
    ) 
     logDebug(s"Getting new files for time $currentTime, " + 
     s"ignoring files older than $modTimeIgnoreThreshold") 
     val filter = new PathFilter { 
     def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
     } 
     val directories = fs.listStatus(directoryPath).filter(_.isDirectory) 
     val newFiles = ArrayBuffer[FileStatus]() 

     directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*)) 

     val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
     logInfo("Finding new files took " + timeTaken + " ms") 
     logDebug("# cached file times = " + fileToModTime.size) 
     if (timeTaken > slideDuration.milliseconds) { 
     logWarning(
      "Time taken to find new files exceeds the batch size. " + 
      "Consider increasing the batch size or reducing the number of " + 
      "files in the monitored directory." 
     ) 
     } 
     newFiles.map(_.getPath.toString).toArray 
    } catch { 
     case e: Exception => 
     logWarning("Error finding new files", e) 
     reset() 
     Array.empty 
    } 
    } 

सब पहली डिग्री सब फ़ोल्डर में फ़ाइलों के लिए जाँच करेगा, आप बैच टाइमस्टैम्प उपयोग करने के लिए इसे समायोजित कर सकते प्रासंगिक "उपनिर्देशिका" तक पहुंचने के लिए।

मैं CustomFileInputDStream बनाया के रूप में मैं उल्लेख किया है और फोन करके यह सक्रिय:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

यह हमें उम्मीद व्यवहार करने लगता है।

मैं विचार के लिए कुछ बिंदुओं को जोड़ना होगा जब मैं इस तरह समाधान लिखें:

  • आप स्पार्क कैप्सूलीकरण तोड़ रहे हैं और आप समय पास के रूप में पूरी तरह से समर्थन करने के लिए होता है कि एक कस्टम वर्ग बनाने।

  • मेरा मानना ​​है कि इस तरह का समाधान अंतिम उपाय है। यदि आपका उपयोग केस अलग-अलग तरीके से कार्यान्वित किया जा सकता है, तो आमतौर पर इस तरह के समाधान से बचने के लिए बेहतर होता है।

  • यदि आपके पास एस 3 पर बहुत से "उपनिर्देशिकाएं" होंगी और उनमें से प्रत्येक की जांच होगी तो यह आपको खर्च करेगा।

  • यह समझना बहुत दिलचस्प होगा कि क्या डाटाबेस संभव प्रदर्शन दंड के कारण नेस्टेड फ़ाइलों का समर्थन नहीं करता है या नहीं, शायद एक गहरा कारण है जिसके बारे में मैंने सोचा नहीं है।

+0

मेरे पास एक समान उपयोग केस है और यदि मुझे वैकल्पिक नहीं मिलता है तो मैं इस पथ को नीचे जाने की सोच रहा हूं। मेरे पास दिनांक YYYY-MM-DD-HH प्रारूप का उपयोग करके विभाजित उप फ़ोल्डर हैं। हर घंटे एक नया फ़ोल्डर बनाया जाता है और इसमें फाइलें अपलोड की जाती हैं। तो मुझे जरूरी नहीं कि सभी सबफ़ोल्डर (केवल अंतिम तीन) स्कैन करना होगा और प्रदर्शन समस्याओं को नहीं दबाएगा। मैं पुन: प्रारंभ करने के लिए ऐसे कोड और राज्य प्रबंधन की रखरखाव के बारे में अधिक चिंतित हूं (कौन सा घंटा फ़ोल्डर + फ़ाइल अंतिम स्कैन किया गया था, आदि)।देखें कि क्या आप इस पर अपने विचार साझा कर सकते हैं या यहां तक ​​कि कोड जो आपके कस्टम फ़ाइलडिस्ट्रीम के लिए काम करता है। – Cheeko

+0

यदि आप अपनी स्ट्रीम के साथ चेकपॉइंट निर्देशिका का उपयोग कर रहे हैं, तो जब आप एप्लिकेशन को पुनरारंभ करते हैं तो आप पहले उन सभी बैचों को फिर से निर्धारित कर देंगे जो एप्लिकेशन डाउन टाइम के दौरान निष्पादित होनी चाहिए थीं। उदाहरण के लिए यदि आपका स्ट्रीमिंग अंतराल 1 मिनट है, और आपका आवेदन 10:00 बजे नीचे जा रहा है और 10:30 बजे बैक अप हो रहा है, तो जब यह ऐप होगा तो ऐप 10:01, 10:02 इत्यादि को बैच निष्पादित करने का प्रयास करेगा। अब यदि आप एक खोजNewFiles (currentTime) को इस तरह से कार्यान्वित करते हैं कि आपके द्वारा स्कैन किए गए फ़ोल्डर वर्तमान समय से व्युत्पन्न होते हैं, तो आप पुनरारंभ करने के बाद "दाएं" फ़ाइलों को स्कैन करने में सक्षम होंगे। –

+0

कृपया ध्यान दें कि वर्तमान समय वास्तव में वर्तमान समय नहीं है, यह बैच का समय है। एकमात्र समस्या मैं उस दृष्टिकोण में सोच सकता हूं कि यदि आपकी फ़ाइलें अपरिवर्तनीय नहीं हैं। उदाहरण के लिए आप 10:10 पर फ़ाइल ए को फ़ाइल करने के लिए कुछ डेटा लिखते हैं और 10:20 पर इस डेटा को ओवरराइट करते हैं, तो यदि आपका आवेदन 10: 10-10: 20 के बीच था, तो आप ए को पहला लेखन खो देंगे। यह वास्तव में एक समस्या है, लेकिन मैं ऐसे कई परिस्थितियों से परिचित नहीं हूं जो ऐसे परिदृश्यों में परिवर्तनीय फाइलों के साथ काम करते हैं। –

संबंधित मुद्दे