2016-10-26 14 views
15

अस्वीकरण: बस स्पार्क के साथ खेलना शुरू करें।स्पार्क के बंद होने और उनके क्रमबद्धता को समझना

मुझे प्रसिद्ध "टास्क सीरियलज़ेबल" अपवाद को समझने में परेशानी हो रही है लेकिन मेरा प्रश्न उन लोगों से थोड़ा अलग है जो मैं SO (या तो मुझे लगता है) पर देखता है।

मेरे पास एक छोटा कस्टम आरडीडी (TestRDD) है। इसमें एक ऐसा क्षेत्र है जो वस्तुओं को संग्रहीत करता है जिनकी कक्षा Serializable (NonSerializable) लागू नहीं करती है। मैंने Kryo का उपयोग करने के लिए "spark.serializer" कॉन्फ़िगरेशन विकल्प सेट किया है। हालांकि, जब मैं अपने RDD पर count() कोशिश, मैं निम्नलिखित मिल:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable 
Serialization stack: 
- object not serializable (class: com.test.spark.NonSerializable, value: [email protected]) 
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable) 
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28) 
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>)) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009) 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933) 

जब मैं देखने के अंदर DAGScheduler.submitMissingTasks मुझे लगता है कि यह मेरे RDD, जो जावा serializer, नहीं Kryo है पर इसके बंद serializer का उपयोग करता है serializer जो मैं उम्मीद करता हूँ। मैंने पढ़ा है कि क्रियो को बंद करने के लिए क्रमबद्ध करने में समस्याएं हैं और स्पार्क हमेशा बंद करने के लिए जावा धारावाहिक का उपयोग करता है लेकिन मुझे समझ में नहीं आता कि यहां कितने बंद होने लगते हैं।

SparkConf conf = new SparkConf() 
         .setAppName("ScanTest") 
         .setMaster("local") 
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 

JavaSparkContext sc = new JavaSparkContext(conf); 

TestRDD rdd = new TestRDD(sc.sc()); 
System.err.println(rdd.count()); 

है, कोई मानचित्रकारों या कुछ भी जो बंद होने की क्रमबद्धता की आवश्यकता होगी: सभी मैं यहाँ कर रहा हूँ यह है। OTOH इस काम करता है:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count() 

Kryo serializer अपेक्षा के अनुरूप, बंद serializer शामिल नहीं है प्रयोग किया जाता है। अगर मैंने धारावाहिक संपत्ति को क्रायो में सेट नहीं किया है, तो मुझे यहां एक अपवाद भी मिलेगा।

मैं किसी भी पॉइंटर्स की सराहना करता हूं कि यह कहां से बंद हो जाता है और यह सुनिश्चित करने के लिए कि मैं कस्टम आरडीडी को क्रमबद्ध करने के लिए क्रियो का उपयोग कर सकता हूं।

अद्यतन: यहाँ अपने गैर serializable क्षेत्र mNS साथ TestRDD है:

class TestRDD extends RDD<String> { 

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class); 

    NonSerializable mNS = new NonSerializable(); 

    public TestRDD(final SparkContext _sc) { 
     super(_sc, 
       JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()), 
       STRING_TAG); 
    } 

    @Override 
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) { 
     return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(), 
                    "test_" + thePartition.index(), 
                    "test_" + thePartition.index()).iterator()).asScala(); 
    } 

    @Override 
    public Partition[] getPartitions() { 
     return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)}; 
    } 

    static class TestPartition implements Partition { 

     final int mIndex; 

     public TestPartition(final int theIndex) { 
      mIndex = theIndex; 
     } 

     public int index() { 
      return mIndex; 
     } 
    } 
} 
+0

क्या आपके पास ऐसा फ़ील्ड है जो आपके 'टेस्टआरडीडी' में' स्पार्ककॉन्टेक्स्ट 'रखता है? हमें 'टेस्टआरडीडी' की अपनी परिभाषा दिखाएं या एक [एमसीवीई] –

+0

@YuvalItzchakov बनाएं। 'स्पार्ककॉन्टेक्स्ट' सुपर के कन्स्ट्रक्टर को पास कर दिया गया है, इसलिए हाँ, आरडीडी इसे पकड़ता है। अपवाद हालांकि इसके बारे में शिकायत नहीं प्रतीत होता है। –

+0

क्या आप 'NonSerializable' पोस्ट कर सकते हैं? –

