2017-02-15 23 views
9

मैं स्पार्क का उपयोग कर S3 घटनाओं पर एक सरल एसक्यूएल क्वेरी बनाने के लिए कोशिश कर रहा हूँ। मैं निम्नलिखित के रूप में JSON फाइल के ~ 30GB लोड हो रहा हूँ:SQL क्वेरी से अधिक है Integer.MAX_VALUE

val d2 = spark.read.json("s3n://myData/2017/02/01/1234"); 
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK); 
d2.registerTempTable("d2"); 

तब मैं अपने प्रश्न का परिणाम दर्ज करने के लिए लिखने के लिए कोशिश कर रहा हूँ:

val users_count = sql("select count(distinct data.user_id) from d2"); 
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv"); 

लेकिन स्पार्क निम्न अपवाद फेंक है:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) 
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) 
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) 
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287) 
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) 
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439) 
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
at org.apache.spark.scheduler.Task.run(Task.scala:85) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

ध्यान दें कि एक ही क्वेरी डेटा की कम मात्रा के लिए काम करता है। यहाँ क्या समस्या है?

+0

सबसे संभावित समस्या विभाजन आकार से अधिक सीमा के साथ है, '.repartition (100) 'आदि का प्रयास करें, इसे –

+0

को डेटा को पढ़ने के बाद इसे पुन: प्रारंभ करने का प्रयास करना चाहिए' val d2 = spark.read.json (" s3n: // myData/2017/02/01/1234 ")। पुनर्विभाजन (1000)' संदर्भ https://issues.apache.org/jira/browse/SPARK-1476 –

+0

एक तरफ ध्यान दें के रूप में, आप नए 's3a का उपयोग कर में देखना चाहते हो सकता है 's3n' के बजाय; उदाहरण देखें http://stackoverflow.com/questions/33356041/technically-what-is-the-difference-between-s3n-s3a-and-s3 – sgvd

उत्तर

22

कोई स्पार्क फेरबदल ब्लॉक 2GB से अधिक हो सकता है (Integer.MAX_VALUE बाइट्स) ताकि आप अधिक/छोटे विभाजन की जरूरत है।

आपको spark.default.parallelism और spark.sql.shuffle.partitions (डिफ़ॉल्ट 200) को समायोजित करना चाहिए ताकि विभाजन की संख्या 2 जीबी सीमा तक पहुंचने के बिना आपके डेटा को समायोजित कर सके (आप 256 एमबी/विभाजन के लिए लक्ष्य का प्रयास कर सकते हैं 200 जीबी आपको 800 विभाजन मिलते हैं)। हजारों विभाजन बहुत आम हैं इसलिए सुझाव के अनुसार 1000 को पुन: विभाजन से डरो मत।

FYI करें, आप rdd.getNumPartitions की तरह कुछ के साथ एक RDD के लिए विभाजन की संख्या की जांच कर सकते हैं (यानी d2.rdd.getNumPartitions)

वहाँ विभिन्न 2GB सीमा को संबोधित करने का प्रयास (खुला कर दिया गया ट्रैक करने के लिए एक कहानी है अब थोड़ी देर के) के लिए: https://issues.apache.org/jira/browse/SPARK-6235

इस त्रुटि के बारे में अधिक जानकारी के लिए http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 देखें।

+0

उत्तर के लिए धन्यवाद! स्पष्टीकरण के लिए – eexxoo

+0

धन्यवाद! डिफ़ॉल्ट विभाजनों की संख्या को संपादित करने के लिए https://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-pa पर भी एक नज़र डालें। – Raphvanns

संबंधित मुद्दे