2017-02-02 22 views
5

चिंगारी संरचित स्ट्रीमिंग का प्रलेखन से: "यह चौकी स्थान एक HDFS संगत फाइल सिस्टम में एक रास्ता हो गया है, और जब एक शुरू करने DataStreamWriter में एक विकल्प के रूप में स्थापित किया जा सकता है क्वेरी। "अपाचे स्पार्क (स्ट्रीमिंग संरचित): S3 चेकप्वाइंट समर्थन

और यकीन है कि पर्याप्त, एक S3 पथ के लिए चौकी स्थापित करने फेंकता है:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
     at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
     at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

सवालों की एक जोड़ी यहाँ:

  1. (क्यों एक चौकी dir रूप में समर्थित नहीं S3 है नियमित रूप से चिंगारी स्ट्रीमिंग का समर्थन इस)? एक फाइल सिस्टम "एचडीएफएस अनुपालन" बनाता है?
  2. मैं एचडीएफएस का उपयोग करता हूं (चूंकि क्लस्टर हर समय ऊपर या नीचे आ सकते हैं) और एस 3 का उपयोग सभी डेटा को जारी रखने के लिए स्थान के रूप में करते हैं - इस तरह के सेटअप में संरचित स्ट्रीमिंग डेटा के लिए चेकपॉइंटिंग डेटा संग्रहीत करने के लिए सिफारिशें क्या होंगी?
+0

शुद्ध अनुमान यहां है लेकिन आपने एस 3 एन या एस 3 ए (अधिमानतः एस 3 ए) प्रोटोकॉल की कोशिश की है? – ImDarrenG

+0

निश्चित रूप से लायक और प्रयास, इसे आजमाएगा। – Apurva

उत्तर

2

एफएस एचडीएफएस "अनुपालन" क्या बनाता है? यह Hadoop FS specification में निर्दिष्ट व्यवहारों के साथ एक फ़ाइल सिस्टम है। एक वस्तु की दुकान और एफएस के बीच अंतर देखते प्रमुख मुद्दा होने के साथ कवर किया जाता है, "संलग्न या ओ के बिना अंत में लगातार वस्तु भंडार (1) परमाणु renames अनुरूप नहीं हैं"

विशेष रूप से

S3 के लिए

  1. यह संगत नहीं: एक नया ब्लॉब बनने के बाद, एक सूची कमांड अक्सर इसे नहीं दिखाता है। हटाने के लिए वही।
  2. जब एक ब्लॉब ओवरराइट या नष्ट कर दिया जाता है, यह कुछ समय लग सकता दूर जाना
  3. नाम बदलने() की नकल द्वारा कार्यान्वित किया जाता है और फिर एक स्थान के लिए सब कुछ सहेजने और फिर नाम बदलकर

स्पार्क स्ट्रीमिंग चौकियों को नष्ट यह चेकपॉइंट निर्देशिका में है। यह S3 में डेटा की एक प्रति करने के लिए आनुपातिक चेकआउट करने का समय बनाता है, जो ~ 6-10 एमबी/एस है।

स्ट्रीमिंग कोड का वर्तमान बिट एस 3 के लिए उपयुक्त नहीं है, और कुछ बिंदु पर मैं इसे ठीक कर सकता हूं, मैं अपने पुराने लोगों को तब तक स्पार्क करने के लिए कोई नया पैच सबमिट नहीं कर रहा हूं। इस सामान पर काम करने का कोई मतलब नहीं है अगर इसे केवल अनदेखा किया जा रहा है।

अभी के लिए

, HDFS को

  • चौकी में से एक है और फिर परिणाम
  • EBS का एक सा करने के लिए चौकी आवंटित पर कॉपी करें और अपने क्लस्टर S3 के लिए
  • चौकी से जुड़ी है, लेकिन एक है चेकपॉइंट्स के बीच लंबा अंतर ताकि चेकपॉइंट का समय आपके स्ट्रीमिंग ऐप को नीचे न लाए।

यदि आप ईएमआर का उपयोग कर रहे हैं, तो आप एक सतत, डायनेमो डीबी समर्थित एस 3 के लिए प्रीमियम का भुगतान कर सकते हैं, जो आपको बेहतर स्थिरता देता है।लेकिन कॉपी समय अभी भी वही है, इसलिए चेकपॉइंटिंग धीमी गति से

+0

हमारे पास एस 3 के चेकपॉइंट्स के बीच 40 सेकंड अंतराल है और हम अभी भी कभी-कभी चेकपॉइंटिंग समस्याएं जैसे कि temp निर्देशिका को लिखा जा रहा है और फिर नहीं मिला है। –

+0

चेकपॉइंट नहीं मिला है शायद एस 3 की स्थिरता सतह पर है: लिस्टिंग ऑब्जेक्ट स्टोर में बदलावों को बदलती है। आम तौर पर आप ध्यान नहीं देते हैं, लेकिन कभी-कभी यह सतहों पर होता है। मेटाडेटा स्टोर के लिए डायनेमो का उपयोग करना चाहिए ... कम से कम अगर ऐसा नहीं होता है, तो यह गलत तरीके से कार्यान्वित किया जा रहा है –

4

यह एक ज्ञात मुद्दा है: https://issues.apache.org/jira/browse/SPARK-19407

अगली फिल्म में तय किया जाना चाहिए। आप वर्कअराउंड के रूप में --conf spark.hadoop.fs.defaultFS=s3 का उपयोग कर डिफ़ॉल्ट फ़ाइल सिस्टम को s3 पर सेट कर सकते हैं।

+0

ऐसा नहीं लगता कि यह अभी तक हल हो गया है। एस 3 (स्पार्क 2.1.1) पर संरचित स्ट्रीमिंग की जांच करने में अभी भी असमर्थ। 7/06/29 00:29:00 जानकारी StateStoreCoordinatorRef: पंजीकृत StateStoreCoordinator endpoint org.apache.spark.sql.AnalysisException: चौकी वसूली के साथ विफल रहता है इस क्वेरी चौकी स्थान से उबरने का समर्थन नहीं करता। – Apurva

+0

यह एक अलग मुद्दा है। क्या आप "मेमोरी" या "कंसोल" का उपयोग कर रहे हैं जो वसूली का समर्थन नहीं करता है? – zsxwing

2

यह समस्या https://issues.apache.org/jira/browse/SPARK-19407 में तय की गई है।

हालांकि एस 3 में अंतिम स्थिरता की कमी के कारण संरचित स्ट्रीमिंग चेकपॉइंटिंग एस 3 में अच्छी तरह से काम नहीं करती है। https://issues.apache.org/jira/browse/SPARK-19013 चेकपॉइंटिंग के लिए S3 का उपयोग करना अच्छा नहीं है।

माइकल आर्बर्स्ट ने कहा है कि यह स्पार्क में तय नहीं किया जाएगा, और समाधान एस 3गार्ड को लागू करने के लिए इंतजार करना है। S3Guard कुछ समय दूर है।

संबंधित मुद्दे