मैंने अपने स्पार्क नौकरी के लिए क्रायो सीरियलाइजेशन को सक्षम किया, सेटिंग को पंजीकरण की आवश्यकता के लिए सक्षम किया, और सुनिश्चित किया कि मेरे सभी प्रकार पंजीकृत हैं।क्रायो सीरियलाइजेशन का उपयोग करते समय स्पार्क खराब प्रदर्शन क्यों कर रहा है?
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
नौकरी का वालक्लॉक-टाइम प्रदर्शन लगभग 20% तक खराब हो गया और बाइट्स की संख्या में लगभग 400% की वृद्धि हुई।
यह मेरे लिए वास्तव में आश्चर्यजनक लगता है, Spark documentation के सुझाव है कि Kryo बेहतर होना चाहिए दिया।
Kryo काफी तेजी से और अधिक जावा क्रमांकन (अक्सर के रूप में ज्यादा के रूप में 10x) की तुलना में कॉम्पैक्ट
मैं मैन्युअल रूप से स्पार्क के org.apache.spark.serializer.KryoSerializer
और org.apache.spark.serializer.JavaSerializer
के उदाहरण पर लागू serialize
विधि अपने डेटा का एक उदाहरण के साथ है। परिणाम स्पार्क दस्तावेज़ीकरण में सुझावों के अनुरूप थे: क्रियो ने 98 बाइट का उत्पादन किया; जावा ने 993 बाइट्स का उत्पादन किया। यह वास्तव में एक 10x सुधार है।
एक संभवतः यह तर्क गुमराह पहलू यह है कि जिन वस्तुओं पर धारावाहिक किया जा रहा है और एवरो GenericRecord
इंटरफ़ेस को लागू shuffled है। मैंने SparkConf
में एवरो स्कीमा को पंजीकृत करने का प्रयास किया, लेकिन इसमें कोई सुधार नहीं हुआ।
मैं डेटा जो सरल स्काला case class
तों थे, एवरो मशीनरी के किसी भी शामिल नहीं शफ़ल करने के लिए नए वर्गों बनाने की कोशिश की। इसने शफल प्रदर्शन या बाइट्स की संख्या का आदान-प्रदान नहीं किया।
स्पार्क कोड निम्नलिखित करने के लिए नीचे उबलते समाप्त होता है:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: Int) : AnyRef = ???
def put(f: Int, value: Any) : Unit = ???
def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
val SCHEMA$: org.apache.avro.Schema = ???
}
case class B(
f1: Long
f2: Long
f3: String
f4: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(field$ : Int) : AnyRef = ???
def getSchema() : org.apache.avro.Schema = B.SCHEMA$
def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
val SCHEMA$ : org.apache.avro.Schema = ???
}
def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
joined.map { case (_, asAndBs) => asAndBs }
}
क्या आपको पता है क्या पर या संभवतः जा रहे हैं कि कैसे मैं बेहतर प्रदर्शन कि Kryo से उपलब्ध होना चाहिए मिल सकता है?
आप पोस्ट कर सकते हैं उदाहरण के मामले वर्ग और काम? प्रश्न का उत्तर देना बहुत आसान होगा –
अच्छा बिंदु, @ टी। गौडेड। सरलीकृत कोड के साथ अद्यतन किया गया। –
आपने अपना कोड कैसे माप लिया? –