2015-07-19 13 views
12

मैं समझने की कोशिश कर रहा हूं कि स्पार्क स्ट्रीमिंग ऐप को अधिक गलती टोलरेंट (विशेष रूप से डाउनस्ट्रीम निर्भरताओं को लिखने का प्रयास करते समय) कैसे बनाना है, और मुझे नहीं पता कि सबसे अच्छा तरीका क्या है कैसंड्रा, डायनेमो डीबी, आदि जैसे बाहरी स्रोतों को परिणाम लिखने की कोशिश में असफलताओं को संभालने के लिए हैअपाचे स्पार्क स्ट्रीमिंग, डाउनस्ट्रीम निर्भरता विफलताओं को कैसे संभालें

उदाहरण के लिए, मेरे पास स्पार्क स्ट्रीमिंग नौकरी है जो स्ट्रीम (काफ्का, फ्ल्यूम इत्यादि ... से डेटा खींचती है ' टी ने अभी तक किस तकनीक का उपयोग करने के लिए अंतिम रूप दिया है), समान वस्तुओं को एक साथ जोड़ता है, और फिर परिणामों को बाहरी स्टोर में लिखता है। (यानी कैसंड्रा, डायनेमो डीबी, या जो भी मेरे डीस्ट्रीम कंप्यूटेशंस के परिणाम प्राप्त कर रहा है)।

मैं यह समझने की कोशिश कर रहा हूं कि मैं उस मामले को कैसे संभाला हूं जहां बाहरी निर्भरता लिखने के लिए उपलब्ध नहीं है। शायद क्लस्टर नीचे चला गया, शायद अनुमति समस्याएं हैं, आदि, लेकिन मेरा काम बाहरी निर्भरता के परिणाम नहीं लिख सकता है। स्पार्क स्ट्रीमिंग को रोकने का कोई तरीका है ताकि रिसीवर डेटा बैच जारी नहीं रख सकें? क्या मुझे बस वर्तमान बैच सोना चाहिए और रिसीवर बैचों को स्टोर करना जारी रखेगा? यदि समस्या क्षणिक (कुछ सेकंड) है, तो बैच जारी रखना स्वीकार्य हो सकता है, लेकिन क्या होता है यदि निर्भरता कुछ मिनट या 1+ घंटे के लिए नीचे जाती है?

एक विचार था कि मेरे पास एक मॉनिटर प्रक्रिया थी जो पृष्ठभूमि में निर्भरताओं के स्वास्थ्य को देखती है, और यदि यह पता चलता है कि यह "अस्वास्थ्यकर" है, तो यह नौकरी को रोक देगा। फिर, जब सभी निर्भरता स्वस्थ होती है, तो मैं नौकरी का बैक अप ले सकता हूं और बाहरी डेटा पर लिखे गए सभी डेटा को संसाधित नहीं कर सकता।

एक और विचार था कि मैं किसी भी तरह से DStream forEachRdd विधि में सिग्नल करना था, कि कोई समस्या थी। क्या कोई अपवाद है कि मैं डीस्ट्रीम में फेंक सकता हूं जो ड्राइवर को वापस संकेत देगा कि इसे रोकना चाहिए?

अगर किसी को बाहरी गलती सहनशीलता को संभालने के तरीके पर कोई अनुभव है, या मुझे अच्छे लेख/वीडियो पर इंगित कर सकते हैं, तो यह बहुत अच्छा होगा।

धन्यवाद

+2

मुझे यकीन नहीं है, लेकिन डाउनस्ट्रीम डेटा रिसीवर/स्टोरेज को विफलताओं को स्वयं संभालना चाहिए? स्पार्क की ज़िम्मेदारी इसके बारे में चिंता करने की ज़िम्मेदारी से परे है। अगर विफलता होती है तो यह निगरानी और चेतावनी के बारे में अधिक है, ताकि इंजीनियरों को अधिसूचित किया जा सके और तुरंत विफलता की जांच की जा सके। – keypoint

उत्तर

1

मेरा मानना ​​है कि यहां कोई सरल और सार्वभौमिक उत्तर नहीं है। आवेदन अर्थशास्त्र, डेटा स्रोतों के प्रकार (विश्वसनीय रिसीवर, विश्वसनीय रिसीवर, फ़ाइल आधारित, रिसीवर-कम) और आवश्यकताओं पर निर्भर करता है।

सामान्य रूप से आपको किसी एकल आईओ विफलता पर एप्लिकेशन को कभी विफल नहीं होने देना चाहिए। मान लें कि आपके पास कुछ कार्यवाही है:

outputAction[T](rdd: RDD[T]): Unit = ??? 

कम से कम सुनिश्चित करें कि यह आपके ड्राइवर के अपवाद का प्रचार नहीं करेगा।

outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ??? 

stream foreachRDD { rdd => Try(outputAction(rdd)) } 

प्रश्न अगला क्या है। दी गई विंडो को छोड़ना सबसे आसान काम है। आवेदन के आधार पर यह स्वीकार्य समाधान हो सकता है या नहीं, लेकिन आम तौर पर ऐसे कई मामले हैं जहां कुछ डेटा खोना पूरी तरह से स्वीकार्य है।

विफलताओं का ट्रैक रखने और कुछ सीमा लेने पर कुछ और कार्रवाई करने के द्वारा इसे और बेहतर किया जा सकता है।

तो डेटा छोड़ने स्वीकार्य नहीं है अगले कदम के कुछ विलंब के बाद पुन: प्रयास करने के लिए है:

outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ??? 

stream foreachRDD { 
    rdd => Try(outputAction(rdd)) 
    .recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) } 
    .recoverWith { case _ => Try(outputActionWithDelay(d2)(rdd)) } 
    ... 
} 

