2017-09-26 14 views
7

में स्पार्कएक्सप्शन फेंकते समय मैं ईएमआर में संसाधन प्रबंधक के रूप में और 2 नोड्स पर एक स्पार्क नौकरी चला रहा हूं। अगर मेरी हालत पूरी नहीं हुई है, तो मुझे चरणबद्ध रूप से चरण में विफल होने की आवश्यकता है, इसलिए अगला चरण कॉन्फ़िगरेशन के अनुसार निष्पादित नहीं होता है। मैं एक कस्टम अपवाद फेंक रहा हूँ, dynamoDB में एक लॉग संदेश डालने के बाद इस लक्ष्य को हासिल करने के लिए।स्पार्क, गलत व्यवहार EMR

यह ठीक चलता है लेकिन डायनेमो में रिकॉर्ड दो बार डाला जा रहा है।

नीचे मेरा कोड है।

if(<condition>) { 
    <method call to insert in dynamo> 
    throw new SparkException(<msg>); 
    return; 
} 

अगर मैं अपवाद फेंक लाइन निकालने के लिए, यह ठीक काम करता है, लेकिन चरण पूरा कर लिया है।

लॉग संदेश दो बार बिना, मैं चरण कैसे विफल कर सकता हूं।

सहायता के लिए धन्यवाद।

सादर, Sorabh

उत्तर

2

शायद कारण अपने डाइनेमो संदेश डाला गया था दो बार था, क्योंकि आपके त्रुटि स्थिति मारा और दो अलग अलग निष्पादकों द्वारा संसाधित किया गया था। स्पार्क अपने कर्मचारियों के बीच किए जाने वाले कार्यों को विभाजित कर रहा है, और वे कर्मचारी कोई ज्ञान साझा नहीं करते हैं।

मुझे यकीन नहीं है कि स्पार्क चरण विफल होने के लिए आपकी आवश्यकता क्या चल रही है, लेकिन मैं सुझाव देता हूं कि स्पार्क मरने की कोशिश करने के बजाय आपके आवेदन कोड में विफलता केस को ट्रैक करने के बजाय। दूसरे शब्दों में, त्रुटि लिखें जो त्रुटि का पता लगाती है और वापस आपके स्पार्क ड्राइवर को पास करती है, फिर उचित पर कार्य करें।

ऐसा करने का एक तरीका यह होगा कि आप अपने डेटा को संसाधित करते समय होने वाली किसी भी त्रुटि को गिनने के लिए एक संचयक का उपयोग करें। यह (मैं स्केला और DataFrames संभालने हूँ, लेकिन आप आवश्यकतानुसार RDD और/या अजगर को अनुकूलित कर सकते हैं) मोटे तौर पर कुछ इस तरह दिखेगा:

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

इस दृष्टिकोण के बारे में एक अच्छी बात है कि आप देख रहे हैं प्रतिक्रिया के लिए स्पार्क यूआई में आप चलते समय संचयक मूल्यों को देख पाएंगे। संदर्भ के लिए, यहां जमाकर्ताओं पर प्रलेखन है: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

संबंधित मुद्दे