2015-05-20 41 views
5

से विषय जाओ मैं कैसे काफ्का में किसी संदेश से विषय नाम की पहचान कर सकते हैं।काफ्का संदेश

String[] topics = { "test", "test1", "test2" }; 
    for (String t : topics) { 
     topicMap.put(t, new Integer(3)); 
    } 

SparkConf conf = new SparkConf().setAppName("KafkaReceiver") 
      .set("spark.streaming.receiver.writeAheadLog.enable", "false") 
      .setMaster("local[4]") 
      .set("spark.cassandra.connection.host", "localhost"); 
    ; 
    final JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(
      1000)); 

    /* Receive Kafka streaming inputs */ 
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils 
      .createStream(jssc, "localhost:2181", "test-group", 
        topicMap); 

    JavaDStream<MessageAndMetadata> data = 
      messages.map(new Function<Tuple2<String, String>, MessageAndMetadata>() 
      { 

       public MessageAndMetadata call(Tuple2<String, String> message) 
       { 
        System.out.println("message ="+message._2); 
        return null; 
       } 
      } 

     ); 

मैं काफ्का निर्माता से संदेश प्राप्त कर सकता हूं। लेकिन चूंकि उपभोक्ता अब तीन विषय से उपभोग कर रहा है, इसलिए विषय के नाम की पहचान करने की आवश्यकता है।

+0

मैं बहुत ही इस के जवाब में दिलचस्पी रखता हूँ। क्या आपको कोई रास्ता मिला? –

+0

@ अरुण: क्या आपको कोई समाधान मिला? यदि हां, तो क्या आप इसे साझा कर सकते हैं? धन्यवाद! एक पैरामीटर के रूप JFunction [MessageAndMetadata [कश्मीर, V], आर]: – jithinpt

उत्तर

0

दुर्भाग्यवश, यह स्पार्क के स्रोत कोड में कफका रिसीवर और रिलायबल कफका रिसीवर के रूप में सरल नहीं है केवल संदेश औरMetadata.key और संदेश संग्रहीत करें।

दो खुले स्पार्क के JIRA में इस मुद्दे से संबंधित टिकट के होते हैं:

जो थोड़ी देर के लिए खोल दिया गया है।

एक गंदा कॉपी/पेस्ट/आपकी समस्या का हल करने के लिए स्पार्क के स्रोत कोड के संशोधित:

package org.apache.spark.streaming.kafka 

import java.lang.{Integer => JInt} 
import java.util.{Map => JMap, Properties} 

import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} 
import kafka.serializer.{Decoder, StringDecoder} 
import kafka.utils.VerifiableProperties 
import org.apache.spark.Logging 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 
import org.apache.spark.streaming.util.WriteAheadLogUtils 
import org.apache.spark.util.ThreadUtils 
import scala.collection.JavaConverters._ 
import scala.collection.Map 
import scala.reflect._ 

object MoreKafkaUtils { 

    def createStream(
    jssc: JavaStreamingContext, 
    zkQuorum: String, 
    groupId: String, 
    topics: JMap[String, JInt], 
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
): JavaReceiverInputDStream[(String, String, String)] = { 
    val kafkaParams = Map[String, String](
     "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, 
     "zookeeper.connection.timeout.ms" -> "10000") 
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(jssc.ssc.conf) 
    new KafkaInputDStreamWithTopic[String, String, StringDecoder, StringDecoder](jssc.ssc, kafkaParams, topics.asScala.mapValues(_.intValue()), walEnabled, storageLevel) 
    } 

} 

private[streaming] 
class KafkaInputDStreamWithTopic[ 
    K: ClassTag, 
    V: ClassTag, 
    U <: Decoder[_] : ClassTag, 
    T <: Decoder[_] : ClassTag](
    @transient ssc_ : StreamingContext, 
    kafkaParams: Map[String, String], 
    topics: Map[String, Int], 
    useReliableReceiver: Boolean, 
    storageLevel: StorageLevel 
) extends ReceiverInputDStream[(K, V, String)](ssc_) with Logging { 

    def getReceiver(): Receiver[(K, V, String)] = { 
    if (!useReliableReceiver) { 
     new KafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel) 
    } else { 
     new ReliableKafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel) 
    } 
    } 
} 

