2016-07-18 10 views
8

मैं जावा का उपयोग कर kafka उपभोक्ता लिख ​​रहा हूं। मैं संदेश का वास्तविक समय रखना चाहता हूं, इसलिए यदि 1000 या उससे अधिक की तरह उपभोग करने के लिए बहुत सारे संदेश इंतजार कर रहे हैं, तो मुझे अनजान संदेशों को त्यागना चाहिए और नवीनतम ऑफसेट से उपभोग करना शुरू करना चाहिए।मुझे कफका विषय का नवीनतम ऑफसेट कैसे मिल सकता है?

इस समस्या के लिए, मैं अंतिम प्रतिबद्ध ऑफसेट और किसी विषय के नवीनतम ऑफसेट (केवल 1 विभाजन) की तुलना करने की कोशिश करता हूं, यदि इन दो ऑफसेट्स के बीच का अंतर एक निश्चित राशि से बड़ा है, तो मैं नवीनतम ऑफसेट सेट कर दूंगा अगले ऑफसेट के रूप में विषय ताकि मैं उन अनावश्यक संदेशों को त्याग सकूं।

अब मेरी समस्या यह है कि किसी विषय के नवीनतम ऑफसेट को कैसे प्राप्त किया जाए, कुछ लोग कहते हैं कि मैं पुराने उपभोक्ता का उपयोग कर सकता हूं, लेकिन यह बहुत जटिल है, क्या नए उपभोक्ता के पास यह कार्य है?

उत्तर

12

नया उपभोक्ता भी जटिल है।

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

+0

यह काम नहीं करता है अगर आप अपने क्लाइंट वर्तमान ऑफ़सेट और नवीनतम ज्ञात कफका विषय ऑफसेट के बीच के अंतर की गणना करना चाहते हैं! – hiaclibe

5

काफ्का संस्करण के लिए: 0.10.1.1

// Get the diff of current position and latest offset 
Set<TopicPartition> partitions = new HashSet<TopicPartition>(); 
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); 
partitions.add(actualTopicPartition); 
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); 
long actualPosition = consumer.position(actualTopicPartition);   
System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition)); 
0
KafkaConsumer<String, String> consumer = ... 
consumer.subscribe(Collections.singletonList(topic)); 
TopicPartition topicPartition = new TopicPartition(topic, partition); 
consumer.poll(0); 
consumer.seekToEnd(Collections.singletonList(topicPartition)); 
long currentOffset = consumer.position(topicPartition) -1; 

स्निपेट के ऊपर वर्तमान प्रतिबद्ध संदेश दिए गए विषय और विभाजन के लिए ऑफसेट रिटर्न नंबर।

संबंधित मुद्दे