पुनर्प्रयास की संख्या और देरी अवधि मामले से स्रोत है और आने वाले डाटा स्टोर करने की क्षमता पर मामला और depnds लिए अलग अलग होंगे।

हम पिछली बार पुनः प्रयास करते समय क्या कर सकते हैं? शुरुआत के लिए हम एक वैकल्पिक आउटपुट स्रोत जोड़ सकते हैं। प्राथमिक स्रोत का उपयोग करने के बजाय आप उदाहरण के लिए सब कुछ विश्वसनीय बाहरी फ़ाइल भंडारण में धक्का दे सकते हैं और बाद में इसके बारे में चिंता कर सकते हैं। यह लागू नहीं हो सकता है अगर आउटपुट स्रोत को आने वाले डेटा के विशिष्ट क्रम की आवश्यकता होती है लेकिन अन्यथा प्रयास करने योग्य होना चाहिए।

alternativeOutputAction[T](rdd: RDD[T]) = ??? 

stream foreachRDD { 
    rdd => Try(outputAction(rdd)) 
    .recoverWith { case _ => Try(outputActionWithDelay(d1) 
    ... 
    .recoverWith { case _ => Try(outputActionWithDelay(dn)(rdd)) } 
    .recoverWith { case _ => Try(alternativeOutputAction(rdd)) 
} 

यदि यह विफल हो जाता है तो यह शायद गंभीर समस्याओं का एक लक्षण है और आवेदन स्तर पर हम इतना कुछ नहीं कर सकते हैं। हम पहले दृष्टिकोण पर वापस जा सकते हैं और बस आशा करते हैं कि स्थिति जल्द ही हल हो जाएगी या अधिक परिष्कृत दृष्टिकोण चुनें।

यदि इनपुट स्रोत डेटा को बफर कर सकता है और हम विश्वसनीय भंडारण और प्रतिकृति का उपयोग करते हैं तो हम enable checkpointing कर सकते हैं और बस एप्लिकेशन को मार सकते हैं।

यदि आप इसे पुनर्प्राप्त करने का प्रयास करते हैं तो शायद CircuitBreaker के कुछ संस्करण को जोड़ने का एक अच्छा विचार है और यदि एप्लिकेशन को देरी के बिना प्राथमिक आउटपुट ड्रॉप रिकवरी प्रयासों तक पहुंचने की कोशिश करने में कई असफलताओं का सामना करना पड़ता है।

1

अब मैं प्रत्यक्ष धारा का उपयोग करता हूं और अपने आप से ऑफसेट बचाता हूं। इससे आपकी समस्या हल नहीं हो सकती है, कम से कम एक बार जब आपको अपने बाहरी भंडारण के साथ कुछ समस्याएं मिलती हैं, तो आप कहां से रुक सकते हैं।

संबंधित मुद्दे