मैं नवीनतम kafka_2.10-0.8.2.1 के साथ मैन्युअल रूप से ऑफ़सेट प्रबंधित करने के लिए निम्न-स्तरीय उपभोक्ता जावा एपीआई का उपयोग करने का प्रयास कर रहा हूं। यह सत्यापित करने के लिए कि मैं कफका से प्रतिबद्ध/पढ़ता हूं ऑफसेट्स सही हैं, मैं kafka.tools.ConsumerOffsetChecker टूल का उपयोग करता हूं।काफ्का जावा एपीआई ऑफ़सेट ऑपरेशंस स्पष्टीकरण
यहाँ मेरी विषय/उपभोक्ता समूह के लिए उत्पादन का एक उदाहरण है:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
यहाँ परिणाम मेरी व्याख्या है:
ऑफसेट = 5 -> इस वर्तमान की ऑफसेट है मेरे 'elastic_search_group' उपभोक्ता
logSize = 29 -> इस नवीनतम ऑफसेट है - अगले संदेश है कि इस विषय से आ जाएगा की भरपाई/विभाजन
अंतराल = 24 -> 29-5 - कितने संदेश अभी तक अपने 'elastic_search_group' उपभोक्ता द्वारा कार्रवाई नहीं कर रहे हैं
Pid - विभाजन आईडी
Q1: यह सही है?
अब, मैं अपने जावा उपभोक्ता से वही जानकारी प्राप्त करना चाहता हूं। यहां, मैंने पाया कि मुझे दो अलग-अलग एपीआई का उपयोग करना पड़ा:
kafka.javaapi। ऑफसेटरक्वेट प्रारंभिक और नवीनतम ऑफसेट प्राप्त करने के लिए, लेकिन kafka.javaapi। वर्तमान ऑफसेट प्राप्त करने के लिए OffsetFetchRequest।
प्राप्त करने के लिए जल्द से जल्द (या नवीनतम) मुझे क्या करना ऑफसेट:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
और प्राप्त करने के लिए वर्तमान ऑफसेट मैं एक पूरी तरह से अलग एपीआई का उपयोग करने के लिए है:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2: यह सही है? एक बहुत ही समान जानकारी प्राप्त करने के लिए दो अलग-अलग एपीआई क्यों हैं?
प्रश्न 3: क्या इससे कोई फर्क पड़ता है कि संस्करण और सहसंबंध क्या मैं यहां उपयोग कर रहा हूं? हालांकि संस्करण संस्करण पूर्व-0.8.2.1 कफका के लिए 0 होना चाहिए, और 0.8.2.1 और बाद में 1 होना चाहिए - लेकिन ऐसा लगता है कि यह 0.8.2.1 के साथ 0 के साथ भी काम करता है - नीचे देखें?
तो, ऊपर विषय का उदाहरण राज्य, और ConsumerOffsetChecker के ऊपर उत्पादन, यहाँ के लिए मैं क्या मेरी जावा कोड से मिलता है:
currentOffset = 5; earliestOffset = 29; नवीनतमऑफसेट = 2 9
'वर्तमान ऑफसेट' ठीक प्रतीत होता है, 'नवीनतम ऑफसेट' भी सही है, लेकिन 'सबसे पुराना ऑफसेट' है? मैं कम से कम '5' होने की उम्मीद करता हूं?
प्रश्न 4: यह कैसे हो सकता है कि सबसे पुराना ऑफ़सेट वर्तमान ऑफसेट से अधिक है? मेरा एकमात्र संदेह यह है कि अवधारण नीति के कारण शायद विषय से संदेश साफ़ किए गए थे ...। ऐसा कोई अन्य मामला हो सकता था?
धन्यवाद, @ Shades88! कुछ परीक्षण के बाद, # 4 के लिए - मैं एक ही निष्कर्ष पर आया, कि यह स्थिति तब होगी जब प्रतिधारण नीति के कारण लॉग साफ़ किए गए थे। इसलिए मैंने अपने उपभोक्ता तर्क में इस कोने के मामले को संभालने में जोड़ा है - यह सत्यापित करें कि वर्तमान ऑफ़सेट> = सबसे पुराना ऑफसेट है और इसे शुरुआती ऑफसेट पर सेट करें यदि नहीं। धन्यवाद! – Marina
'versionId' के संबंध में, यदि आप' 0' निर्दिष्ट करते हैं, तो ऑफसेट्स को ज़ूकीपर में संग्रहीत किया जाता है और यदि आप '1' का उपयोग करते हैं, तो ऑफ़सेट एक विशेष काफ्का विषय में संग्रहीत किया जाता है। –
एक उपयोगी पृष्ठ http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –