2015-09-17 9 views
5

मैं अद्वितीय उपयोगकर्ताओं की गिनती के लिए स्पार्क स्ट्रीमिंग का उपयोग कर रहा हूं। मैं updateStateByKey का उपयोग करता हूं, इसलिए मुझे एक चेकपॉइंट निर्देशिका कॉन्फ़िगर करने की आवश्यकता है। मैं भी डेटा चौकी से, जबकि आवेदन शुरू, the example in the doc के रूप में लोड:स्पार्क स्ट्रीमिंग एप्लिकेशन को फिर से तैनात करने के लिए चेकपॉइंट को कॉन्फ़िगर कैसे करें?

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

यहाँ सवाल है, अगर मेरे कोड बदल दिया गया है, तो मैं फिर से तैनात कोड, चौकी लोड किया जाएगा कोई कितना भी है कोड बदल गया है? या मुझे अपने डेटा को दृढ़ता से रखने और अगले भाग में लोड करने के लिए अपने तर्क का उपयोग करने की आवश्यकता है।

यदि मैं डीस्ट्रीम को सहेजने और लोड करने के लिए अपने तर्क का उपयोग करता हूं, तो यदि अनुप्रयोग विफलता पर पुनरारंभ होता है, तो क्या डेटा चेकपॉइंट निर्देशिका और मेरे डेटाबेस से लोड नहीं होगा?

उत्तर

3

चेकपॉइंट में आपके मेटाडेटा, आरडीडी, डैग और यहां तक ​​कि आपका तर्क भी शामिल है। अगर आप अपना तर्क बदलते हैं और इसे अंतिम चेकपॉइंट से चलाने का प्रयास करते हैं, तो आपको अपवाद को मारने की संभावना है। यदि आप अपने डेटा को कहीं भी चेकपॉइंट के रूप में सहेजने के लिए अपने तर्क का उपयोग करना चाहते हैं, तो आपको अगले चेक में अपने चेकपॉइंट डेटा को जो भी डेटाबेस पर धक्का देने के लिए स्पार्क एक्शन लागू करना पड़ सकता है, चेकपॉइंट डेटा को प्रारंभिक आरडीडी के रूप में लोड करें (मामले में आप UpdateStateByKey API का उपयोग कर रहे हैं) और अपना तर्क जारी रखें।

2

मैंने स्पार्क मेल सूची में यह प्रश्न पूछा है और मुझे जवाब मिला है, मैंने इसका विश्लेषण my blog पर किया है। मैं यहां संक्षेप में पोस्ट करूंगा:

चेकपॉइंटिंग और हमारे डेटा लोडिंग तंत्र दोनों का उपयोग करना है। लेकिन हम updateStateByKey के initalRDD के रूप में अपना डेटा लोड करते हैं। तो दोनों स्थितियों में, डेटा खो जाएगा न है और न ही नकल:

  1. जब हम कोड बदल सकते हैं और स्पार्क आवेदन, हम बंद पुराने स्पार्क आवेदन शान से और सफाई चौकी डेटा पुनर्वितरित, इसलिए केवल लोड डेटा है हमारे द्वारा सहेजा गया डेटा।

  2. जब स्पार्क एप्लिकेशन विफलता और पुनरारंभ होता है, तो यह डेटा को चेकपॉइंट से लोड करेगा। लेकिन डीएजी का कदम बचाया गया है, इसलिए यह हमारे डेटा को initalRDD के रूप में फिर से लोड नहीं करेगा। तो केवल लोड किया गया डेटा चेकपॉइंट डेटा है।

संबंधित मुद्दे