2016-01-04 5 views
6

समस्या विभाजन/नोड्स की संख्या कैसे निर्धारित करें: मैं का उपयोग कर S3 से स्पार्क ईएमआर में डेटा आयात करना चाहते हैं:जब स्पार्क में डेटा आयात करने

data = sqlContext.read.json("s3n://.....") 

वहाँ एक रास्ता मैं की संख्या निर्धारित कर सकते हैं नोड्स जो स्पार्क लोड और प्रक्रिया डेटा का उपयोग करता है?

data.registerTempTable("table") 
SqlData = sqlContext.sql("SELECT * FROM table") 

प्रसंग:: यह मैं डेटा को प्रोसेस का एक उदाहरण है डेटा बहुत बड़ा नहीं है, स्पार्क में लोड करने के लिए एक लंबा समय लगता है और यह भी से क्वेरी करने के लिए। मुझे लगता है कि स्पार्क डेटा को बहुत सारे नोड्स में विभाजित करता है। मैं इसे मैन्युअल रूप से सेट करने में सक्षम होना चाहता हूं। मुझे पता है कि आरडीडी और sc.parallelize से निपटने के दौरान मैं विभाजन के रूप में विभाजन की संख्या पास कर सकता हूं। इसके अलावा, मैंने repartition() देखा है, लेकिन मुझे यकीन नहीं है कि यह मेरी समस्या का समाधान कर सकता है या नहीं। परिवर्तनीय data मेरे उदाहरण में DataFrame है।

मुझे विभाजन को और अधिक सटीक परिभाषित करने दें। परिभाषा एक: आमतौर पर "विभाजन कुंजी" के रूप में जाना जाता है, जहां एक स्तंभ चुना जाता है और क्वेरी को तेज़ करने के लिए अनुक्रमित किया जाता है (जो मैं चाहता हूं)। परिभाषा दो: (यह वह जगह है जहां मेरी चिंता है) मान लीजिए कि आपके पास डेटा सेट है, स्पार्क निर्णय लेता है कि यह इसे कई नोड्स में वितरित करने जा रहा है ताकि यह समानांतर में डेटा पर संचालन चला सके। यदि डेटा का आकार बहुत छोटा है, तो यह प्रक्रिया को और धीमा कर सकता है। मैं उस मान को कैसे सेट कर सकता हूं

+0

संभावित स्पाइप डेटाफ्रेम के विभाजन को परिभाषित करने के लिए संभावित डुप्लिकेट कैसे करें?] (Http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe) –

+0

मुझे परिभाषित करने दें अधिक सटीक विभाजन। परिभाषा एक: आमतौर पर "विभाजन कुंजी" के रूप में जाना जाता है, जहां एक स्तंभ चुना जाता है और क्वेरी को तेज करने के लिए अनुक्रमित किया जाता है। परिभाषा दो: (यह वह जगह है जहां मेरी चिंता है) मान लीजिए कि आपके पास डेटा सेट है, स्पार्क निर्णय लेता है कि यह इसे कई नोड्स में वितरित करने जा रहा है ताकि यह समानांतर में डेटा पर संचालन चला सके। यदि डेटा का आकार बहुत छोटा है, तो यह प्रक्रिया को और धीमा कर सकता है। मैं उस मूल्य को कैसे सेट कर सकता हूं? – pemfir

+0

तालिका विभाजन के बीच आप स्पष्ट डेटाबेस बनाम आरडीडी विभाजन के रूप में स्पष्ट अंतर बनाने में सही हैं। देखें [स्पार्क डेटाफ्रेम के विभाजन को परिभाषित करने के लिए कैसे?] (http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe), जो वर्णन करता है कि डेटाफ्रेम विभाजन कैसे करें, जैसे कि _RDD/वितरित विभाजन_ में। –

उत्तर

6

डिफ़ॉल्ट रूप से यह 200 सेट में विभाजन करता है। आप एसक्यूएल संदर्भ sqlContext.sql("set spark.sql.shuffle.partitions=10"); में सेट कमांड का उपयोग करके इसे बदल सकते हैं। हालांकि आपको इसे अपनी डेटा विशेषताओं के आधार पर सावधानी बरतने की आवश्यकता है।

4

आप विभाजन सेट करने के लिए डेटाफ्रेम पर पुनरावृत्ति() को कॉल कर सकते हैं। तुम भी छत्ता संदर्भ बनाने के बाद या चिंगारी से सबमिट करने के जार

चिंगारी से जमा कर सकते .... --conf spark.sql.shuffle.partitions = पारित करके spark.sql.shuffle.partitions इस संपत्ति सेट कर सकते हैं 100 या dataframe.repartition (100)

0

"इनपुट" विभाजन की संख्या फाइल सिस्टम विन्यास द्वारा निर्धारित हैं।

128 जी के ब्लॉक आकार के साथ 1Go की 1 फ़ाइल आपको 10 कार्य देगा। मुझे यकीन नहीं है कि आप इसे बदल सकते हैं।

पुनर्वितरण बहुत खराब हो सकता है, यदि आपके पास बहुत सारे इनपुट विभाजन हैं तो यह विभाजन के बीच बहुत शफल (डेटा यातायात) बना देगा।

कोई जादू विधि नहीं है, आपको यह देखने के लिए वेबूआई का उपयोग करना है, और कितने कार्य उत्पन्न होते हैं।

संबंधित मुद्दे