2017-12-15 98 views
6

मेरे पास जावा में लिखा गया स्पार्क स्ट्रीमिंग ऐप है और स्पार्क 2.1 का उपयोग कर रहा है। मैं काफ़का से संदेश पढ़ने के लिए KafkaUtils.createDirectStream का उपयोग कर रहा हूं। मैं कफका संदेशों के लिए क्रियो एन्कोडर/डिकोडर का उपयोग कर रहा हूं। मैंने इसे कफका गुणों में निर्दिष्ट किया-> key.deserializer, value.deserializer, key.serializer, value.deserializer

जब स्पार्क माइक्रो बैच में संदेशों को खींचता है, तो संदेशों को क्रायो डिकोडर का उपयोग करके सफलतापूर्वक डीकोड किया जाता है। हालांकि मैंने देखा कि स्पार्क निष्पादक काफका से पढ़ने वाले प्रत्येक संदेश को डीकोड करने के लिए क्रियो डिकोडर का एक नया उदाहरण बनाता है। मैंने डीकोडर कन्स्ट्रक्टर

के अंदर लॉग डालने से यह जांच की है यह मेरे लिए अजीब लगता है। प्रत्येक संदेश और प्रत्येक बैच के लिए डीकोडर का एक ही उदाहरण इस्तेमाल नहीं किया जाना चाहिए?काफ्का डायरेक्ट स्ट्रीम क्यों हर संदेश के लिए एक नया डिकोडर बनाता है?

कोड जहाँ मैं काफ्का से पढ़ रहा हूँ:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams)); 

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> { 
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value()); 
}); 

उत्तर

3

हम कैसे स्पार्क आंतरिक काफ्का से डेटा को हासिल करेगा देखना चाहते हैं, तो हम KafkaRDD.compute को देखने के लिए, एक विधि हर RDD के लिए लागू किया है जो की आवश्यकता होगी जो

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { 
    val part = thePart.asInstanceOf[KafkaRDDPartition] 
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) 
    if (part.fromOffset == part.untilOffset) { 
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + 
    s"skipping ${part.topic} ${part.partition}") 
    Iterator.empty 
    } else { 
    new KafkaRDDIterator(part, context) 
    } 
} 

यहाँ क्या महत्वपूर्ण है else खंड है, जो एक KafkaRDDIterator बनाता है: ढांचा कैसे, ठीक है, की गणना कि RDD बताता है। यह आंतरिक रूप से है:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[K]] 

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[V]] 

कौन जैसा कि आप देख, दोनों कुंजी डिकोडर और प्रतिबिंब के माध्यम से मूल्य डिकोडर का एक उदाहरण बनाता प्रत्येक अंतर्निहित विभाजन के लिए। इसका मतलब है कि यह प्रति संदेश प्रति संदेश उत्पन्न नहीं किया जा रहा है, लेकिन प्रति Kafka विभाजन

इस तरह इसे क्यों लागू किया गया है? मुझे नहीं पता। मुझे लगता है कि स्पार्क के अंदर होने वाले सभी अन्य आवंटन की तुलना में एक कुंजी और मूल्य डिकोडर को उपेक्षित प्रदर्शन हिट होना चाहिए।

यदि आपने अपना ऐप प्रोफाइल किया है और इसे आवंटन हॉट-पथ माना है, तो आप कोई समस्या खोल सकते हैं। अन्यथा, मैं इसके बारे में चिंता नहीं करता।

+0

बहुत अच्छी तरह से शोध किया गया! #impressed –

+0

@ युवाल: मैं काफ्का 0.10.x का उपयोग कर रहा हूं। स्पार्क कैश्ड काफ्का उपभोक्ताओं (प्रति निष्पादक) का उपयोग करता है जहां कैश कुंजी उपभोक्ता आईडी, विषय आईडी, विभाजन आईडी द्वारा पहचाना जाता है। यह एक डिकोडर प्रति कफका विभाजन या अन्यथा समानांतर में संदेशों को डीकोड करने के लिए समझ में आता है। मुझे उम्मीद है कि एक कैश किए गए उपभोक्ता के अंदर प्रति विभाजन के बाद एक नया डिकोडर बनाया जाना चाहिए और यह है! मुझे इस समस्या को हल्के भार के नीचे नहीं दिख रहा है, लेकिन केवल तभी जब मैं प्रति सेकंड 1000 संदेशों को पंप करता हूं। शायद मैं एक "जीसी" चक्र में भाग रहा हूँ। क्या आपको कोई जानकारी है कि कफकाआरडीडी कक्षा में लॉगिंग को कैसे सक्षम किया जाए? – scorpio

+0

@scorpio Kafka 0.10.x को एक डिकोडर की आवश्यकता नहीं है। यह अंतर्निहित 'उपभोक्ता रिकॉर्ड' वापस देता है, और आप चुनते हैं कि इसके साथ क्या करना है। क्या आप शायद 'मैप' के अंदर एक डिकोडर का उदाहरण बना रहे हैं? –

संबंधित मुद्दे