मेरे पास जावा में लिखा गया स्पार्क स्ट्रीमिंग ऐप है और स्पार्क 2.1 का उपयोग कर रहा है। मैं काफ़का से संदेश पढ़ने के लिए KafkaUtils.createDirectStream
का उपयोग कर रहा हूं। मैं कफका संदेशों के लिए क्रियो एन्कोडर/डिकोडर का उपयोग कर रहा हूं। मैंने इसे कफका गुणों में निर्दिष्ट किया-> key.deserializer, value.deserializer, key.serializer, value.deserializer
जब स्पार्क माइक्रो बैच में संदेशों को खींचता है, तो संदेशों को क्रायो डिकोडर का उपयोग करके सफलतापूर्वक डीकोड किया जाता है। हालांकि मैंने देखा कि स्पार्क निष्पादक काफका से पढ़ने वाले प्रत्येक संदेश को डीकोड करने के लिए क्रियो डिकोडर का एक नया उदाहरण बनाता है। मैंने डीकोडर कन्स्ट्रक्टर
के अंदर लॉग डालने से यह जांच की है यह मेरे लिए अजीब लगता है। प्रत्येक संदेश और प्रत्येक बैच के लिए डीकोडर का एक ही उदाहरण इस्तेमाल नहीं किया जाना चाहिए?काफ्का डायरेक्ट स्ट्रीम क्यों हर संदेश के लिए एक नया डिकोडर बनाता है?
कोड जहाँ मैं काफ्का से पढ़ रहा हूँ:
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));
JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
बहुत अच्छी तरह से शोध किया गया! #impressed –
@ युवाल: मैं काफ्का 0.10.x का उपयोग कर रहा हूं। स्पार्क कैश्ड काफ्का उपभोक्ताओं (प्रति निष्पादक) का उपयोग करता है जहां कैश कुंजी उपभोक्ता आईडी, विषय आईडी, विभाजन आईडी द्वारा पहचाना जाता है। यह एक डिकोडर प्रति कफका विभाजन या अन्यथा समानांतर में संदेशों को डीकोड करने के लिए समझ में आता है। मुझे उम्मीद है कि एक कैश किए गए उपभोक्ता के अंदर प्रति विभाजन के बाद एक नया डिकोडर बनाया जाना चाहिए और यह है! मुझे इस समस्या को हल्के भार के नीचे नहीं दिख रहा है, लेकिन केवल तभी जब मैं प्रति सेकंड 1000 संदेशों को पंप करता हूं। शायद मैं एक "जीसी" चक्र में भाग रहा हूँ। क्या आपको कोई जानकारी है कि कफकाआरडीडी कक्षा में लॉगिंग को कैसे सक्षम किया जाए? – scorpio
@scorpio Kafka 0.10.x को एक डिकोडर की आवश्यकता नहीं है। यह अंतर्निहित 'उपभोक्ता रिकॉर्ड' वापस देता है, और आप चुनते हैं कि इसके साथ क्या करना है। क्या आप शायद 'मैप' के अंदर एक डिकोडर का उदाहरण बना रहे हैं? –