में कफका संदेश से विषय प्राप्त करें हमारे स्पार्क-स्ट्रीमिंग नौकरी में हम कफका से स्ट्रीमिंग में संदेश पढ़ते हैं।स्पार्क
इसके लिए, हम KafkaUtils.createDirectStream
API का उपयोग करते हैं जो JavaPairInputDStreamfrom
देता है। (तीन विषयों से - test1, test2, test3)
संदेशों काफ्का से पढ़ा जाता है निम्नलिखित तरीके से:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
हम एक अलग तरह से प्रत्येक विषय से संदेश को संभालने के लिए चाहते हैं, और क्रम में करने के लिए इसे प्राप्त करने के लिए हमें प्रत्येक संदेश के लिए विषय का नाम जानने की आवश्यकता है।
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
और इस SplitToLinesFunction
के कार्यान्वयन है:
तो हम निम्न कार्य
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
समस्या यह है कि tuple2._1
अशक्त है और हम मान लिया है कि tuple2._1
कुछ शामिल होंगे है मेटाडेटा जैसे विषय/विभाजन का नाम जहां से संदेश आया था।
हालांकि, जब हम tuple2._1
प्रिंट करते हैं, तो यह शून्य है।
हमारा प्रश्न - क्या काफ़का में विषय का नाम भेजने का कोई तरीका है ताकि स्पार्क-स्ट्रीमिंग कोड में tuple2._1
इसमें (और शून्य नहीं हो) होगा?
ध्यान दें कि हम भी रूप में spark-streaming kafka-integration tutorial में उल्लेख किया DStream से विषय के नाम पर प्राप्त करने की कोशिश:
लेकिन यह सब विषयों कि KafkaUtils.createDirectStream
के लिए भेजा गया है, और नहीं, जहां से संदेश विशिष्ट विषय रिटर्न (जो वर्तमान आरडीडी से संबंधित है) से पहुंचे।
इसलिए इससे हमें इस विषय के नाम की पहचान करने में मदद नहीं मिली, जहां से आरडीडी में संदेश भेजे गए थे।
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
समस्या यह है कि कुछ भी नहीं MessageAndMetadataFunction.call
विधि में छपा था है: -
संपादित
डेविड के जवाब के जवाब में मैं MessageAndMetadata
इस तरह उपयोग करने की कोशिश। MessageAndMetadataFunction.call
विधि के अंदर उस आरडीडी के लिए प्रासंगिक विषय प्राप्त करने के लिए मुझे क्या तय करना चाहिए?
आपका क्या मतलब है "यहां कुछ भी मुद्रित नहीं है"? यहां तक कि "विषय =" भाग, या वह भाग प्रिंट नहीं करता है लेकिन मान खाली हैं। –
यदि कुछ भी नहीं है, तो आपको अपने 'यार्न' लॉग, या जो भी क्लस्टर आप चल रहे हैं, उसे देखना चाहिए। मेरे लिए, '/ usr/local/hadoop/logs/userLogs /' में लॉग फ़ाइलें हैं जो आपके निष्पादकों से 'stdout' को कैप्चर करती हैं। –
क्षमा करें - मुझे अब समस्या पता है। ऐसा इसलिए है क्योंकि आपके 'MessageAndMetadataFunction' को एक ही रिकॉर्ड में एक साथ सिलाई गई संदेश और संदेश दोनों को वापस करना होगा। अभी आप केवल विषय ही लौट रहे हैं, संदेश ही नहीं। यही कारण है कि आप विषय को अधिक से अधिक मुद्रित करते हैं - क्योंकि आप वही है जो आप 'MessageAndMetadataFunction' से लौट रहे हैं - दोनों को वापस लौटें, आपके पास दोनों होंगे। –