
Operating on the RDD fails when setting the Spark record delimiter with org.apache.hadoop.conf.Configuration

I want to process a large text file "mydata.txt" (the actual file is about 30GB) with Spark. Its record delimiter is "\ |" followed by "\n". Because the default record delimiter when loading a file (via sc.textFile) is "\n", I set the "textinputformat.record.delimiter" property of org.apache.hadoop.conf.Configuration to "\ |\n":

import org.apache.hadoop.io.LongWritable 
import org.apache.hadoop.io.Text 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

// Custom record and field delimiters for this file format.
val LINE_DELIMITER = "\\ |\n" 
val FIELD_SEP = "_\\|" 

// Pass the custom record delimiter to the new Hadoop API input format.
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", LINE_DELIMITER) 
val raw_data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString) 

So far so good. Here is sample data with the record delimiter as specified:

AAAAA_|BBBBB_| 
CCCCC\ 
DDDDD 
EEEEE_FFFFFFFFFFFF\ | 
GGGGG_|HHHHH_| 
IIIII\ 
GGGGG\ 
KKKKK_|LLLLLLLLLLL\ | 
MMMM_|NNNNN_|OOOOO\ | 

Next, I execute the following code in the spark-shell. However:

scala> val data = raw_data.filter(x => x.split(FIELD_SEP).size >= 3) 
data: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at <console>:22 

scala> data.collect 
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

scala> data.foreach(println) 
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration 
    ... 

Why can't I manipulate the RDD "data" here, when everything works fine if I use sc.textFile("mydata.txt")? And how can I fix it?

Answer


You are getting this exception because you are closing over org.apache.hadoop.conf.Configuration, but it is not serializable:

Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 

You can do one of two things: 1. register Configuration with the Kryo serializer, or 2. simply mark your conf variable as @transient, which basically tells Spark not to ship it with the closure.

scala> @transient val conf = new Configuration 
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml 

scala> val raw_data = sc.newAPIHadoopFile("../test.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString) 
14/11/28 00:54:03 INFO MemoryStore: ensureFreeSpace(32937) called with curMem=70594, maxMem=278302556 
14/11/28 00:54:03 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 32.2 KB, free 265.3 MB) 
raw_data: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at map at <console>:18 

scala> val data = raw_data.filter{x => x.split(FIELD_SEP).size >= 3} 
data: org.apache.spark.rdd.RDD[String] = FilteredRDD[6] at filter at <console>:22 

scala> data.count 
14/11/28 00:54:16 INFO FileInputFormat: Total input paths to process : 1 
14/11/28 00:54:16 INFO SparkContext: Starting job: count at <console>:25 
14/11/28 00:54:16 INFO DAGScheduler: Got job 2 (count at <console>:25) with 1 output partitions (allowLocal=false) 
14/11/28 00:54:16 INFO DAGScheduler: Final stage: Stage 2(count at <console>:25) 
14/11/28 00:54:16 INFO DAGScheduler: Parents of final stage: List() 
14/11/28 00:54:16 INFO DAGScheduler: Missing parents: List() 
14/11/28 00:54:16 INFO DAGScheduler: Submitting Stage 2 (FilteredRDD[6] at filter at <console>:22), which has no missing parents 
14/11/28 00:54:16 INFO MemoryStore: ensureFreeSpace(4488) called with curMem=103531, maxMem=278302556 
14/11/28 00:54:16 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 4.4 KB, free 265.3 MB) 
14/11/28 00:54:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (FilteredRDD[6] at filter at <console>:22) 
14/11/28 00:54:16 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 
14/11/28 00:54:16 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1223 bytes) 
14/11/28 00:54:16 INFO Executor: Running task 0.0 in stage 2.0 (TID 2) 
14/11/28 00:54:16 INFO NewHadoopRDD: Input split: file:/Users/ssimanta/spark/test.txt:0+123 
14/11/28 00:54:16 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1731 bytes result sent to driver 
14/11/28 00:54:16 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 19 ms on localhost (1/1) 
14/11/28 00:54:16 INFO DAGScheduler: Stage 2 (count at <console>:25) finished in 0.019 s 
14/11/28 00:54:16 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
14/11/28 00:54:16 INFO DAGScheduler: Job 2 finished: count at <console>:25, took 0.025300 s 
res5: Long = 1 

scala> data.collect 
14/11/28 00:55:16 INFO SparkContext: Starting job: collect at <console>:25 
14/11/28 00:55:16 INFO DAGScheduler: Got job 3 (collect at <console>:25) with 1 output partitions (allowLocal=false) 
14/11/28 00:55:16 INFO DAGScheduler: Final stage: Stage 3(collect at <console>:25) 
14/11/28 00:55:16 INFO DAGScheduler: Parents of final stage: List() 
14/11/28 00:55:16 INFO DAGScheduler: Missing parents: List() 
14/11/28 00:55:16 INFO DAGScheduler: Submitting Stage 3 (FilteredRDD[6] at filter at <console>:22), which has no missing parents 
14/11/28 00:55:16 INFO MemoryStore: ensureFreeSpace(4504) called with curMem=108019, maxMem=278302556 
14/11/28 00:55:16 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 4.4 KB, free 265.3 MB) 
14/11/28 00:55:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (FilteredRDD[6] at filter at <console>:22) 
14/11/28 00:55:16 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 
14/11/28 00:55:16 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1223 bytes) 
14/11/28 00:55:16 INFO Executor: Running task 0.0 in stage 3.0 (TID 3) 
14/11/28 00:55:16 INFO NewHadoopRDD: Input split: file:/Users/ssimanta/spark/test.txt:0+123 
14/11/28 00:55:16 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1717 bytes result sent to driver 
14/11/28 00:55:16 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 16 ms on localhost (1/1) 
14/11/28 00:55:16 INFO DAGScheduler: Stage 3 (collect at <console>:25) finished in 0.017 s 
14/11/28 00:55:16 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
14/11/28 00:55:16 INFO DAGScheduler: Job 3 finished: collect at <console>:25, took 0.021439 s 
res6: Array[String] = Array(MMMM_|NNNNN_|OOOOO\ |) 
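
For the first option (registering Configuration with Kryo), a minimal sketch is shown below. The registrator class name HadoopConfRegistrator is made up for illustration, and the key point is that spark.kryo.registrator must name a KryoRegistrator implementation, not Configuration itself; treat this as an untested outline rather than code from the answer above.

import com.esotericsoftware.kryo.Kryo 
import org.apache.hadoop.conf.Configuration 
import org.apache.spark.SparkConf 
import org.apache.spark.serializer.KryoRegistrator 

// Hypothetical registrator that tells Kryo about Hadoop's Configuration class. 
class HadoopConfRegistrator extends KryoRegistrator { 
  override def registerClasses(kryo: Kryo): Unit = { 
    kryo.register(classOf[Configuration]) 
  } 
} 

// Enable Kryo and point spark.kryo.registrator at the registrator class, 
// not at Configuration itself (doing the latter leads to the ClassCastException 
// mentioned in the comments below). 
val sparkConf = new SparkConf() 
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
  .set("spark.kryo.registrator", classOf[HadoopConfRegistrator].getName) 

Note that in Spark releases of this era closures are serialized with Java serialization regardless of spark.serializer, so Kryo registration may not help with a closure-capture problem; the @transient approach demonstrated above is usually the simpler fix.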

This works. But what does "closing over org.apache.hadoop.conf.Configuration" mean? It is just a configuration object; where is the closure? Thanks. – Chad


'x => x.split(FIELD_SEP).size >= 3' is the closure. –
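
(In the spark-shell the lambda only mentions FIELD_SEP, but FIELD_SEP lives in a REPL wrapper object, and serializing that wrapper can drag in the Configuration defined on another line. A small sketch of a common defensive pattern, assuming the same raw_data and FIELD_SEP as above, is to bind the separator to a local value so the closure captures just a String; this is not guaranteed to be sufficient in every REPL situation, but it narrows what gets captured.)

// Bind the separator to a local val inside a block so the lambda closes over 
// only that String, not the REPL objects that also hold the Hadoop Configuration. 
val data = { 
  val sep = FIELD_SEP 
  raw_data.filter(x => x.split(sep).size >= 3) 
}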


Tried method 1: 'val conf = new SparkConf(); conf.set("spark.kryo.registrator", classOf[HadoopConfig].getName)' and got: java.lang.ClassCastException: org.apache.hadoop.conf.Configuration cannot be cast to org.apache.spark.serializer.KryoRegistrator – jiamo
