2016-07-07 13 views
10

में विभाजन आकार को कैसे बदलें मुझे स्पार्क-एसक्यूएल HiveContext का उपयोग करके एक हाइव टेबल से डेटा लोड करने की आवश्यकता है और एचडीएफएस में लोड करें। डिफ़ॉल्ट रूप से, SQL आउटपुट से DataFrame में 2 विभाजन होते हैं। अधिक समांतरता प्राप्त करने के लिए मुझे SQL से अधिक विभाजन की आवश्यकता है। विभाजन पैरामीटर की संख्या लेने के लिए HiveContext में कोई ओवरलोडेड विधि नहीं है।स्पार्क एसक्यूएल

आरडीडी की पुनरावृत्ति के कारण शफल हो रहा है और परिणामस्वरूप अधिक प्रसंस्करण समय होता है।

val result = sqlContext.sql("select * from bt_st_ent") 

का लॉग आउटपुट है: मैं जानना चाहूंगा

Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes) 
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes) 

वहाँ किसी भी तरह से एसक्यूएल उत्पादन का विभाजन आकार बढ़ाने के लिए है।

उत्तर

-1

एक बहुत ही आम और दर्दनाक समस्या है। आपको एक कुंजी की तलाश करनी चाहिए जो समान विभाजन में डेटा वितरित करे। विभाजन में समूह पंक्तियों के लिए स्पार्क को बताने के लिए आप DISTRIBUTE BY और CLUSTER BY ऑपरेटरों का उपयोग कर सकते हैं। यह क्वेरी पर कुछ ओवरहेड होगा। लेकिन परिणामस्वरूप समान रूप से आकार का विभाजन होगा। Deepsense इस पर एक बहुत अच्छा ट्यूटोरियल है।

-1

अपने एसक्यूएल एक फेरबदल करता है (उदाहरण के लिए यह एक में शामिल होने गया है, या द्वारा समूह में किसी प्रकार का), तो आपको 'spark.sql.shuffle.partitions' संपत्ति

sqlContext.setConf("spark.sql.shuffle.partitions", 64) 
की स्थापना द्वारा विभाजन की संख्या निर्धारित कर सकते हैं

फ़ोको द्वारा सुझाए गए कार्यों के बाद, आप क्लस्टर द्वारा एक यादृच्छिक चर का उपयोग कर सकते हैं।

val result = sqlContext.sql(""" 
    select * from (
    select *,random(64) as rand_part from bt_st_ent 
    ) cluster by rand_part""") 
3

स्पार्क < 2.0:

आप उपयोग कर सकते हैं Hadoop विन्यास विकल्प:

  • mapred.min.split.size
  • mapred.max.split.size

के साथ-साथ HDFS ब्लॉक आकार फाइल सिस्टम आधारित प्रारूपों के लिए विभाजन आकार को नियंत्रित करने के लिए।

val minSplit: Int = ??? 
val maxSplit: Int = ??? 

sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit) 
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit) 

स्पार्क 2.0+:

आप spark.sql.files.maxPartitionBytes विन्यास का उपयोग कर सकते हैं:

spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit) 

दोनों ही मामलों में इन मूल्यों को उपयोग में एक विशिष्ट डेटा स्रोत एपीआई द्वारा नहीं किया जा सकता, ताकि आप हमेशा करना चाहिए आपके द्वारा उपयोग किए जाने वाले प्रारूप के दस्तावेज़ीकरण/कार्यान्वयन विवरण देखें।

+0

यह डेटासेट का उपयोग कर स्पार्क 2.1.1 के लिए हमारे क्लस्टर में काम नहीं किया – Luckylukee

संबंधित मुद्दे