2015-04-27 9 views
30

मैं समझने की कोशिश कर रहा हूं कि स्पार्क का कैश कैसे काम करता है।स्पार्क के कैशिंग को समझना

यहाँ मेरी भोली समझ है, मुझे पता है कि अगर मैं कुछ याद कर रहा हूँ तो कृपया:

val rdd1 = sc.textFile("some data") 
rdd1.cache() //marks rdd1 as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

ऊपर, rdd1 में केवल एक बार डिस्क (जैसे HDFS) से लोड किया जाएगा। (जब rdd2 बचाया जाता है तो मुझे लगता है) और फिर कैश से (मान लीजिए कि पर्याप्त रैम है) जब rdd3 बचाया जाता है)

अब मेरा प्रश्न है। मान लीजिए कि मैं rdd2 और rdd3 को कैश करना चाहता हूं क्योंकि दोनों का बाद में उपयोग किया जाएगा, लेकिन मुझे बनाने के बाद मुझे rdd1 की आवश्यकता नहीं है।

असल में नकल है, है ना? चूंकि एक बार rdd2 और rdd3 की गणना की जाती है, मुझे अब rdd1 की आवश्यकता नहीं है, मुझे शायद इसे अनपेक्षित करना चाहिए, है ना? सवाल कब है?

क्या यह काम करेगा? (विकल्प A)

val rdd1 = sc.textFile("some data") 
rdd1.cache() // marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.cache() 
rdd3.cache() 
rdd1.unpersist() 

चिंगारी DAG को unpersist कॉल होता है? या यह तुरंत किया जाता है? अगर यह तुरंत किया जाता है, तो जब मैं rdd2 और rdd3 से पढ़ता हूं, तो मूल रूप से rdd1 गैर कैश किया जाएगा, है ना?

क्या मुझे इसके बजाय ऐसा करना चाहिए (विकल्प बी)?

val rdd1 = sc.textFile("some data") 
rdd1.cache() // marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 

rdd2.cache() 
rdd3.cache() 

rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

rdd1.unpersist() 

तो सवाल यह है: विकल्प पर्याप्त एक अच्छा है? यानी rdd1 अभी भी एक बार फ़ाइल लोड करेगा? या मुझे विकल्प बी के साथ जाने की ज़रूरत है?

उत्तर

19

ऐसा लगता है कि विकल्प बी आवश्यक है। कारण स्पार्क द्वारा कैसे जारी/कैश और unpersist निष्पादित किया जाता है से संबंधित है। चूंकि आरडीडी परिवर्तन केवल निष्पादन के बिना डीएजी विवरण बनाते हैं, जब आप एस्पर्सिस्ट को कॉल करते समय विकल्प ए में, तब भी आपके पास केवल नौकरी के विवरण होते हैं और चल रहे निष्पादन नहीं होते हैं।

यह प्रासंगिक है क्योंकि cache या persist कॉल केवल आरडीडी को आरडीडी के मानचित्र में जोड़ता है जो खुद को नौकरी निष्पादन के दौरान जारी रखने के लिए चिह्नित करता है। हालांकि, unpersist सीधे ब्लॉक प्रबंधक को भंडारण से आरडीडी को बेदखल करने के लिए कहता है और लगातार आरडीडी के मानचित्र में संदर्भ को हटा देता है।

persist function

unpersist function

तो तुम स्पार्क वास्तव में मार डाला और ब्लॉक प्रबंधक के साथ RDD संग्रहीत के बाद unpersist कॉल करने के लिए की आवश्यकता होगी।

इस दिशा में RDD.persist विधि संकेत के लिए टिप्पणियाँ: rdd.persist

+1

हां, लगता है कि आप पर कर रहे हैं यह। यह थोड़ा दुर्भाग्यपूर्ण है, मेरी इच्छा है कि "कैश" को डीएजी ऑपरेशन में परिवर्तित कर दिया गया हो और न केवल आरडीडी आईडी को मानचित्र में जोड़ें ... ऐसे कई मामले हैं जहां आप कुछ मध्यस्थता से कैश करना चाहते हैं, एक नया आरडीडी बनाएं, तो पुराना छोड़ दो। शायद अच्छे सैद्धांतिक कारण हैं कि यह क्यों एक अच्छा विचार नहीं है ... किसी भी मामले में, एलआरयू (मुझे लगता है) कैश का ऑर्डर करने का मतलब है कि पुराने अप्रयुक्त आरडी 1 को निकाल दिया जाएगा यदि rdd2 और rdd3 को कैशिंग के लिए उस स्थान की आवश्यकता है ... –

