2017-01-25 12 views
7

मैं एक columnm के अनुसार एक DataFrame के पुनर्विभाजन की कोशिश DataFrame है N विभिन्न मूल्यों विभाजन स्तंभ x में जैसे (N=3 मान लीजिए),:गिराने अपाचे स्पार्क में खाली DataFrame विभाजन

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data 

क्या मैं प्राप्त करने के लिए myDFx द्वारा रिक्त विभाजन तैयार किए बिना पुनः प्राप्त करना है। क्या ऐसा करने का कोई बेहतर तरीका है?

val numParts = myDF.select($"x").distinct().count.toInt 
myDF.repartition(numParts,$"x") 

(मैं repartiton में numParts निर्दिष्ट नहीं करते हैं, मेरे विभाजन के सबसे खाली हैं (जैसा कि repartition 200 विभाजन बनाता है) ...)

+1

http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration- विकल्प के अनुसार, 200 विभाजन कॉन्फ़िगरेशन विकल्प 'स्पार्क के लिए डिफ़ॉल्ट मान के कारण बनाए जाएंगे .sql.shuffle.partitions' – AKSW

+1

उत्तर मिल सकता है http://stackoverflow.com/questions/41854818/spark-dataframe-repartition-number-of-partition-not-preserved?noredirect=1#comment70893687_41854818 – FaigB

उत्तर

2

मैं df से अधिक बार-बार दोहराना के साथ समाधान के लगता है कि विभाजन और गैर-खाली विभाजन खोजने के लिए में रिकॉर्ड गिनती प्राप्त करना।

val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart") 

df.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

हम गैर खाली विभाजन (nonEmptyPart) मिल गया के रूप में, हम coalesce() (check coalesce() vs reparation()) का उपयोग करके खाली विभाजन साफ ​​कर सकते हैं।

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type 

यह कर सकते हैं या सबसे अच्छा नहीं हो सकता है, लेकिन इस समाधान हम उपयोग नहीं कर रहे के रूप में reparation()


उदाहरण टिप्पणी को संबोधित करने के

val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x") 
val nonEmptyPart = sc.longAccumulator("nonEmptyPart") 

df1.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

val finalDf = df1.coalesce(nonEmptyPart.value.toInt) 

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}") 
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}") 
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}") 

आउटपुट फेरबदल से बचने जाएगा

nonEmptyPart => 3 
df.rdd.partitions.length => 200 
finalDf.rdd.partitions.length => 3 
+0

'val df = sc.parallelize (सेक (1,1,2,2,3,3))। डीडीएफ ("एक्स")। पुनरावृत्ति (10, $ "एक्स") .coalesce (3) '। अब यह 10 से 3. – mrsrinivas

+0

से विभाजन की संख्या को संकुचित करता है और अब 'finalDf.foreachPartition (p => println (p.size)) '। मुझे '0 0 6' मिलता है, यानी 2 विभाजन खाली हैं, 1 में सभी पंक्तियां हैं। जो कुछ मैं चाहता था वह नहीं था (मैं स्पार्क 1.6.3 का उपयोग कर रहा हूं) –

+0

यह 'coalesce' के साथ अक्षम शफल होने की वजह से हो सकता है। 'रिपर्टिशन' का उपयोग करने का प्रयास करें, यह 'हैशपार्टिशनर' के अनुसार सभी डेटा को घुमाएगा। तो कुछ विभाजन के साथ प्रत्येक विभाजन भरने का मौका मिलेगा। यदि आप खाली विभाजन को हटाने के लिए वास्तव में सख्त हैं तो आपको इसे चलाने की आवश्यकता हो सकती है (** गैर खाली विभाजन ढूंढना और सहवास/पुनरावृत्ति लागू करना **) इसे सक्रिय रूप से। – mrsrinivas

संबंधित मुद्दे