उत्तर

7

जब मैं अंदर देखो DAGScheduler.submitMissingTasks मुझे लगता है कि यह मेरे RDD, जो जावा serializer है पर के बंद होने serializer का उपयोग करता है , क्रियो सीरिएलाइज़र जो मैं अपेक्षा करता हूं।

SparkEnv दो serializers, एक serializer नामित जो आपके डेटा की क्रमबद्धता, checkpointing के लिए प्रयोग किया जाता है,, आदि का समर्थन करता है कार्यकर्ताओं के बीच संदेश भेजने और spark.serializer विन्यास ध्वज के तहत उपलब्ध है। दूसरे को closureSerializerspark.closure.serializer के तहत कहा जाता है जिसका उपयोग यह जांचने के लिए किया जाता है कि आपकी ऑब्जेक्ट वास्तव में धारावाहिक है और स्पार्क < = 1.6.2 के लिए कॉन्फ़िगर करने योग्य है (लेकिन JavaSerializer के अलावा कुछ भी वास्तव में काम नहीं करता है) और 2.0.0 और ऊपर से JavaSerializer तक हार्डकोड किया गया है।

Kryo बंद serializer एक बग जो यह व्यर्थ है, तो आप SPARK-7708 के तहत है कि बग (इस Kryo 3.0.0 के साथ निर्धारित किया जा सकता देख सकते हैं है, लेकिन वर्तमान में स्पार्क जो Kryo पर तय हो गई है चिल की एक विशेष संस्करण के साथ तय हो गई है 2.2.1)। इसके अलावा, स्पार्क 2.0.x के लिए JavaSerializer अब कॉन्फ़िगर करने योग्य के बजाय तय किया गया है (आप इसे in this pull request देख सकते हैं)। इसका मतलब है कि प्रभावी रूप से हम बंद करने के क्रमिकता के लिए JavaSerializer से फंस गए हैं।

क्या यह अजीब बात है कि हम कार्यों को प्रस्तुत करने के लिए एक धारावाहिक का उपयोग कर रहे हैं और अन्य श्रमिकों और डेटा के बीच डेटा को क्रमबद्ध करने के लिए? निश्चित रूप से, लेकिन यह हमारे पास है।

समेकित करने के लिए, यदि आप spark.serializer कॉन्फ़िगरेशन सेट कर रहे हैं, या SparkContext.registerKryoClasses का उपयोग कर आप स्पार्क में अपने अधिकांश क्रमिकरण के लिए क्रियो का उपयोग करेंगे। ऐसा कहा जाता है कि, किसी दिए गए वर्ग को धारावाहिक और श्रमिकों को कार्यों का क्रमबद्ध करने की जांच करने के लिए, स्पार्क JavaSerializer का उपयोग करेगा।

+0

के रूप में सरल है धन्यवाद, लेकिन यह गलत कैसे है? मैं देख सकता हूं कि 'DAGScheduler'' बंदरगाह Serializer' फ़ील्ड का उपयोग करता है, न कि 'serializer' फ़ील्ड। भले ही मैंने पर्यावरण को क्रियो का उपयोग करने के लिए सेट किया हो या नहीं, 'SparkEnv.get.closureSerializer' हमेशा जावा सीरियलाइज़र है (iirc, उन्होंने 2.0 के बाहर' spark.closure.serializer' विकल्प भी खींच लिया क्योंकि इसे किसी भी तरह से अनदेखा किया गया था) इसलिए मैं देखें कि यह क्यों विफल रहता है। प्रश्न अलग है: शेड्यूलर मेरे मामले में बंद करने वाले धारावाहिक का उपयोग क्यों करता है? मैं अपने आरडीडी> –

+0

एचएम के लिए क्रियो का उपयोग करने के लिए कैसे प्राप्त करूं, मेरा मानना ​​है कि यह कथन स्पार्क 2.0.0 और 2.0.1 (जैसा कि स्टैक ट्रेस से स्पष्ट है) के लिए सही है। आप स्पार्क -12414 भी देख सकते हैं। 'बंद करें Serializer 'एक सार प्रकार का हो सकता है लेकिन AFAICT केवल एक कार्यान्वयन का उपयोग किया जाता है। –

+0

@PavelKlinov आप सही हैं। मैंने थोड़ा गहरा खोला, मेरा अपडेट देखें। –

संबंधित मुद्दे