6

मेरे पास JSON डेटा है जो मैं कई फ़ील्ड के साथ डेटा फ्रेम में पढ़ रहा हूं, इसे दो स्तंभों के आधार पर दोबारा विभाजित कर रहा हूं, और पांडों में परिवर्तित हो रहा हूं।Pyspark सरल पुन: विभाजन और toPandas() केवल 600,000+ पंक्तियों को खत्म करने में विफल रहता है

यह नौकरी कुछ अस्पष्ट त्रुटियों के साथ डेटा की 600,000 पंक्तियों पर ईएमआर पर विफल रहता है। मैंने स्पार्क ड्राइवर की मेमोरी सेटिंग्स भी बढ़ा दी है, और अभी भी कोई रिज़ॉल्यूशन नहीं दिख रहा है।

यहाँ मेरी pyspark कोड है:

conf = SparkConf().setAppName('myapp1') 
conf.set('spark.yarn.executor.memoryOverhead', 8192) 
conf.set('spark.executor.memory', 8192) 
conf.set('spark.driver.memory', 8192) 
sc = SparkContext(conf=conf) 

त्रुटियों मैं कर रहे हैं::

16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down. 
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties]. 
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties]. 
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143 
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties]. 
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties]. 
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143 
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down. 
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 

कोड ठीक पर काम करता है

enhDataDf = (
    sqlContext 
    .read.json(sys.argv[1]) 
    ) 

enhDataDf = (
    enhDataDf 
    .repartition('column1', 'column2') 
    .toPandas() 
    ) 
enhDataDf = sqlContext.createDataFrame(enhDataDf) 
enhDataDf = (
    enhDataDf 
    .toJSON() 
    .saveAsTextFile(sys.argv[2]) 
    ) 

मेरे चिंगारी सेटिंग्स इस प्रकार हैं लगभग 600,000 JSON लाइनों तक - भले ही स्मृति उपलब्ध है। फिर, यह असफल रहता है।

क्या हो रहा है और इस समस्या को कैसे डिबग/ठीक करने के बारे में कोई विचार है?

उत्तर

3

मुझे विश्वास है कि समस्या अपने कोड की निम्न भाग से आता है::

enhDataDf = (
    enhDataDf 
    .repartition('column1', 'column2') 
    .toPandas() 
) 

.toPandas() एकत्र उदाहरण के लिए यदि आप 8 का उपयोग कर रहे हैं, तो निष्पादकों और/या ड्राइवर में 4 का उपयोग डेटा, इसलिए जब रिकॉर्ड्स की संख्या बढ़ती है, तो इसका परिणाम ड्राइवर विफलता में होगा।

आपकी टिप्पणी के अनुसार यह आपके द्वारा उपयोग की जाने वाली सटीक पाइपलाइन है। इसका मतलब है कि एक पूरा चरण न केवल अप्रचलित बल्कि गलत भी है।डेटा इकट्ठा किया जाता है और आगे parallelized जब वहाँ है कि विभाजन

.repartition('column1', 'column2') 

द्वारा बनाई गारंटी संरक्षित किया जाएगा स्पार्क DataFrame जब आप फिर से बनाने:

sqlContext.createDataFrame(enhDataDf) 

आप स्तंभ आप कर सकते हैं के द्वारा डेटा लिखने के लिए चाहते हैं यह सीधे:

(sqlContext 
    .read.json(sys.argv[1]) 
    .repartition('column1', 'column2') 
    .write 
    .json(sys.argv[2])) 

RDD के लिए मध्यवर्ती toPandas और रूपांतरण लंघन।

अपनी टिप्पणी के बाद:

तो toPandas तो एक उद्देश्य में कार्य करता यह हमेशा पाइप लाइन में एक सीमित कारक बना रहेगा और केवल प्रत्यक्ष समाधान पैमाने-अप करने के लिए ड्राइवर नोड है।

  • एल्गोरिदम आप स्पार्क के शीर्ष पर उपयोग यह पहले से ही उपलब्ध नहीं हैं Reimplementing: सटीक एल्गोरिदम आप एकत्र किए गए आंकड़ों पर लागू होते हैं पर निर्भर करता है आप वैकल्पिक विकल्पों पर विचार कर सकते हैं।
  • एक बेहतर SciPy स्टैक इंटरऑपरेबिलिटी (जैसे Dask) के साथ वैकल्पिक ढांचे पर विचार करें।
1

यह मैं मेरा Spark – Container exited with a non-zero exit code 143, जहां मैं क्लस्टर मोड है, जो अपने आवेदन के साथ एक स्मृति मुद्दा इंगित करता है में एक PySpark काम lanching गया था की याद दिलाती है।

सबसे पहले यह निर्धारित करने का प्रयास करें कि कौन सी मशीनें असफल हो रही हैं, ड्राइवर या निष्पादक (ओं), और इस प्रकार मैं आपकी चाल को बेहतर तरीके से लक्षित करने में सक्षम हूं - जो मैंने पढ़ा है, वह निष्पादक होना चाहिए।


मैं आप पहले से ही memoryOverhead विन्यास, अच्छा निर्धारित किया है देख सकते हैं। अब चलो memory विन्यास पर ध्यान केंद्रित करते:

... स्पार्क (PySPark) के साथ अजगर चल रहा है, इसलिए सभी मेरा कोड ढेर बंद चलाता है। इसी कारण से, मुझे "ज्यादा" स्मृति आवंटित नहीं करना है (क्योंकि यह स्मृति को काट देगा, मुझे कुल स्मृति से उपयोग करने की अनुमति है, यानी अगर कुल स्मृति का उपयोग करने की अनुमति है तो 20 जी है और मैं 12 जी का अनुरोध कर रहा हूं, फिर । 8G उपयोग करने के लिए मेरी अजगर आवेदन के लिए छोड़ दिया जाएगा

तो कमी कि श्रेय देने के लिए प्रयास करते हैं, हाँ यह कमी


अगला लक्ष्य:! #cores

घटाएं वह भी,

spark.executor.cores      4 
spark.driver.cores       4 
+0

यह अभी भी मदद नहीं करता है। एक ही त्रुटि संदेशों के साथ विफलताओं को जारी रखें। मैं सचमुच 32 जीबी मेमोरी और ऊपर प्रति सेटिंग्स के साथ एम 4.2xlarge उदाहरणों पर चल रहा हूँ। बहुत परेशान है कि यह सिर्फ इन गुप्त त्रुटियों को देता है और अंधेरे गुना में विफल रहता है। – Gopala

+0

हम्म मैं आपसे @ गोपाला से भी ऊपर नहीं हूं, इसका मतलब है कि मेरा जवाब खराब है, क्या मुझे इसे हटाना चाहिए? – gsamaras

+3

मुझे नहीं लगता कि उत्तर खराब है। इसमें कुछ अंतर्दृष्टि और उपयोगी लिंक हैं। सिर्फ यह कि मेरी समस्या का समाधान नहीं हुआ और मैं अभी भी यह देखने का इंतजार कर रहा हूं कि आगे की मदद है या नहीं। – Gopala

संबंधित मुद्दे