9

Apache Spark: pyspark crashes for large datasets

I am new to Spark. I have an input file with training data of size 4000x1800. When I try to train a model on this data (written in Python), I get the following errors:

  1. 14/11/15 22:39:13 ERROR PythonRDD: Python worker exited unexpectedly (crashed) java.net.SocketException: Connection reset by peer: socket write error

  2. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error

I am working with Spark 1.1.0. Any suggestions would be very helpful.

Code:

    from pyspark.mllib.classification import SVMWithSGD
    from pyspark.mllib.regression import LabeledPoint
    from pyspark import SparkConf, SparkContext

    # Parse one line of the feature matrix: the first value is the label,
    # the remaining values are the features.
    def parsePoint(line):
        values = [float(x) for x in line.split(' ')]
        return LabeledPoint(values[0], values[1:])

    # Create the Spark context
    conf = (SparkConf()
            .setMaster("local")
            .setAppName("My app")
            .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf=conf)

    # Load and parse the data
    data = sc.textFile("myfile.txt")
    parsedData = data.map(parsePoint)

    # Train the SVM model with 100 iterations
    model = SVMWithSGD.train(parsedData, 100)

I get the following error:

14/11/15 22:38:38 INFO MemoryStore: ensureFreeSpace(32768) called with curMem=0, maxMem=278302556 
14/11/15 22:38:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB) 
>>> parsedData = data.map(parsePoint) 
>>> model = SVMWithSGD.train(parsedData,100) 
14/11/15 22:39:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
14/11/15 22:39:12 WARN LoadSnappy: Snappy native library not loaded 
14/11/15 22:39:12 INFO FileInputFormat: Total input paths to process : 1 
14/11/15 22:39:13 INFO SparkContext: Starting job: runJob at PythonRDD.scala:296 
14/11/15 22:39:13 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:296) with 1 output partitions (allowLocal=true) 
14/11/15 22:39:13 INFO DAGScheduler: Final stage: Stage 0(runJob at PythonRDD.scala:296) 
14/11/15 22:39:13 INFO DAGScheduler: Parents of final stage: List() 
14/11/15 22:39:13 INFO DAGScheduler: Missing parents: List() 
14/11/15 22:39:13 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents 
14/11/15 22:39:13 INFO MemoryStore: ensureFreeSpace(5088) called with curMem=32768, maxMem=278302556 
14/11/15 22:39:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.0 KB, free 265.4 MB) 
14/11/15 22:39:13 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (PythonRDD[3] at RDD at PythonRDD.scala:43) 
14/11/15 22:39:13 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
14/11/15 22:39:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1221 bytes) 
14/11/15 22:39:13 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 
14/11/15 22:39:13 INFO HadoopRDD: Input split: file:/G:/SparkTest/spark-1.1.0/spark-1.1.0/bin/FeatureMatrix.txt:0+8103732 
14/11/15 22:39:13 INFO PythonRDD: Times: total = 264, boot = 233, init = 29, finish = 2 
14/11/15 22:39:13 ERROR PythonRDD: Python worker exited unexpectedly (crashed) 
java.net.SocketException: Connection reset by peer: socket write error 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
     at java.io.DataOutputStream.write(DataOutputStream.java:107) 
     at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
     at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533) 
     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341) 
     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) 
     at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 
14/11/15 22:39:13 ERROR PythonRDD: This may have been caused by a prior exception: 
java.net.SocketException: Connection reset by peer: socket write error 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
     at java.io.DataOutputStream.write(DataOutputStream.java:107) 
     at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
     at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533) 
     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341) 
     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) 
     at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 
14/11/15 22:39:13 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.net.SocketException: Connection reset by peer: socket write error 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
     at java.io.DataOutputStream.write(DataOutputStream.java:107) 
     at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
     at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533) 
     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341) 
     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) 
     at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 
14/11/15 22:39:13 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error 
     java.net.SocketOutputStream.socketWrite0(Native Method) 
     java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 
     java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
     java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
     java.io.DataOutputStream.write(DataOutputStream.java:107) 
     java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
     org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533) 
     org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341) 
     org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340) 
     scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340) 
     org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) 
     org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) 
     org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 
14/11/15 22:39:13 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
14/11/15 22:39:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/11/15 22:39:13 INFO TaskSchedulerImpl: Cancelling stage 0 
14/11/15 22:39:13 INFO DAGScheduler: Failed to run runJob at PythonRDD.scala:296 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\classification.py", line 178, in train 
    return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights) 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\_common.py", line 430, in _regression_train_wrapper 
    initial_weights = _get_initial_weights(initial_weights, data) 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\_common.py", line 415, in _get_initial_weights 
    initial_weights = _convert_vector(data.first().features) 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\rdd.py", line 1167, in first 
    return self.take(1)[0] 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\rdd.py", line 1153, in take 
    res = self.context.runJob(self, takeUpToNumLeft, p, True) 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\context.py", line 770, in runJob 
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__ 
    File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error 
     java.net.SocketOutputStream.socketWrite0(Native Method) 
     java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 
     java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
     java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
     java.io.DataOutputStream.write(DataOutputStream.java:107) 
     java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
     org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533) 
     org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341) 
     org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340) 
     scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340) 
     org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) 
     org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) 
     org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) 
     org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 
Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

>>> 14/11/15 23:22:52 INFO BlockManager: Removing broadcast 1 
14/11/15 23:22:52 INFO BlockManager: Removing block broadcast_1 
14/11/15 23:22:52 INFO MemoryStore: Block broadcast_1 of size 5088 dropped from memory (free 278269788) 
14/11/15 23:22:52 INFO ContextCleaner: Cleaned broadcast 1 

Regards, Mrutyunjay

+1

Did you solve this problem? I am facing the same issue, and it is really annoying. – Tarantula

+0

I am getting the same error too. –

+0

I am getting the same error with the UCI bike sharing dataset as well. –

Answers

2

Mrutynjay,

Although I do not have a definitive answer, this issue looks like something memory-related. I ran into the same problem while trying to read a file of only 5 MB; after removing part of the file to bring it under 1 MB, the code worked.

I also found something on the same issue at the site below:

http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Failed-to-run-first-td7691.html
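
If memory pressure really is the cause, one quick way to narrow it down (a sketch only; the 10% sample fraction is illustrative) is to train on a small sample of the data first, and to launch pyspark with a larger driver heap, e.g. pyspark --driver-memory 2g:

    # Train on a 10% sample first; if this succeeds where the full file
    # crashes the Python worker, memory pressure is the likely culprit.
    sample = data.sample(False, 0.1, 42)
    model = SVMWithSGD.train(sample.map(parsePoint), 100)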

1

I got the same error, and then I found a related answer in pyspark process big datasets problems.

The solution is to add some code to python/pyspark/worker.py. Append the following two lines to the end of the process function defined inside the main function:

    for obj in iterator:
        pass

So the process function now looks like this (at least in Spark 1.5.2):

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)
        for obj in iterator:
            pass

And it works for me.

0
  1. One possibility is that there is an exception inside parsePoint. Wrap that code in a try/except block and print out the exception (see the sketch after this list).
  2. Check your --driver-memory parameter and make it larger.
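
As a minimal sketch of the first suggestion (reusing parsePoint from the question), the parser can be wrapped so that a malformed line is reported instead of silently killing the Python worker:

    from pyspark.mllib.regression import LabeledPoint

    def parsePoint(line):
        try:
            values = [float(x) for x in line.split(' ')]
            return LabeledPoint(values[0], values[1:])
        except Exception as e:
            # Print the offending line to the worker's stderr before
            # re-raising, so the real parsing problem shows up in the logs.
            print("Failed to parse line %r: %s" % (line, e))
            raise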
0

I had a similar problem, and I tried something like this:

    numPartitions = 10  # a number such as 10 or 100
    data = sc.textFile("myfile.txt", numPartitions)

Inspired by: How to repartition evenly in Spark? Or see here: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
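
The same idea can also be applied to an already-loaded RDD (a sketch; repartition() is a standard RDD method, but the partition count below is only illustrative), so each Python worker processes a smaller slice of the data at a time:

    numPartitions = 100  # illustrative; tune to the size of your data
    data = sc.textFile("myfile.txt", numPartitions)
    parsedData = data.map(parsePoint).repartition(numPartitions)
    model = SVMWithSGD.train(parsedData, 100)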
