2015-10-21 17 views
7

में अनकॉच अपवाद हैंडलिंग मैं जावा आधारित स्पार्क स्ट्रीमिंग एप्लिकेशन पर काम कर रहा हूं जो एक कफका विषय के माध्यम से आने वाले संदेशों का जवाब देता है। प्रत्येक संदेश के लिए, एप्लिकेशन कुछ प्रसंस्करण करता है, और परिणामों को एक अलग काफ्का विषय पर वापस लिखता है।स्पार्क

कभी-कभी अप्रत्याशित डेटा से संबंधित मुद्दों के कारण, आरडीडी पर चलने वाला कोड असफल हो सकता है और अपवाद फेंक सकता है। जब ऐसा होता है, तो मैं एक सामान्य हैंडलर रखना चाहता हूं जो आवश्यक कार्रवाई कर सके और एक त्रुटि विषय को संदेश छोड़ दे। अभी, इन अपवादों को स्पार्क के लॉग में स्पार्क द्वारा स्वयं लिखा गया है।

आरडीडी पर काम करने वाले प्रत्येक कोड ब्लॉक के लिए प्रयास-पकड़ ब्लॉक लिखने के बजाय ऐसा करने का सबसे अच्छा तरीका क्या है?

+0

मुझे लगता है कि किसी एक करीबी वोट इस सवाल कह casted है राय-आधारित है। यदि विशेषज्ञ स्पार्क के साथ अब संभव नहीं है तो विशेषज्ञों ने करीबी वोट डालने से पहले कम से कम कुछ प्रकाश डाला, तो मैं सराहना करता हूं। एक स्पष्टीकरण के बिना एक करीबी वोट कास्टिंग समुदाय को किसी भी तरह से मदद नहीं करता है। –

+0

आप एक सामान्य कार्य लिख सकते हैं जो यह करता है। आपको केवल इसे आरडीडी कार्यों के आसपास लपेटने की ज़रूरत है क्योंकि वे केवल वे हैं जो स्पार्क अपवादों को फेंक सकते हैं (ट्रांसफॉर्मर जैसे .map और .filter क्रियाओं द्वारा आलसी आलसी हैं)। (मान लीजिए कि यह स्कैला में है) आप शायद इंपीसिट्स के साथ कुछ भी कोशिश कर सकते हैं और एक समेकित आरडीडी कक्षा को संभालने में त्रुटि कर सकते हैं जो आप केवल अपने हस्ताक्षर के साथ अपने त्रुटिरोध को लागू करने के लिए बनाते हैं। मैंने करीबी वोट नहीं बनाया, लेकिन मुझे लगता है कि "सर्वश्रेष्ठ" दृष्टिकोण कुछ हद तक आवेदन आवश्यकताओं के लिए व्यक्तिपरक है। – Rich

+0

धन्यवाद @Rich। तो मूल रूप से आप जो कहना चाहते हैं वह यह है कि अब स्पार्क में इसे संभालने के लिए कोई रास्ता नहीं है, इसलिए प्रत्येक एप्लिकेशन को इसका ख्याल रखना चाहिए। यदि आप अपनी टिप्पणी को उत्तर के रूप में पोस्ट कर सकते हैं, तो मैं इसे स्वीकार करूंगा। –

उत्तर

3

आप एक सामान्य कार्य लिख सकते हैं जो यह करता है। आपको केवल इसे आरडीडी कार्यों के आसपास लपेटने की ज़रूरत है क्योंकि वे केवल एकमात्र हैं जो स्पार्क अपवादों को फेंक सकते हैं (ट्रांसफॉर्मर जैसे .map और .filter क्रियाओं द्वारा आलसी निष्पादित हैं)।

(मान लीजिए कि यह स्कैला में है) आप शायद इम्प्लिकेट्स के साथ कुछ भी कोशिश कर सकते हैं। एक कक्षा बनाएं जो आरडीडी रखती है और त्रुटि को संभालती है। यहां उसका कैसा लग सकता है की एक संक्षिप्त वर्णन:

implicit class FailSafeRDD[T](rdd: RDD[T]) { 
    def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try { 
    fn(rdd) 
    } 
} 

आप failsafeAction या कुछ में त्रुटि विषय संदेश सेवा जोड़ सकता है आप विफलता पर हर बार करना चाहते हैं। और फिर उपयोग इस तरह हो सकता है:

val rdd = ??? // Some rdd you already have 
val resultOrException = rdd.failsafeAction { r => r.count() } 

इसके अलावा, मुझे लगता है कि "सर्वश्रेष्ठ" दृष्टिकोण कुछ हद तक आवेदन आवश्यकताओं के लिए व्यक्तिपरक है।

2

मुझे लगता है कि आप भी एक कोशिश पकड़ के साथ इस को लागू कर सकता है =>

dstream.foreachRDD { case rdd: RDD[String] => 
    rdd.foreach { case string: String => 
     try { 
     val kafkaProducer = ... 
     val msg = ... 
     kafkaProducer.send(msg) 
     } catch { 
     case d: DataException=> 
      val kafkaErrorProducer = ... 
      val errorMsg = ... 
      kafkaErrorProducer.send(errorMsg) 
     case t: Throwable => 
      //further error handling 
     } 
    } 
}