2015-06-17 9 views
20

मैं काफ्का का अध्ययन करने वाला एक नया छात्र हूं और मैंने कई उपभोक्ताओं को समझने के साथ कुछ मौलिक मुद्दों में भाग लिया है कि लेख, दस्तावेज इत्यादि अब तक बहुत उपयोगी नहीं हैं।मैं काफ़का में कई उपभोक्ताओं का उपयोग कैसे करूं?

एक चीज जिसे मैंने करने की कोशिश की है, वह अपना उच्च स्तर काफ़का निर्माता और उपभोक्ता लिखता है और उन्हें एक साथ चलाता है, एक विषय पर 100 सरल संदेश प्रकाशित करता है और मेरा उपभोक्ता उन्हें पुनर्प्राप्त करता है। मैंने इसे सफलतापूर्वक करने में कामयाब रहा है, लेकिन जब मैं एक दूसरे उपभोक्ता को उसी विषय से उपभोग करने का प्रयास करता हूं जो संदेशों को अभी प्रकाशित किया गया था, तो उसे कोई संदेश नहीं मिला।

यह मेरी समझ थी कि प्रत्येक विषय के लिए, आप उपभोक्ता अलग उपभोक्ता समूहों से प्राप्त कर सकते हैं और इनमें से प्रत्येक उपभोक्ता समूह को किसी विषय पर उत्पादित संदेशों की पूरी प्रति प्राप्त होगी। क्या ये सही है? यदि नहीं, तो मेरे लिए एकाधिक उपभोक्ताओं को स्थापित करने का उचित तरीका क्या होगा?

public class AlternateConsumer extends Thread { 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final Boolean isAsync = false; 

    public AlternateConsumer(String topic, String consumerGroup) { 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers", "localhost:9092"); 
     properties.put("group.id", consumerGroup); 
     properties.put("partition.assignment.strategy", "roundrobin"); 
     properties.put("enable.auto.commit", "true"); 
     properties.put("auto.commit.interval.ms", "1000"); 
     properties.put("session.timeout.ms", "30000"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumer = new KafkaConsumer<Integer, String>(properties); 
     consumer.subscribe(topic); 
     this.topic = topic; 
    } 


    public void run() { 
     while (true) { 
      ConsumerRecords<Integer, String> records = consumer.poll(0); 
      for (ConsumerRecord<Integer, String> record : records) { 
       System.out.println("We received message: " + record.value() + " from topic: " + record.topic()); 
      } 
     } 

    } 
} 

इसके अलावा, मैंने देखा है कि मूल रूप से मैं केवल एक ही विभाजन के साथ एक विषय 'परीक्षण' के लिए ऊपर की खपत का परीक्षण किया गया था: यह उपभोक्ता वर्ग है कि मैं अब तक लिखा है है। जब मैंने एक मौजूदा उपभोक्ता समूह में एक और उपभोक्ता समूह 'टेस्ट ग्रुप' कहा, तो इसने एक कफका विद्रोह शुरू किया जिसने सेकंड की परिमाण में, मेरी खपत की विलम्ब को एक महत्वपूर्ण राशि से धीमा कर दिया। मैंने सोचा था कि यह पुनर्विक्रय के साथ एक मुद्दा था क्योंकि मेरे पास केवल एक विभाजन था, लेकिन जब मैंने 6 विभाजनों के साथ एक नया विषय 'एकाधिक विभाजन' बनाया, तो इसी तरह के मुद्दे सामने आए जहां एक ही उपभोक्ता समूह में अधिक उपभोक्ताओं को जोड़ने से विलंबता के कारण हुए। मैंने चारों ओर देखा है और लोग मुझे बता रहे हैं कि मुझे एक बहु थ्रेडेड उपभोक्ता का उपयोग करना चाहिए - क्या कोई उस पर प्रकाश डाल सकता है?

+0

काफ़का '0.8.1' के लिए उच्च स्तर के उपभोक्ता [यहां] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) का एक शानदार उदाहरण है। – chrsblck

+0

@chrsblck लिंक के लिए धन्यवाद।मैंने वास्तव में जांच की है कि पहले और शायद इसे समझ में नहीं आया था और साथ ही मैं कर सकता था - क्या आप शायद थोड़ा सा समझा सकते हैं कि यह उदाहरण धागे का उपयोग कैसे करता है? मैं पूरी तरह समझ नहीं पा रहा हूं कि वे इस समय क्या कर रहे हैं। –

+0

एक तरीका है किसी दिए गए विषय के लिए विभाजन के समान धागे की संख्या। लेख से - धाराओं की सूची 'सूची > streams = उपभोक्ताMap.get (विषय);' ... फिर प्रत्येक थ्रेड को एक विभाजन 'execor.submit असाइन करें (नया उपभोक्ताटेस्ट (धारा, धागा संख्या)) '। – chrsblck

उत्तर

17

मुझे लगता है कि आपकी समस्या auto.offset.reset संपत्ति के साथ है। जब कोई नया उपभोक्ता किसी विभाजन से पढ़ता है और कोई पिछली प्रतिबद्ध ऑफ़सेट नहीं होती है, तो auto.offset.reset प्रॉपर्टी का उपयोग यह तय करने के लिए किया जाता है कि प्रारंभिक ऑफसेट क्या होना चाहिए। यदि आप इसे "सबसे बड़ा" (डिफ़ॉल्ट) पर सेट करते हैं तो आप नवीनतम (अंतिम) संदेश पर पढ़ना शुरू करते हैं। यदि आप इसे "सबसे छोटा" पर सेट करते हैं तो आपको पहला उपलब्ध संदेश मिलता है।

