2016-04-21 15 views
5

में कफका संदेश से विषय प्राप्त करें हमारे स्पार्क-स्ट्रीमिंग नौकरी में हम कफका से स्ट्रीमिंग में संदेश पढ़ते हैं।स्पार्क

इसके लिए, हम KafkaUtils.createDirectStream API का उपयोग करते हैं जो JavaPairInputDStreamfrom देता है। (तीन विषयों से - test1, test2, test3)

संदेशों काफ्का से पढ़ा जाता है निम्नलिखित तरीके से:

private static final String TOPICS = "test1,test2,test3"; 
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(","))); 

HashMap<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", BROKERS); 

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
       streamingContext, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topicsSet 
       ); 

हम एक अलग तरह से प्रत्येक विषय से संदेश को संभालने के लिए चाहते हैं, और क्रम में करने के लिए इसे प्राप्त करने के लिए हमें प्रत्येक संदेश के लिए विषय का नाम जानने की आवश्यकता है।

JavaDStream<String> lines = messages.map(new SplitToLinesFunction()); 

और इस SplitToLinesFunction के कार्यान्वयन है:

तो हम निम्न कार्य

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> { 
    @Override 
    public String call(Tuple2<String, String> tuple2) 
    { 
     System.out.println(tuple2._1); 
     return tuple2._2(); 
    } 
} 

समस्या यह है कि tuple2._1 अशक्त है और हम मान लिया है कि tuple2._1 कुछ शामिल होंगे है मेटाडेटा जैसे विषय/विभाजन का नाम जहां से संदेश आया था।

हालांकि, जब हम tuple2._1 प्रिंट करते हैं, तो यह शून्य है।

हमारा प्रश्न - क्या काफ़का में विषय का नाम भेजने का कोई तरीका है ताकि स्पार्क-स्ट्रीमिंग कोड में tuple2._1 इसमें (और शून्य नहीं हो) होगा?

ध्यान दें कि हम भी रूप में spark-streaming kafka-integration tutorial में उल्लेख किया DStream से विषय के नाम पर प्राप्त करने की कोशिश:

लेकिन यह सब विषयों कि KafkaUtils.createDirectStream के लिए भेजा गया है, और नहीं, जहां से संदेश विशिष्ट विषय रिटर्न (जो वर्तमान आरडीडी से संबंधित है) से पहुंचे।

इसलिए इससे हमें इस विषय के नाम की पहचान करने में मदद नहीं मिली, जहां से आरडीडी में संदेश भेजे गए थे।

 Map<TopicAndPartition, Long> topicAndPartition = new HashMap(); 
     topicAndPartition.put(new TopicAndPartition("test1", 0), 1L); 
     topicAndPartition.put(new TopicAndPartition("test2", 0), 1L); 
     topicAndPartition.put(new TopicAndPartition("test3", 0), 1L); 

     class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> 
     { 

      @Override 
      public String call(MessageAndMetadata<String, String> v1) 
        throws Exception { 
       // nothing is printed here 
       System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition()); 
       return v1.topic(); 
      } 

     } 

     JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction()); 
     messages.foreachRDD(new VoidFunction() { 

      @Override 
      public void call(Object t) throws Exception { 
       JavaRDD<String> rdd = (JavaRDD<String>)t; 
       OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
       // here all the topics kafka listens to are printed, but that doesn't help 
       for (OffsetRange offset : offsets) { 
        System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset()); 
       } 
      } 
     }); 

समस्या यह है कि कुछ भी नहीं MessageAndMetadataFunction.call विधि में छपा था है: -

संपादित

डेविड के जवाब के जवाब में मैं MessageAndMetadata इस तरह उपयोग करने की कोशिश। MessageAndMetadataFunction.call विधि के अंदर उस आरडीडी के लिए प्रासंगिक विषय प्राप्त करने के लिए मुझे क्या तय करना चाहिए?

+0

आपका क्या मतलब है "यहां कुछ भी मुद्रित नहीं है"? यहां तक ​​कि "विषय =" भाग, या वह भाग प्रिंट नहीं करता है लेकिन मान खाली हैं। –

+0

यदि कुछ भी नहीं है, तो आपको अपने 'यार्न' लॉग, या जो भी क्लस्टर आप चल रहे हैं, उसे देखना चाहिए। मेरे लिए, '/ usr/local/hadoop/logs/userLogs /' में लॉग फ़ाइलें हैं जो आपके निष्पादकों से 'stdout' को कैप्चर करती हैं। –

+0

क्षमा करें - मुझे अब समस्या पता है। ऐसा इसलिए है क्योंकि आपके 'MessageAndMetadataFunction' को एक ही रिकॉर्ड में एक साथ सिलाई गई संदेश और संदेश दोनों को वापस करना होगा। अभी आप केवल विषय ही लौट रहे हैं, संदेश ही नहीं। यही कारण है कि आप विषय को अधिक से अधिक मुद्रित करते हैं - क्योंकि आप वही है जो आप 'MessageAndMetadataFunction' से लौट रहे हैं - दोनों को वापस लौटें, आपके पास दोनों होंगे। –

उत्तर

6

createDirectStream के संस्करणों में से एक का उपयोग करें जो पैरामीटर के रूप में messageHandler फ़ंक्शन लेता है। यहाँ मैं क्या करना है:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
    ssc, 
    kafkaParams, 
    getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap, 
    (msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)} 
) 

वहाँ वहाँ सामान है कि आप के लिए कुछ भी मतलब नहीं है कि है - प्रासंगिक हिस्सा

(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)} 

है आप Scala से परिचित नहीं हैं, तो सभी कार्य करता है वापसी है Tuple2 जिसमें msg.topic और msg.message शामिल हैं। आपके फ़ंक्शन को डाउनस्ट्रीम का उपयोग करने के लिए आपके फ़ंक्शन को इन दोनों को वापस करने की आवश्यकता है।आप बस इसके बजाय पूरे MessageAndMetadata ऑब्जेक्ट को वापस कर सकते हैं, जो आपको कुछ अन्य रोचक फ़ील्ड देता है। लेकिन अगर आप केवल topic और message चाहते थे, तो उपर्युक्त का उपयोग करें।

+0

अरे ऐसा लगता है कि आपके पास अतिरिक्त ब्रेस है, कृपया इसे सही करें। –

+1

@ डेविड क्या आप स्कैला में एक कामकाजी या विस्तृत उदाहरण प्रदान कर सकते हैं। जैसे ही मैं इन पैरामीटरों से ऑफसेट्स, संदेश हैंडलर से उलझन में हूं। आपको धन्यवाद! –

1

Kafka integration guide के नीचे, एक उदाहरण है जो संदेशों से विषय निकालता है।

जावा में प्रासंगिक कोड:

// Hold a reference to the current offset ranges, so it can be used downstream 
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); 

directKafkaStream.transformToPair(
    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { 
    @Override 
    public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { 
     OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
     offsetRanges.set(offsets); 
     return rdd; 
    } 
    } 
).map(
    ... 
).foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() { 
    @Override 
    public Void call(JavaPairRDD<String, String> rdd) throws IOException { 
     for (OffsetRange o : offsetRanges.get()) { 
     System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() 
     ); 
     } 
     ... 
     return null; 
    } 
    } 
); 

यह शायद अधिक कॉम्पैक्ट कुछ जो सिर्फ विषय है और कुछ नहीं के लिए पूछता में ढह जा सकता है।

+0

मैंने कोशिश की, यह वर्तमान आरडीडी से संबंधित विषय के बजाय, कफका सुनता है कि सभी विषयों को मुद्रित करता है। जैसे - अगर मैं 3 विषयों को सुनता हूं - test1, test2, test3 - और संदेश केवल test1 से आते हैं, तो यह कोड प्रत्येक RDD के लिए test1, test2, test3 प्रिंट करेगा। तो यह कोड मेरी मदद नहीं करता है। –

+0

यह काम नहीं करता है – User3