मैं स्पार्क का उपयोग कर S3 घटनाओं पर एक सरल एसक्यूएल क्वेरी बनाने के लिए कोशिश कर रहा हूँ। मैं निम्नलिखित के रूप में JSON फाइल के ~ 30GB लोड हो रहा हूँ:SQL क्वेरी से अधिक है Integer.MAX_VALUE
val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");
तब मैं अपने प्रश्न का परिणाम दर्ज करने के लिए लिखने के लिए कोशिश कर रहा हूँ:
val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");
लेकिन स्पार्क निम्न अपवाद फेंक है:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ध्यान दें कि एक ही क्वेरी डेटा की कम मात्रा के लिए काम करता है। यहाँ क्या समस्या है?
सबसे संभावित समस्या विभाजन आकार से अधिक सीमा के साथ है, '.repartition (100) 'आदि का प्रयास करें, इसे –
को डेटा को पढ़ने के बाद इसे पुन: प्रारंभ करने का प्रयास करना चाहिए' val d2 = spark.read.json (" s3n: // myData/2017/02/01/1234 ")। पुनर्विभाजन (1000)' संदर्भ https://issues.apache.org/jira/browse/SPARK-1476 –
एक तरफ ध्यान दें के रूप में, आप नए 's3a का उपयोग कर में देखना चाहते हो सकता है 's3n' के बजाय; उदाहरण देखें http://stackoverflow.com/questions/33356041/technically-what-is-the-difference-between-s3n-s3a-and-s3 – sgvd