तो जोड़ें:

properties.put("auto.offset.reset", "smallest"); 

और फिर कोशिश करें।

+1

यह देर से प्रतिक्रिया है लेकिन क्रिस धन्यवाद! आपके समाधान सही हैं और कुछ दस्तावेजों पर अधिक बारीकी से देखने के बाद मुझे यह ध्यान रखना चाहिए था कि एक नया उपभोक्ता लॉन्च करने पर यह केवल नए भेजे गए संदेशों का उपभोग करने के लिए सेट है - उपरोक्त गुणों को सेट किए जाने तक पूर्व-मौजूदा नहीं। –

4

प्रलेखन में here यह कहता है: "यदि आप इस विषय पर विभाजन के मुकाबले अधिक धागे प्रदान करते हैं, तो कुछ धागे कभी संदेश नहीं देखेंगे"। क्या आप अपने विषय में विभाजन जोड़ सकते हैं? मेरे पास मेरे उपभोक्ता समूह थ्रेड गिनती मेरे विषय में विभाजन की संख्या के बराबर है, और प्रत्येक थ्रेड संदेश प्राप्त कर रहा है।

यहाँ मेरी विषय config है:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins 
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

और मेरे उपभोक्ता:

package com.cie.dispatcher.services; 

import com.cie.dispatcher.model.WinNotification; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.google.inject.Inject; 
import io.dropwizard.lifecycle.Managed; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* This will create three threads, assign them to a "group" and listen for notifications on a topic. 
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by 
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the 
* lifecycle manager in dropwizard. 
* <p/> 
* Created by aakture on 6/15/15. 
*/ 
public class KafkaTopicListener implements Managed { 
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class); 
private final ConsumerConnector consumer; 
private final String topic; 
private ExecutorService executor; 
private int threadCount; 
private WinNotificationWorkflow winNotificationWorkflow; 
private ObjectMapper objectMapper; 

@Inject 
public KafkaTopicListener(String a_zookeeper, 
          String a_groupId, String a_topic, 
          int threadCount, 
          WinNotificationWorkflow winNotificationWorkflow, 
          ObjectMapper objectMapper) { 
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
      createConsumerConfig(a_zookeeper, a_groupId)); 
    this.topic = a_topic; 
    this.threadCount = threadCount; 
    this.winNotificationWorkflow = winNotificationWorkflow; 
    this.objectMapper = objectMapper; 
} 

/** 
* Creates the config for a connection 
* 
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example. 
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads. 
* @return the config props 
*/ 
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", zookeeper); 
    props.put("group.id", groupId); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "1000"); 

    return new ConsumerConfig(props); 
} 

public void stop() { 
    if (consumer != null) consumer.shutdown(); 
    if (executor != null) executor.shutdown(); 
    try { 
     if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
      LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     } 
    } catch (InterruptedException e) { 
     LOG.info("Interrupted during shutdown, exiting uncleanly"); 
    } 
    LOG.info("{} shutdown successfully", this.getClass().getName()); 
} 
/** 
* Starts the listener 
*/ 
public void start() { 
    Map<String, Integer> topicCountMap = new HashMap<>(); 
    topicCountMap.put(topic, new Integer(threadCount)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
    executor = Executors.newFixedThreadPool(threadCount); 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ListenerThread(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

private class ListenerThread implements Runnable { 
    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ListenerThread(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     try { 
      String message = null; 
      LOG.info("started listener thread: {}", m_threadNumber); 
      ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
      while (it.hasNext()) { 
       try { 
        message = new String(it.next().message()); 
        LOG.info("receive message by " + m_threadNumber + " : " + message); 
        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class); 
        winNotificationWorkflow.process(winNotification); 
       } catch (Exception ex) { 
        LOG.error("error processing queue for message: " + message, ex); 
       } 
      } 
      LOG.info("Shutting down listener thread: " + m_threadNumber); 
     } catch (Exception ex) { 
      LOG.error("error:", ex); 
     } 
    } 
    } 
} 
+0

क्या आप कृपया कफका 1.0 संस्करण के लिए उदाहरण साझा कर सकते हैं, क्योंकि उपरोक्त उदाहरण में उपयोग की जाने वाली अधिकांश कक्षाएं बहिष्कृत हैं। –

+0

मुझे विश्वास नहीं है कि यह उस वक्त बाहर था, मैं जल्द ही अपने कोड को अपग्रेड करने के लिए चारों ओर नहीं जा सकता, क्षमा चाहता हूं। –

4

आप एक ही संदेश (एक प्रसारण की तरह) का उपभोग करने के लिए कई उपभोक्ताओं चाहते हैं, तो आप उन्हें अलग उपभोक्ता समूह के साथ अंडे कर सकते हैं और उपभोक्ता कॉन्फ़िगरेशन में auto.offset.reset को सबसे छोटा सेट भी कर रहा है। यदि आप एकाधिक उपभोक्ताओं को समानांतर में उपभोग करने के लिए चाहते हैं (उनके बीच काम विभाजित करें), तो आपको विभाजन की संख्या> = उपभोक्ताओं की संख्या बनाना चाहिए। एक विभाजन केवल उपभोक्ता प्रक्रिया द्वारा ही उपभोग किया जा सकता है। लेकिन एक उपभोक्ता एक से अधिक विभाजन का उपभोग कर सकता है।

संबंधित मुद्दे