लोग,स्पार्क: जब प्रति पंक्ति एक एकल कुंजी पहचानकर्ता पर चिंगारी में 2 बड़े DFS
मैं इस मुद्दे में चल रहा है, जबकि मैं 2 बड़े dataframes शामिल होने के लिए कोशिश कर रहा हूँ (100GB + प्रत्येक) में शामिल होने से आकार Integer.MAX_VALUE से अधिक है ।
val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")
// clean up and filter steps later
df1.registerTempTable("df1")
df2.registerTempTable("df2")
val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")
df3.write.json("hdfs:///df3/")
यह मूलतः मैं क्या कर रहा हूँ का सार, के बीच DF1 शामिल होने के लिए अन्य सफाई और फ़िल्टर चरणों में से एक है:
मैं ईएमआर पर स्पार्क 1.6 उपयोग कर रहा हूँ और यहाँ मैं क्या कर रहा हूँ है और अंततः डीएफ 2। मैं देख रहा हूँ
त्रुटि है:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
विन्यास और संदर्भ:
मैं 13 नोड 60GB निष्पादकों और ड्राइवर स्मृति ओवरहेड्स के साथ अनुसार सेट के साथ प्रत्येक क्लस्टर उपयोग कर रहा हूँ। बातें जो मैंने कोशिश की है समायोजन:
spark.sql.broadcastTimeout
spark.sql.shuffle.partitions
मैं भी बड़ा क्लस्टर उपयोग करने की कोशिश की है, लेकिन मदद नहीं की। This link कहता है कि अगर शफल विभाजन का आकार 2 जीबी से अधिक है, तो यह त्रुटि फेंक दी जाती है। लेकिन मैंने विभाजन की संख्या को बहुत अधिक मूल्य में बढ़ाने की कोशिश की है, अभी भी कोई भाग्य नहीं है।
मैं आलसी लोड हो रहा है से संबंधित यह कुछ हो सकता है संदेह है। जब मैं डीएफ पर 10 ऑपरेशन करता हूं, तो उन्हें केवल अंतिम चरण में ही निष्पादित किया जाता है। मैंने डीएफ के लिए विभिन्न भंडारण स्तरों पर .persist()
जोड़ने का प्रयास किया, फिर भी यह सफल नहीं होता है। मैंने टेम्पलेट टेबल छोड़ने का भी प्रयास किया है, साफ करने के लिए सभी पहले डीएफ खाली कर दिया है।
हालांकि कोड काम करता है अगर मैं इसे तोड़ने के 2 भागों में नीचे -, डिस्क के लिए अंतिम अस्थायी डेटा (2 डेटा फ्रेम) लिख बाहर निकलने। केवल दो डीएफ में शामिल होने के लिए पुनरारंभ करना।
कि मैंने पहले इस त्रुटि हो रही थी:
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
लेकिन जब मैं spark.sql.broadcastTimeout
समायोजित, मैं पहली बार त्रुटि मिल रही शुरू कर दिया।
इस मामले में किसी भी मदद की सराहना करते हैं। यदि आवश्यक हो तो मैं अधिक जानकारी जोड़ सकता हूं।
जिस संपत्ति को आप समायोजित करने की कोशिश करनी चाहिए वह है 'spark.sql.shuffle.partitions' ('s' atfer विभाजन को नोट करें) –
सही है । टाइपो को सही किया। धन्यवाद – rohitkulky
डेटा skew के कारण हो सकता है। यह पता लगाने का प्रयास करें कि क्या आपके पास एक भी जॉइन कुंजी है जो अन्य सभी की तुलना में बहुत बड़ी है। – LiMuBei