2014-12-20 15 views
7

पर सीरियलाइजेशन अपवाद मैं सर्कलाइजेशन के बारे में स्पार्क पर एक बहुत ही अजीब समस्या को पूरा करता हूं। कोड के रूप में नीचे है:स्पार्क

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable 
{ 
    def infer(document: RDD[Document]): RDD[DocumentParameter] = { 
     val docs = documents.map(doc => DocumentParameter(doc, numOfTopics)) 
     docs 
    } 
} 

जहां दस्तावेज़ के रूप में परिभाषित किया गया है:

class Document(val tokens: SparseVector[Int]) extends Serializable 

और DocumentParameter है:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable 

object DocumentParameter extends Serializable 
{ 
    def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics)) 
} 

breeze.linalg.SparseVector में एक serializable वर्ग SparseVectoris।

यह सरल मानचित्र प्रक्रिया है, और सभी वर्गों serializable हैं, लेकिन मैं इस अपवाद प्राप्त करें:

org.apache.spark.SparkException: Task not serializable 

लेकिन जब मैं numOfTopics पैरामीटर को निकालने, वह है:

object DocumentParameter extends Serializable 
{ 
    def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10)) 
} 

और इसे इस तरह कहते हैं:

val docs = documents.map(DocumentParameter.apply) 

और यह ठीक लगता है।

प्रकार int intializable नहीं है? लेकिन मुझे लगता है कि कुछ कोड इस तरह लिखा है।

मुझे यकीन नहीं है कि इस बग को कैसे ठीक किया जाए।

# अपडेट #:

आप @samthebest धन्यवाद। मैं इसके बारे में अधिक जानकारी जोड़ूंगा।

stack trace: 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:270) 
    at com.topicmodel.PLSA.infer(PLSA.scala:13) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37) 
    at $iwC$$iwC$$iwC.<init>(<console>:39) 
    at $iwC$$iwC.<init>(<console>:41) 
    at $iwC.<init>(<console>:43) 
    at <init>(<console>:45) 
    at .<init>(<console>:49) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 46 more 

चूंकि स्टैक ट्रेस अपवाद की सामान्य जानकारी देता है, मैंने इसे हटा दिया।

मैं स्पार्क-खोल में कोड चलाता हूं।

// suppose I have get RDD[Document] for docs 
val numOfTopics = 100 
val plsa = new PLSA(sc, numOfTopics) 
val docPara = plsa.infer(docs) 

क्या आप मुझे serializable पर कुछ ट्यूटोरियल या टिप्स दे सकते हैं?

उत्तर

10

बेनामी फ़ंक्शंस उनके युक्त वर्ग को क्रमबद्ध करता है। जब आप map {doc => DocumentParameter(doc, numOfTopics)}, numOfTopics पर उस फ़ंक्शन तक पहुंचने का एकमात्र तरीका PLSA कक्षा को क्रमबद्ध करना है। और उस वर्ग को वास्तव में धारावाहिक नहीं किया जा सकता है, क्योंकि (जैसा कि आप स्टैकट्रैक से देख सकते हैं) इसमें SparkContext है जो धारावाहिक नहीं है (खराब चीजें तब होती हैं जब व्यक्तिगत क्लस्टर नोड्स के संदर्भ में पहुंच होती है और उदाहरण के लिए नई नौकरियां पैदा कर सकती हैं एक मैपर के भीतर)।

सामान्य रूप से, अपनी कक्षाओं में SparkContext को संग्रहीत करने से बचने का प्रयास करें (संपादित करें: या कम से कम, सुनिश्चित करें कि यह स्पष्ट है कि किस प्रकार की कक्षाओं में SparkContext और किस तरह का नहीं है); इसे अलग-अलग तरीकों से संभवतः एक (संभावित रूप से implicit) पैरामीटर के रूप में पास करना बेहतर है। वैकल्पिक रूप से, PLSA से फ़ंक्शन {doc => DocumentParameter(doc, numOfTopics)} को किसी भिन्न श्रेणी में ले जाएं, जिसे वास्तव में क्रमबद्ध किया जा सकता है।

