2015-05-20 19 views
9

मैं नवीनतम kafka_2.10-0.8.2.1 के साथ मैन्युअल रूप से ऑफ़सेट प्रबंधित करने के लिए निम्न-स्तरीय उपभोक्ता जावा एपीआई का उपयोग करने का प्रयास कर रहा हूं। यह सत्यापित करने के लिए कि मैं कफका से प्रतिबद्ध/पढ़ता हूं ऑफसेट्स सही हैं, मैं kafka.tools.ConsumerOffsetChecker टूल का उपयोग करता हूं।काफ्का जावा एपीआई ऑफ़सेट ऑपरेशंस स्पष्टीकरण

यहाँ मेरी विषय/उपभोक्ता समूह के लिए उत्पादन का एक उदाहरण है:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic 
Group           Topic                          Pid Offset          logSize         Lag             Owner 
elastic_search_group my_log_topic              0   5               29              24              none 

यहाँ परिणाम मेरी व्याख्या है:

ऑफसेट = 5 -> इस वर्तमान की ऑफसेट है मेरे 'elastic_search_group' उपभोक्ता

logSize = 29 -> इस नवीनतम ऑफसेट है - अगले संदेश है कि इस विषय से आ जाएगा की भरपाई/विभाजन

अंतराल = 24 -> 29-5 - कितने संदेश अभी तक अपने 'elastic_search_group' उपभोक्ता द्वारा कार्रवाई नहीं कर रहे हैं

Pid - विभाजन आईडी

Q1: यह सही है?

अब, मैं अपने जावा उपभोक्ता से वही जानकारी प्राप्त करना चाहता हूं। यहां, मैंने पाया कि मुझे दो अलग-अलग एपीआई का उपयोग करना पड़ा:

kafka.javaapi। ऑफसेटरक्वेट प्रारंभिक और नवीनतम ऑफसेट प्राप्त करने के लिए, लेकिन kafka.javaapi। वर्तमान ऑफसेट प्राप्त करने के लिए OffsetFetchRequest

प्राप्त करने के लिए जल्द से जल्द (या नवीनतम) मुझे क्या करना ऑफसेट:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition); 
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1)); 
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1)); 
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); 
OffsetResponse response = simpleConsumer.getOffsetsBefore(request); 
long[] offsets = response.offsets(topic, partition); 
long myEarliestOffset = offsets[0]; 
// OR for Latest: long myLatestOffset = offsets[0]; 

और प्राप्त करने के लिए वर्तमान ऑफसेट मैं एक पूरी तरह से अलग एपीआई का उपयोग करने के लिए है:

short versionID = 0; 
int correlationId = 0; 
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();  
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition); 
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId); 
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq); 
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset(); 

Q2: यह सही है? एक बहुत ही समान जानकारी प्राप्त करने के लिए दो अलग-अलग एपीआई क्यों हैं?

प्रश्न 3: क्या इससे कोई फर्क पड़ता है कि संस्करण और सहसंबंध क्या मैं यहां उपयोग कर रहा हूं? हालांकि संस्करण संस्करण पूर्व-0.8.2.1 कफका के लिए 0 होना चाहिए, और 0.8.2.1 और बाद में 1 होना चाहिए - लेकिन ऐसा लगता है कि यह 0.8.2.1 के साथ 0 के साथ भी काम करता है - नीचे देखें?

तो, ऊपर विषय का उदाहरण राज्य, और ConsumerOffsetChecker के ऊपर उत्पादन, यहाँ के लिए मैं क्या मेरी जावा कोड से मिलता है:

currentOffset = 5; earliestOffset = 29; नवीनतमऑफसेट = 2 9

'वर्तमान ऑफसेट' ठीक प्रतीत होता है, 'नवीनतम ऑफसेट' भी सही है, लेकिन 'सबसे पुराना ऑफसेट' है? मैं कम से कम '5' होने की उम्मीद करता हूं?

प्रश्न 4: यह कैसे हो सकता है कि सबसे पुराना ऑफ़सेट वर्तमान ऑफसेट से अधिक है? मेरा एकमात्र संदेह यह है कि अवधारण नीति के कारण शायद विषय से संदेश साफ़ किए गए थे ...। ऐसा कोई अन्य मामला हो सकता था?

उत्तर

10

