2017-01-09 11 views
14

मैंने अपने स्पार्क नौकरी के लिए क्रायो सीरियलाइजेशन को सक्षम किया, सेटिंग को पंजीकरण की आवश्यकता के लिए सक्षम किया, और सुनिश्चित किया कि मेरे सभी प्रकार पंजीकृत हैं।क्रायो सीरियलाइजेशन का उपयोग करते समय स्पार्क खराब प्रदर्शन क्यों कर रहा है?

val conf = new SparkConf() 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrationRequired", "true") 
conf.registerKryoClasses(classes) 
conf.registerAvroSchemas(avroSchemas: _*) 

नौकरी का वालक्लॉक-टाइम प्रदर्शन लगभग 20% तक खराब हो गया और बाइट्स की संख्या में लगभग 400% की वृद्धि हुई।

यह मेरे लिए वास्तव में आश्चर्यजनक लगता है, Spark documentation के सुझाव है कि Kryo बेहतर होना चाहिए दिया।

Kryo काफी तेजी से और अधिक जावा क्रमांकन (अक्सर के रूप में ज्यादा के रूप में 10x) की तुलना में कॉम्पैक्ट

मैं मैन्युअल रूप से स्पार्क के org.apache.spark.serializer.KryoSerializer और org.apache.spark.serializer.JavaSerializer के उदाहरण पर लागू serialize विधि अपने डेटा का एक उदाहरण के साथ है। परिणाम स्पार्क दस्तावेज़ीकरण में सुझावों के अनुरूप थे: क्रियो ने 98 बाइट का उत्पादन किया; जावा ने 993 बाइट्स का उत्पादन किया। यह वास्तव में एक 10x सुधार है।

एक संभवतः यह तर्क गुमराह पहलू यह है कि जिन वस्तुओं पर धारावाहिक किया जा रहा है और एवरो GenericRecord इंटरफ़ेस को लागू shuffled है। मैंने SparkConf में एवरो स्कीमा को पंजीकृत करने का प्रयास किया, लेकिन इसमें कोई सुधार नहीं हुआ।

मैं डेटा जो सरल स्काला case class तों थे, एवरो मशीनरी के किसी भी शामिल नहीं शफ़ल करने के लिए नए वर्गों बनाने की कोशिश की। इसने शफल प्रदर्शन या बाइट्स की संख्या का आदान-प्रदान नहीं किया।

स्पार्क कोड निम्नलिखित करने के लिए नीचे उबलते समाप्त होता है:

case class A(
    f1: Long, 
    f2: Option[Long], 
    f3: Int, 
    f4: Int, 
    f5: Option[String], 
    f6: Option[Int], 
    f7: Option[String], 
    f8: Option[Int], 
    f9: Option[Int], 
    f10: Option[Int], 
    f11: Option[Int], 
    f12: String, 
    f13: Option[Double], 
    f14: Option[Int], 
    f15: Option[Double], 
    f16: Option[Double], 
    f17: List[String], 
    f18: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(f: Int) : AnyRef = ??? 
    def put(f: Int, value: Any) : Unit = ??? 
    def getSchema(): org.apache.avro.Schema = A.SCHEMA$ 
} 
object A extends AnyRef with Serializable { 
    val SCHEMA$: org.apache.avro.Schema = ??? 
} 

case class B(
    f1: Long 
    f2: Long 
    f3: String 
    f4: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(field$ : Int) : AnyRef = ??? 
    def getSchema() : org.apache.avro.Schema = B.SCHEMA$ 
    def put(field$ : Int, value : Any) : Unit = ??? 
} 
object B extends AnyRef with Serializable { 
    val SCHEMA$ : org.apache.avro.Schema = ??? 
} 

def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = { 
    val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b) 
    joined.map { case (_, asAndBs) => asAndBs } 
} 

क्या आपको पता है क्या पर या संभवतः जा रहे हैं कि कैसे मैं बेहतर प्रदर्शन कि Kryo से उपलब्ध होना चाहिए मिल सकता है?

+0

आप पोस्ट कर सकते हैं उदाहरण के मामले वर्ग और काम? प्रश्न का उत्तर देना बहुत आसान होगा –

+0

अच्छा बिंदु, @ टी। गौडेड। सरलीकृत कोड के साथ अद्यतन किया गया। –

+0

आपने अपना कोड कैसे माप लिया? –

उत्तर

0

चूंकि आपके पास उच्च कार्डिनालिटी आरडीडी है, इसलिए प्रसारण/प्रसारण हैश शामिल होने से दुर्भाग्य से सीमाएं लगती हैं।

आपका सबसे अच्छा सबसे अच्छा पूर्व में शामिल होने के लिए अपने RDDs coalesce() है। क्या आप अपने शफल समय में उच्च स्काई देख रहे हैं? यदि ऐसा है, तो आप shuffle = true के साथ सहवास करना चाह सकते हैं।

अंत में, आप नेस्टेड संरचनाओं (जैसे JSON), कि कभी कभी आप शफ़ल बायपास करने के लिए अनुमति देगा के RDDs है। अधिक विस्तृत स्पष्टीकरण के लिए स्लाइड्स और/या वीडियो here देखें।

1

अपने एकल रिकॉर्ड का आकार बहुत छोटा है और रिकॉर्ड की भारी संख्या अपनी नौकरी slow.Try बफर आकार बढ़ाने के लिए और देखने के लिए कि क्या यह किसी भी सुधार करता है के लिए कर सकते हो रही है।

प्रयास करें एक नीचे नहीं तो पहले से ही किया ..

val conf = new SparkConf() 
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    // Now it's 24 Mb of buffer by default instead of 0.064 Mb 
    .set("spark.kryoserializer.buffer.mb","24") 

रेफरी: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/

संबंधित मुद्दे