2016-11-25 7 views
5

लोग,स्पार्क: जब प्रति पंक्ति एक एकल कुंजी पहचानकर्ता पर चिंगारी में 2 बड़े DFS

मैं इस मुद्दे में चल रहा है, जबकि मैं 2 बड़े dataframes शामिल होने के लिए कोशिश कर रहा हूँ (100GB + प्रत्येक) में शामिल होने से आकार Integer.MAX_VALUE से अधिक है ।

val df1 = sqlContext.read.json("hdfs:///df1/") 
val df2 = sqlContext.read.json("hdfs:///df2/") 

// clean up and filter steps later 

df1.registerTempTable("df1") 
df2.registerTempTable("df2") 

val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4") 

df3.write.json("hdfs:///df3/") 

यह मूलतः मैं क्या कर रहा हूँ का सार, के बीच DF1 शामिल होने के लिए अन्य सफाई और फ़िल्टर चरणों में से एक है:

मैं ईएमआर पर स्पार्क 1.6 उपयोग कर रहा हूँ और यहाँ मैं क्या कर रहा हूँ है और अंततः डीएफ 2। मैं देख रहा हूँ

त्रुटि है:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) 
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503) 
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420) 
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

विन्यास और संदर्भ:
मैं 13 नोड 60GB निष्पादकों और ड्राइवर स्मृति ओवरहेड्स के साथ अनुसार सेट के साथ प्रत्येक क्लस्टर उपयोग कर रहा हूँ। बातें जो मैंने कोशिश की है समायोजन:

  • spark.sql.broadcastTimeout
  • spark.sql.shuffle.partitions

मैं भी बड़ा क्लस्टर उपयोग करने की कोशिश की है, लेकिन मदद नहीं की। This link कहता है कि अगर शफल विभाजन का आकार 2 जीबी से अधिक है, तो यह त्रुटि फेंक दी जाती है। लेकिन मैंने विभाजन की संख्या को बहुत अधिक मूल्य में बढ़ाने की कोशिश की है, अभी भी कोई भाग्य नहीं है।

मैं आलसी लोड हो रहा है से संबंधित यह कुछ हो सकता है संदेह है। जब मैं डीएफ पर 10 ऑपरेशन करता हूं, तो उन्हें केवल अंतिम चरण में ही निष्पादित किया जाता है। मैंने डीएफ के लिए विभिन्न भंडारण स्तरों पर .persist() जोड़ने का प्रयास किया, फिर भी यह सफल नहीं होता है। मैंने टेम्पलेट टेबल छोड़ने का भी प्रयास किया है, साफ करने के लिए सभी पहले डीएफ खाली कर दिया है।

हालांकि कोड काम करता है अगर मैं इसे तोड़ने के 2 भागों में नीचे -, डिस्क के लिए अंतिम अस्थायी डेटा (2 डेटा फ्रेम) लिख बाहर निकलने। केवल दो डीएफ में शामिल होने के लिए पुनरारंभ करना।

कि मैंने पहले इस त्रुटि हो रही थी:

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:107) 
    at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

लेकिन जब मैं spark.sql.broadcastTimeout समायोजित, मैं पहली बार त्रुटि मिल रही शुरू कर दिया।

इस मामले में किसी भी मदद की सराहना करते हैं। यदि आवश्यक हो तो मैं अधिक जानकारी जोड़ सकता हूं।

+1

जिस संपत्ति को आप समायोजित करने की कोशिश करनी चाहिए वह है 'spark.sql.shuffle.partitions' ('s' atfer विभाजन को नोट करें) –

+0

सही है । टाइपो को सही किया। धन्यवाद – rohitkulky

+0

डेटा skew के कारण हो सकता है। यह पता लगाने का प्रयास करें कि क्या आपके पास एक भी जॉइन कुंजी है जो अन्य सभी की तुलना में बहुत बड़ी है। – LiMuBei

उत्तर

4

चिंगारी में यू फेरबदल ब्लॉक 2GB से अधिक नहीं हो सकता। ऐसा इसलिए, क्योंकि स्पार्क भंडार ByteBuffer के रूप में ब्लॉक शफ़ल। यहाँ कैसे आप इसे आवंटित है:

ByteBuffer.allocate(int capacity)

के रूप में, ByteBuffer के Integer.MAX_SIZE (2GB) द्वारा सीमित हैं, तो फेरबदल ब्लॉक हैं !! समाधान RDD के इस तरह के लिए या तो SparkSQL में spark.sql.shuffle.partitions का उपयोग करके या rdd.partition() or rdd.colease() द्वारा विभाजन की संख्या में वृद्धि करने के लिए कि प्रत्येक विभाजन आकार < = 2GB है।

आपने कहा है कि आप विभाजन की संख्या को बढ़ाने के लिए करने की कोशिश की है और अभी भी यह विफल रहा है। क्या आप जांच सकते हैं कि विभाजन का आकार> 2 जीबी था या नहीं।बस सुनिश्चित करें कि प्रत्येक ब्लॉक आकार < 2 जीबी

संबंधित मुद्दे