मैं स्पार्क के साथ एक बड़ी टेक्स्ट फ़ाइल "mydata.txt" (वास्तविक फ़ाइल का आकार लगभग 30GB) संसाधित करना चाहता हूं। यह रिकॉर्ड डेलीमीटर है "\ |" "\ n" के बाद। क्योंकि लोडिंग फ़ाइल के डिफ़ॉल्ट रिकॉर्ड विभाजक ("sc.textFile" द्वारा) "\ n" है, मैंने org.apache.hadoop.conf की "textinputformat.record.delimiter" प्रॉपर्टी सेट की है। "\ | \ N" पर कॉन्फ़िगरेशन अब तकऑपरेटिंग आरडीडी org.apache.hadoop.conf के साथ स्पार्क रिकॉर्ड डिलीमीटर सेट करते समय असफल रहा। कॉन्फ़िगरेशन
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val LINE_DELIMITER = "\\ |\n"
val FIELD_SEP = "_\\|"
val conf = new Configuration
conf.set("textinputformat.record.delimiter", LINE_DELIMITER)
val raw_data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
तो अच्छा: रिकॉर्ड परिसीमक निर्दिष्ट:
AAAAA_|BBBBB_|
CCCCC\
DDDDD
EEEEE_FFFFFFFFFFFF\ |
GGGGG_|HHHHH_|
IIIII\
GGGGG\
KKKKK_|LLLLLLLLLLL\ |
MMMM_|NNNNN_|OOOOO\ |
अगला मैं चिंगारी से खोल में निम्न कोड निष्पादित। हालांकि,
scala> val data = raw_data.filter(x => x.split(FIELD_SEP).size >= 3)
data: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at <console>:22
scala> data.collect
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
scala> data.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
...
मैं क्यों RDD "डाटा", में हेरफेर नहीं कर सकते जब sc.textFile("mydata.txt")
का उपयोग करते समय सब कुछ ठीक है? और इसे कैसे ठीक करें?
यह काम करता है। लेकिन क्या "org.apache.hadoop.conf पर बंद हो रहा है। कॉन्फ़िगरेशन" मतलब है? यह सिर्फ एक कॉन्फ़िगरेशन ऑब्जेक्ट है, बंद कहां है? धन्यवाद। – Chad
'x => x.split (FIELD_SEP) .size> = 3' बंद है। –
1 विधि आज़माएं: 'var = new स्पार्ककॉन्फ() conf.set (" spark.kryo.registrator ", classOf [HadoopConfig] .getName); 'द्वारा: java.lang.ClassCastException: org.apache.hadoop.conf .Configuration की स्थापना org.apache.spark.serializer.KryoRegistrator – jiamo