मैं विभाजन में अंतराल खोजने के साधनों की खोज कर रहा था। और इसमें आपके द्वारा उठाए गए एक ही कदम शामिल हैं। अब तक, जो भी मैंने सीखा है, मैं आपको जवाब दे सकता हूं।

  1. लॉगसाइज सीधे उस बिंदु पर कितने संदेश जमा किए गए हैं, यह इंगित करता है। या, यह उस विभाजन में संदेशों की अधिकतम ऑफसेट निर्दिष्ट करता है। ऑफ़सेट अंतिम सफलतापूर्वक खपत संदेश का ऑफसेट है। इसलिए लॉग आकार और ऑफ़सेट के बीच अंतर है।
  2. हाँ यह सही है। अब तक, वर्तमान ऑफ़सेट और सबसे पुराना या नवीनतम ऑफ़सेट
  3. ढूंढने का केवल दो ही तरीके हैं, मुझे नहीं पता कि संस्करण आईडी निर्दिष्ट करने की आवश्यकता क्यों है। आप संस्करण आईडी प्राप्त करने के लिए kafka.api.OffsetRequest.CurrentVersion() का उपयोग कर सकते हैं। तो हार्डकोडिंग से बचा जा सकता है। आप सुरक्षित रूप से सहसंबंध मान सकते हैं 0.
  4. यह अजीब है। जब मैं EarliestTime() का उपयोग करता हूं, तो मुझे सबसे पुराना ऑफसेट मिलता है जब भी मेरा वर्तमान ऑफसेट आगे बढ़ता है। इसका मतलब है कि यह विभाजन की शुरुआत है। इसलिए जब कुछ संदेश भविष्य में कुछ समय समाप्त हो जाएंगे, तो यह सबसे पुराना ऑफसेट कुछ गैर-शून्य संख्या होगा। अब अगर प्रतिधारण नीति के कारण संदेशों को मंजूरी दे दी गई है तो अंतराल बदलना चाहिए था। मैं इस व्यवहार के बारे में अनिश्चित हूं। निश्चित रूप से सुनिश्चित करने का एक तरीका होगा, इस तरह के पढ़ने और उसके लॉग में जांच करने के बाद उपभोक्ता चलाना। यह इन तरह की रेखाएं दिखाना चाहिए।

    2015-06-09 18:49:15 :: DEBUG :: विभाजन टोपिकइन्फो: 52 :: रीसेट ऑफ़सेट अनुरोधों का ऑफ़सेट: 2: ऑफसेट = 405952: ऑफसेट = 335372 से 335372 2015-06-09 18 : 49: 15 :: डीबग :: PartitionTopicInfo: 52 :: रीसेट अनुरोध की भरपाई की खपत: 2: प्राप्त किए गए ऑफसेट = 405,952: ऑफसेट खपत = 335373 करने के लिए 335373

ध्यान दें कि ऊपर लॉग लाइनों में, ऑफसेट अवशेष प्राप्त किए गए वही और खपत ऑफसेट बढ़ रहा है। अंत में यह

2015-06-09 18:49:16 :: डीबग :: PartitionTopicInfo में खत्म होगा: 52 :: रीसेट अनुरोध की भरपाई की खपत: 2: प्राप्त किए गए ऑफसेट = 405,952: ऑफसेट खपत = 405952 405952

को

तब इसका मतलब यह होगा कि 335372 से 405952 तक लॉग प्रतिधारण नीति ऑफसेट की अवधि समाप्त हो गई थी

+1

धन्यवाद, @ Shades88! कुछ परीक्षण के बाद, # 4 के लिए - मैं एक ही निष्कर्ष पर आया, कि यह स्थिति तब होगी जब प्रतिधारण नीति के कारण लॉग साफ़ किए गए थे। इसलिए मैंने अपने उपभोक्ता तर्क में इस कोने के मामले को संभालने में जोड़ा है - यह सत्यापित करें कि वर्तमान ऑफ़सेट> = सबसे पुराना ऑफसेट है और इसे शुरुआती ऑफसेट पर सेट करें यदि नहीं। धन्यवाद! – Marina

+0

'versionId' के संबंध में, यदि आप' 0' निर्दिष्ट करते हैं, तो ऑफसेट्स को ज़ूकीपर में संग्रहीत किया जाता है और यदि आप '1' का उपयोग करते हैं, तो ऑफ़सेट एक विशेष काफ्का विषय में संग्रहीत किया जाता है। –

+0

एक उपयोगी पृष्ठ http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –

संबंधित मुद्दे