private[streaming] 
class KafkaReceiverWithTopic[ 
    K: ClassTag, 
    V: ClassTag, 
    U <: Decoder[_] : ClassTag, 
    T <: Decoder[_] : ClassTag](
    kafkaParams: Map[String, String], 
    topics: Map[String, Int], 
    storageLevel: StorageLevel 
) extends Receiver[(K, V, String)](storageLevel) with Logging { 

    // Connection to Kafka 
    var consumerConnector: ConsumerConnector = null 

    def onStop() { 
    if (consumerConnector != null) { 
     consumerConnector.shutdown() 
     consumerConnector = null 
    } 
    } 

    def onStart() { 

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) 

    // Kafka connection properties 
    val props = new Properties() 
    kafkaParams.foreach(param => props.put(param._1, param._2)) 

    val zkConnect = kafkaParams("zookeeper.connect") 
    // Create the connection to the cluster 
    logInfo("Connecting to Zookeeper: " + zkConnect) 
    val consumerConfig = new ConsumerConfig(props) 
    consumerConnector = Consumer.create(consumerConfig) 
    logInfo("Connected to " + zkConnect) 

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
     .newInstance(consumerConfig.props) 
     .asInstanceOf[Decoder[K]] 
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
     .newInstance(consumerConfig.props) 
     .asInstanceOf[Decoder[V]] 

    // Create threads for each topic/message Stream we are listening 
    val topicMessageStreams = consumerConnector.createMessageStreams(
     topics, keyDecoder, valueDecoder) 

    val executorPool = 
     ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") 
    try { 
     // Start the messages handler for each partition 
     topicMessageStreams.values.foreach { streams => 
     streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } 
     } 
    } finally { 
     executorPool.shutdown() // Just causes threads to terminate after work is done 
    } 
    } 

    // Handles Kafka messages 
    private class MessageHandler(stream: KafkaStream[K, V]) 
    extends Runnable { 
    def run() { 
     logInfo("Starting MessageHandler.") 
     try { 
     val streamIterator = stream.iterator() 
     while (streamIterator.hasNext()) { 
      val msgAndMetadata = streamIterator.next() 
      store((msgAndMetadata.key, msgAndMetadata.message, msgAndMetadata.topic)) 
     } 
     } catch { 
     case e: Throwable => reportError("Error handling message; exiting", e) 
     } 
    } 
    } 

} 
+0

तुम भी प्रायोगिक KafkaUtils.createDirectStream जो एक messageHandler लेता इस्तेमाल करने की कोशिश कर सकते हैं। –

1

स्पार्क 1.5.0 के रूप में, official documentation है जो कोई रिसीवर/प्रत्यक्ष हाल में जारी से शुरू दृष्टिकोण, का उपयोग कर प्रोत्साहित करती है हाल ही में 1.5.0 में प्रयोगात्मक से स्नातक की उपाधि प्राप्त की। यह नया डायरेक्ट एपीआई आपको अन्य अच्छी चीजों के अलावा आसानी से संदेश और इसके मेटाडेटा प्राप्त करने की अनुमति देता है।

+0

मैं प्रत्यक्ष दृष्टिकोण का उपयोग कर रहा हूं और समझ नहीं पा रहा हूं कि संदेश मेटाडेटा कैसे प्राप्त करें। क्या आप विस्तार से समझा सकते हैं? –

+0

@BrandonBradley, कृपया उपरोक्त लिंक का पालन आधिकारिक दस्तावेज में पिछले कोड स्निपेट देखें। असल में, आपको आरडीडी को हैसऑफसेटरेंज में जैसे ही आप इसे प्राप्त करते हैं। –

संबंधित मुद्दे