+0

इसलिए मैंने ज्यादातर देखा कि क्या लगातार/कैश और असिस्टिस्ट कर रहे हैं, लेकिन स्पार्क क्या कर रहा है, इस पर विचार करने के लिए अभी भी एक जगह है कि आप किसी अन्य से आरडीडी प्राप्त करते हैं और यह किसी भी तरह से अनुकूलित कैसे हो सकता है। मुझे यकीन नहीं है कि 'rdd1' को कैश करने की भी आवश्यकता है, इसे कैश किए जाने पर या डीएजी पाइपलाइन होने पर' rdd2' और 'rdd3' द्वारा चेकपॉइंट किया जा सकता है। हालांकि यह मेरे लिए एक भूरे रंग का क्षेत्र है। – Rich

+2

डीबगर के माध्यम से थोड़ी अधिक जांच और ट्रेसिंग किया। 'rdd2' और' rdd3' निर्भरता के रूप में 'rdd1' को संदर्भित करेगा। 'rdd1' पहले क्रिया पर निष्पादित एक बार विभाजन में अपने डेटा को लोड करेगा। अब इस बिंदु पर 'rdd2' और' rdd3' दोनों विभाजन में 'rdd1' द्वारा पहले से लोड किए गए डेटा में उनके परिवर्तन लागू करते हैं। मेरा मानना ​​है कि यदि आप एक ही सटीक आरडीडी पर कई कार्रवाइयां चलाते हैं तो कैशिंग मूल्य प्रदान करती है, लेकिन नई शाखाओं के आरडीडी के मामले में मुझे नहीं लगता कि आप एक ही मुद्दे में भाग लेते हैं क्योंकि मेरा मानना ​​है कि स्पार्क को पता है कि 'rdd1' अभी भी एक है पहली बचत के बाद 'rdd3' के लिए निर्भरता। – Rich

2

विकल्प एक में, आप नहीं दिखाया जब आप कार्रवाई बुला रहे हैं (बचाने के लिए कॉल)

val rdd1 = sc.textFile("some data") 
rdd.cache() //marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.cache() 
rdd3.cache() 
rdd1.unpersist() 
rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

अनुक्रम है उपर्युक्त के रूप में, विकल्प ए को rdd2 और rdd 3

+0

यह मुझे सहमत होना चाहिए, लेकिन यह होगा? मुझे लगता है कि ऐसा नहीं होगा, जब आप rdd2.saveAsTestFile आदि को कॉल करते हैं, rdd1 पहले से ही जारी नहीं है। persist/unpersist DAG –

+0

पर नहीं है जब तक कि आप सेवफाइल को कॉल न करें, कुछ भी नहीं ** वास्तव में ** होता है ..... तो मेरा बिंदु rdd1.unpersist कॉल का आदेश कोई फर्क नहीं पड़ता कि rdd2 पहले से कैश किया गया है –

0

विकल्प बी की गणना करने के लिए rdd1 के कैश किए गए संस्करण का उपयोग करना चाहिए विकल्प बी छोटे tweak-in के साथ एक इष्टतम दृष्टिकोण है। कम महंगी कार्रवाई विधियों का उपयोग करें। आपके कोड द्वारा उल्लिखित दृष्टिकोण में, saveAsTextFile एक महंगी ऑपरेशन है, इसे गिनती विधि द्वारा प्रतिस्थापित करें।

आइडिया यहाँ DAG से बड़ा rdd1 दूर करने के लिए, अगर यह प्रासंगिक नहीं है आगे गणना के लिए (के बाद rdd2 और rdd3 बनाई गई हैं)

कोड से

दृष्टिकोण अपडेट किया गया

val rdd1 = sc.textFile("some data").cache() 
val rdd2 = rdd1.filter(...).cache() 
val rdd3 = rdd1.map(...).cache() 

rdd2.count 
rdd3.count 

rdd1.unpersist() 
संबंधित मुद्दे