2016-04-28 5 views
8

में सभी डेटाफ्रेम को लगातार बनाए रखना मैं कई बिंदुओं के साथ एक स्पार्क एप्लिकेशन हूं जहां मैं वर्तमान स्थिति को जारी रखना चाहता हूं। यह आमतौर पर एक बड़े कदम के बाद होता है, या एक राज्य को कैशिंग करता है जिसे मैं कई बार उपयोग करना चाहता हूं। ऐसा प्रतीत होता है कि जब मैं दूसरी बार अपने डेटाफ्रेम पर कैश कहता हूं, तो एक नई प्रति स्मृति में कैश की जाती है। मेरे आवेदन में, स्केलिंग करते समय यह स्मृति समस्याओं को जन्म देता है। हालांकि, मेरे वर्तमान परीक्षणों में एक दिया गया डेटाफ्रेम अधिकतम 100 एमबी है, मध्यवर्ती परिणामों का संचयी आकार निष्पादक पर आवंटित स्मृति से परे बढ़ता है। एक छोटे से उदाहरण के लिए नीचे देखें जो इस व्यवहार को दिखाता है।(पीई) स्पार्क

cache_test.py:

from pyspark import SparkContext, HiveContext 

spark_context = SparkContext(appName='cache_test') 
hive_context = HiveContext(spark_context) 

df = (hive_context.read 
     .format('com.databricks.spark.csv') 
     .load('simple_data.csv') 
    ) 
df.cache() 
df.show() 

df = df.withColumn('C1+C2', df['C1'] + df['C2']) 
df.cache() 
df.show() 

spark_context.stop() 

simple_data.csv:

1,2,3 
4,5,6 
7,8,9 

आवेदन यूआई को देखते हुए, वहाँ नया कॉलम एक के लिए adition में मूल dataframe की एक प्रति है, । मैं कॉलम लाइन से पहले df.unpersist() पर कॉल करके मूल प्रति को हटा सकता हूं। कैश किए गए इंटरमीडिएट परिणाम को हटाने का यह अनुशंसित तरीका है (यानी प्रत्येक cache() से पहले unpersist को कॉल करें)।

साथ ही, सभी कैश किए गए ऑब्जेक्ट्स को शुद्ध करना संभव है। मेरे आवेदन में, प्राकृतिक ब्रेकपॉइंट्स हैं जहां मैं बस सभी मेमोरी को शुद्ध कर सकता हूं, और अगली फाइल पर जा सकता हूं। मैं प्रत्येक इनपुट फ़ाइल के लिए एक नया स्पार्क एप्लिकेशन बनाने के बिना ऐसा करना चाहता हूं।

अग्रिम धन्यवाद!

उत्तर

11

स्पार्क 2.x

आप Catalog.clearCache उपयोग कर सकते हैं:

from pyspark.sql import SparkSession 

spark = SparkSession.builder.getOrCreate 
... 
spark.catalog.clearCache() 

स्पार्क 1.x

आप SQLContext.clearCache विधि का उपयोग कर सकते हैं जो

सभी निकालता है इन-मेमोरी कैश से कैश किए गए टेबल।

from pyspark.sql import SQLContext 
from pyspark import SparkContext 

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate()) 
... 
sqlContext.clearCache() 
+1

यह अब के लिए एक अच्छा समाधान है के रूप में यह मुझ पर पूर्ण कैश को साफ़ करने के लिए अनुमति देता है उचित ब्रेकपॉइंट्स मैं इसे शामिल करूंगा, लेकिन जब मैं स्केल करता हूं और बड़े डेटासेट के साथ काम करना शुरू करता हूं तो पुराने कैश नियंत्रण से बाहर निकलने लगते हैं। यदि मैं पुराने कैश को साफ़ करना चाहता हूं, तो एक नया वैरिएबल (या अस्थायी चर) बनाने की सिफारिश है, और स्पष्ट रूप से पुराने ऑब्जेक्ट्स को बेकार कर देती है। कुछ ऐसा: 'df.cache()'; 'df_new = df.with कॉलम ('सी 1 + सी 2', डीएफ ['सी 1'] + डीएफ ['सी 2'])'; 'df_new.cache()'; 'Df.unpersist()'। यह थोड़ा बोझिल लगता है अगर यह एकमात्र तरीका है ... – bjack3

+0

आमतौर पर कैश को स्पष्ट रूप से साफ़ करने की आवश्यकता नहीं है। आवश्यकता होने पर इसे स्वचालित रूप से साफ़ किया जाता है। – zero323

+0

मुझे चिंता है कि मैं कुछ गलत कर रहा हूं। मेरे पूर्ण आवेदन में, मेरी नौकरियां अंततः स्मृति त्रुटियों के कारण क्रैश हो जाएंगी।डेटाफ्रेम की प्रत्येक व्यक्तिगत प्रतिलिपि काफी कम है (100 एमबी से कम), लेकिन कैश हमेशा के लिए जीते हैं; एक फ़ाइल में आउटपुट लिखने के बाद भी, और अगले चरणों में आगे बढ़ने के बाद भी। मैं देखूंगा कि क्या मैं इसे क्रिया में दिखाने के लिए एक छोटा कामकाजी उदाहरण उत्पन्न कर सकता हूं। – bjack3

1

हम इसका उपयोग अक्सर

for (id, rdd) in sc._jsc.getPersistentRDDs().items(): 
    rdd.unpersist() 
0

कर सकते हैं व्यक्तिगत रूप से unpersist सभी df के:

firstDF.unpersist() 
secondDF.unpersist()