2015-06-16 28 views
17

मैं नवीनतम काफ्का दस्तावेज़ http://kafka.apache.org/documentation.html से शुरू कर रहा हूं। लेकिन जब मैं नई उपभोक्ता एपीआई का उपयोग करने की कोशिश करता हूं तो मुझे कुछ समस्या मिलती है। मैं निम्न चरणों के साथ काम किया है:काफ्का 0.8.2 के उपभोक्ता API का उपयोग कैसे करें?

1. जोड़े नई निर्भरता

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.1</version> 
</dependency> 

2. विन्यास

Map<String, Object> config = new HashMap<String, Object>(); 
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "host:9092"); 
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); 
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class.getName()); 
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class.getName()); 
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range"); 

3. उपयोग KafkaConsumer एपीआई

जोड़े
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config); 
consumer.subscribe("topic"); 

हालांकि, जब मैं दलाल से संदेश मतदान करने का प्रयास करें, मैं कुछ भी नहीं लेकिन अशक्त हो गया है:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0); 
if (records != null) 
    process(records); 
else 
    System.err.println("null"); 

और फिर मुझे पता है कि उपभोक्ता के साथ गलत क्या है के बाद मैं स्रोत कोड की जाँच की:

@Override 
public Map<String, ConsumerRecords<K,V>> poll(long timeout) { 
    // TODO Auto-generated method stub 
    return null; 
} 

मामलों को और खराब बनाने के लिए, मुझे 0.8.2 एपीआई के बारे में कोई अन्य उपयोगी जानकारी नहीं मिल रही है, क्योंकि काफ्का के बारे में सभी उपयोग नवीनतम संस्करण के साथ संगत नहीं हैं। क्या कोई मेरी मदद कर सकता है? बहुत बहुत धन्यवाद।

+3

नया काफ्काकॉन्सर API केवल 0.8.3 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan में उपलब्ध होगा। स्पष्ट रूप से ट्रंक में कुछ कार्यान्वयन हैं, हालांकि मुझे राज्य के बारे में कोई जानकारी नहीं है। उस समय के लिए मैं पुराने उपभोक्ता कार्यान्वयन का उपयोग कर रहा हूं। – habsq

+0

धन्यवाद @habsq। तो सभी पुराने एपिस के बीच, काफ़का 0.8.x का उपयोग करते समय कौन सा संस्करण सबसे अच्छा विकल्प है? – Yohn

+0

मैं 0.8.1.1 का उपयोग कर रहा हूं, 'सर्वश्रेष्ठ' विकल्प के बारे में नहीं जानता। – habsq

उत्तर

1

मैं नए निर्माता द्वारा उत्पादित संदेशों को पढ़ने के लिए कफका 0.8.2.1 के शीर्ष पर उपभोक्ता लिखने की भी कोशिश कर रहा हूं।

अब तक मुझे जो मिला है वह है कि निर्माता एपीआई तैयार और प्रयोग योग्य है, जबकि उपभोक्ता पक्ष पर हमें 0.8.3 का इंतजार करना है, क्योंकि @habsq ने नोट किया है और आप पहले से ही यह पता लगा सकते हैं कि कुछ कोड शामिल हैं उपभोक्ता, लेकिन यह अभी भी कार्यात्मक नहीं है।

तो क्लाइंट का उपयोग करने के लिए क्लाइंट (वर्तमान क्लाइंट एपीआई) आपके वर्तमान काफ्का संस्करण की "कोर" परियोजना में पाया गया है, यानी 0.8.2.1 (क्लाइंट को किसी भी अन्य संस्करण में डाउनग्रेड नहीं करना बेहतर है)।

तो अब हमें दो जार आयात करने की आवश्यकता है: एक "नए" जावा क्लाइंट्स और कोर प्रोजेक्ट के लिए एक, आपके द्वारा उपयोग किए जा रहे स्कैला संस्करण के आधार पर (मैं 2.11 का उपयोग करता हूं)।

मेरे मामले में मैं निर्भरता का प्रबंधन करने के graddle उपयोग करती हैं इसलिए मैं सिर्फ आयात करने की आवश्यकता

dependencies { 
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1' 
    compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1' 
} 

जब आप निर्भरता यह सभी आवश्यक पुस्तकालयों मिल जाएगा अद्यतन करें।

यदि आप एक अलग स्कैला संस्करण का उपयोग कर रहे हैं तो बस संस्करण को बदलें; वैसे भी आप मेवेन सेंट्रल पर सभी अलग-अलग संस्करण या पूर्ण पोम पा सकते हैं: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

यदि आप उन उपभोक्ता कार्यान्वयन का उपयोग करते हैं तो सभी मौजूदा उदाहरण सामान्य रूप से काम करना चाहिए।

पुनश्च रेफरी: काफ्का उपयोगकर्ताओं मिलीलीटर धागा http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

+0

मैंने भविष्य में दो निर्भरताओं को स्पष्ट रूप से रखा है, हमें केवल "क्लाइंट" जार पर पहले की आवश्यकता होगी और इसे "कोर" कफका पर छोड़ दें। असल में अब ग्राहक कोर की निर्भरता भी है, इसलिए जो संक्षिप्त होना चाहता है वह केवल कोर काफ्का पर निर्भरता का उपयोग कर सकता है। – RobMcZag

0

हाँ मैं पुष्टि कर सकता है कि रिहाई 0.8.2.1 लेने वाली संदेशों में समस्या थी। अब जावा/ग्रोवी के साथ एक साधारण उपभोक्ता बनाना और 0.10.1.0 जारी करना, सब कुछ पूरी तरह से काम कर रहा है।

PARTITION_ASSIGNMENT_STRATEGY सेट करने की कोई आवश्यकता नहीं है।

संबंधित मुद्दे