में सभी डेटाफ्रेम को लगातार बनाए रखना मैं कई बिंदुओं के साथ एक स्पार्क एप्लिकेशन हूं जहां मैं वर्तमान स्थिति को जारी रखना चाहता हूं। यह आमतौर पर एक बड़े कदम के बाद होता है, या एक राज्य को कैशिंग करता है जिसे मैं कई बार उपयोग करना चाहता हूं। ऐसा प्रतीत होता है कि जब मैं दूसरी बार अपने डेटाफ्रेम पर कैश कहता हूं, तो एक नई प्रति स्मृति में कैश की जाती है। मेरे आवेदन में, स्केलिंग करते समय यह स्मृति समस्याओं को जन्म देता है। हालांकि, मेरे वर्तमान परीक्षणों में एक दिया गया डेटाफ्रेम अधिकतम 100 एमबी है, मध्यवर्ती परिणामों का संचयी आकार निष्पादक पर आवंटित स्मृति से परे बढ़ता है। एक छोटे से उदाहरण के लिए नीचे देखें जो इस व्यवहार को दिखाता है।(पीई) स्पार्क
cache_test.py:
from pyspark import SparkContext, HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (hive_context.read
.format('com.databricks.spark.csv')
.load('simple_data.csv')
)
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
simple_data.csv:
1,2,3
4,5,6
7,8,9
आवेदन यूआई को देखते हुए, वहाँ नया कॉलम एक के लिए adition में मूल dataframe की एक प्रति है, । मैं कॉलम लाइन से पहले df.unpersist()
पर कॉल करके मूल प्रति को हटा सकता हूं। कैश किए गए इंटरमीडिएट परिणाम को हटाने का यह अनुशंसित तरीका है (यानी प्रत्येक cache()
से पहले unpersist को कॉल करें)।
साथ ही, सभी कैश किए गए ऑब्जेक्ट्स को शुद्ध करना संभव है। मेरे आवेदन में, प्राकृतिक ब्रेकपॉइंट्स हैं जहां मैं बस सभी मेमोरी को शुद्ध कर सकता हूं, और अगली फाइल पर जा सकता हूं। मैं प्रत्येक इनपुट फ़ाइल के लिए एक नया स्पार्क एप्लिकेशन बनाने के बिना ऐसा करना चाहता हूं।
अग्रिम धन्यवाद!
यह अब के लिए एक अच्छा समाधान है के रूप में यह मुझ पर पूर्ण कैश को साफ़ करने के लिए अनुमति देता है उचित ब्रेकपॉइंट्स मैं इसे शामिल करूंगा, लेकिन जब मैं स्केल करता हूं और बड़े डेटासेट के साथ काम करना शुरू करता हूं तो पुराने कैश नियंत्रण से बाहर निकलने लगते हैं। यदि मैं पुराने कैश को साफ़ करना चाहता हूं, तो एक नया वैरिएबल (या अस्थायी चर) बनाने की सिफारिश है, और स्पष्ट रूप से पुराने ऑब्जेक्ट्स को बेकार कर देती है। कुछ ऐसा: 'df.cache()'; 'df_new = df.with कॉलम ('सी 1 + सी 2', डीएफ ['सी 1'] + डीएफ ['सी 2'])'; 'df_new.cache()'; 'Df.unpersist()'। यह थोड़ा बोझिल लगता है अगर यह एकमात्र तरीका है ... – bjack3
आमतौर पर कैश को स्पष्ट रूप से साफ़ करने की आवश्यकता नहीं है। आवश्यकता होने पर इसे स्वचालित रूप से साफ़ किया जाता है। – zero323
मुझे चिंता है कि मैं कुछ गलत कर रहा हूं। मेरे पूर्ण आवेदन में, मेरी नौकरियां अंततः स्मृति त्रुटियों के कारण क्रैश हो जाएंगी।डेटाफ्रेम की प्रत्येक व्यक्तिगत प्रतिलिपि काफी कम है (100 एमबी से कम), लेकिन कैश हमेशा के लिए जीते हैं; एक फ़ाइल में आउटपुट लिखने के बाद भी, और अगले चरणों में आगे बढ़ने के बाद भी। मैं देखूंगा कि क्या मैं इसे क्रिया में दिखाने के लिए एक छोटा कामकाजी उदाहरण उत्पन्न कर सकता हूं। – bjack3