(जैसा कि कई लोगों ने सुझाव दिया है, कक्षा में SparkContext रखना संभव है लेकिन @transient के रूप में चिह्नित किया गया है ताकि इसे क्रमबद्ध नहीं किया जा सके। मैं इस दृष्टिकोण की अनुशंसा नहीं करता; इसका मतलब है कि कक्षा "जादुई रूप से" बदल जाएगी जब serialized (SparkContext खोना), और जब आप एक क्रमबद्ध नौकरी के अंदर से SparkContext तक पहुंचने का प्रयास करते हैं तो आप एनपीई के साथ समाप्त हो सकते हैं। कक्षाओं के बीच स्पष्ट अंतर बनाए रखना बेहतर है जो केवल "नियंत्रण" कोड में उपयोग किए जाते हैं (और SparkContext का उपयोग कर सकते हैं) और कक्षाएं जो क्लस्टर पर चलाने के लिए क्रमबद्ध हैं (जिनके पास SparkContext नहीं होना चाहिए))।

+2

धन्यवाद। जैसा कि आपने सुझाव दिया है यह काम करता है। साथ ही, मुझे इस समस्या को हल करने का एक और तरीका मिल गया है: 'वैल स्कैन: स्पार्ककॉन्टेक्स्ट' से पहले' @ क्षणिक 'जोड़ें, फिर' स्पार्ककॉन्टेक्स्ट 'को क्रमबद्ध नहीं किया जाएगा। – superhan

+0

मैं इस बात से सहमत नहीं हूं कि आपको अपनी कक्षाओं में 'स्पार्ककॉन्टेक्स्ट' को पूरी तरह से संग्रहीत करना चाहिए (लेकिन फिर भी ऊपर उठाया गया है)। यदि आप इन्हें दायरे में स्टोर नहीं करते हैं तो आप पैरामीटर ब्लोट प्राप्त कर सकते हैं (जो अंतर्निहित पैराम का उपयोग करते समय भी बदसूरत है)। एकमात्र विकल्प यह है कि इसमें कुछ ग्लोबल सिंगलटन रहें जो इसके स्वयं के (ड्रेड नल प्वाइंटर्स) की समस्याएं पैदा करता है। – samthebest

0

यह वास्तव में एक अजीब है, लेकिन मुझे लगता है कि मैं समस्या का अनुमान लगा सकता हूं। लेकिन सबसे पहले, आपने समस्या को हल करने के लिए न्यूनतम प्रदान नहीं किया है (मुझे लगता है, क्योंकि मैंने इनमें से 100 में पहले देखा है)। यहाँ अपने प्रश्न के साथ कुछ समस्याएं हैं:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = { 
    val docs = documents.map(doc => DocumentParameter(doc, numOfTopics)) 
} 

इस विधि RDD[DocumentParameter] वापस नहीं करता है यह रिटर्न Unit। आपने गलत तरीके से कोड कॉपी और पेस्ट किया होगा।

दूसरा आपने पूरे स्टैक ट्रेस प्रदान नहीं किए हैं? क्यूं कर? पूर्ण स्टैक ट्रेस प्रदान करने का कोई कारण नहीं है, और त्रुटि को समझने के लिए संदेश के साथ पूर्ण स्टैक ट्रेस आवश्यक है - त्रुटि को समझने के लिए पूरी त्रुटि की आवश्यकता है। आम तौर पर एक धारावाहिक अपवाद आपको बताता है कि क्रमिक नहीं है।

तीसरे तौर पर आपने हमें नहीं बताया है कि infer विधि कहां है, क्या आप इसे खोल में कर रहे हैं? infer की युक्त ऑब्जेक्ट/क्लास/विशेषता आदि क्या है?

वैसे भी, मुझे लगता है कि Int में गुजरकर आप चीजों की श्रृंखला को क्रमबद्ध करने के लिए सीरियलाइज्ड प्राप्त कर सकते हैं, जिसे आप उम्मीद नहीं करते हैं, मैं आपको तब तक अधिक जानकारी नहीं दे सकता जब तक कि आप न्यूनतम नहीं प्रदान करते कोड ताकि हम आपकी समस्या पूरी तरह से समझ सकें।