2014-11-20 5 views
7

मैं एक आवेदन है कि सदस्यता काफ्का से विषय जोड़ और गतिशील रूप से हटाया जा करने की अनुमति देता का निर्माण कर रहा हूँ। जब एक विषय सदस्यता जोड़ा जाता है मैं एक बैच कार्य हर घंटे कि नए संदेशों के सब हो जाता है और उन्हें किसी अन्य डेटासंग्रह में धक्का चलाने के लिए चाहता था।काफ्का - सबसे आसान तरीका नवीनतम ऑफसेट हो जाओ करने के लिए

जो मैं समझना चाहता हूं वह है कि किसी विषय की वर्तमान ऑफसेट कैसे प्राप्त करें। जैसे ही सब्सक्रिप्शन जोड़ा जाता है, मैं चाहता हूं कि अगले बैच नौकरी सदस्यता के अनुमानित समय से सभी संदेश प्राप्त करें।

उदाहरण के तौर पर, कल्पना करें कि मेरे पास "टॉपिका" नामक एक विषय है जो लगातार संदेश प्राप्त कर रहा है। यदि मैं 7.15 बजे एक सदस्यता जोड़ता हूं, जब बैच नौकरी 8 बजे चलती है तो मैं 7.15 बजे से सभी संदेशों को बैच अप लेना चाहता हूं। मैं अनुमान के समय के लिए खुश हूं - 7.10, 7.20 इत्यादि। 5 या 10 मिनट दोनों तरफ मुझे कोई चिंता नहीं होती है।

तो मेरा इरादा समाधान एक सब्सक्रिप्शन के पल में किसी विषय की वर्तमान ऑफसेट प्राप्त करना है। मैंने साधारण उपभोक्ता को देखा है, लेकिन मैं इस मूल उपयोग मामले के लिए क्लस्टर प्रबंधन के सभी पहलुओं में शामिल नहीं होना चाहता हूं।

मैं भी उच्च स्तरीय उपभोक्ता देखा है। मैं ऐसा कुछ कर सकता था:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset 

इस दृष्टिकोण के साथ मुझे क्या चिंता है कि "सिर" का आह्वान वास्तव में एक धारा है। तो मेरा मानना ​​है कि यह अगले संदेश की प्रतीक्षा कर देगा। अवरुद्ध करना समस्याग्रस्त है क्योंकि यह अगले संदेश आने तक अन्य सब्सक्रिप्शन को कतारबद्ध कर सकता है।

मैं बाद दृष्टिकोण को लागू करने के लिए कुछ समय बिताने के लिए खुश हूँ, लेकिन अगर वहाँ एक आसान तरीका है कि मुझे आवश्यकता नहीं है त्रुटि प्रवण समवर्ती कोड लिखने के लिए है, तो मैं नहीं बल्कि मेरा समय बर्बाद मत चाहते हैं।

मुझे ऑफसेट के बाद से सभी लॉग प्राप्त करने का भी एक तरीका चाहिए।

उत्तर

2

एक लाने का अनुरोध करने के लिए हर प्रतिक्रिया लौटाता एक "HighWaterMark" जो नवीनतम विभाजन वर्तमान में उपयोग किया जा रहा की लॉग में ऑफसेट प्रतिनिधित्व करता है। तो सिद्धांत रूप में आप जल्द से जल्द संदेश या वास्तव में किसी दिए गए विषय के लिए (यह मानते हुए कोई है) किसी भी एक सन्देश, और प्रतिक्रिया से HighWaterMark खींच सकता है। वहाँ और अधिक विस्तार HighWaterMark यहाँ पर है: बनाने के लिए अपने ग्राहक पर निर्भर https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

बेशक

, प्रतिक्रिया से HighWaterMarkOffset खींचने के लिए सक्षम किया जा रहा है कि अपने स्वयं के काफ्का एपीआई के माध्यम से उपलब्ध आंकड़ों।

+0

यह एक विशेष विभाजन के लिए उच्च पानी के निशान होगा। मुझे लगता है कि वह "नवीनतम संदेश" {partitionId, offsetId} जानकारी के बारे में पूछ रहा है। – arviman

+1

मुझे लगता है कि वैश्विक "नवीनतम संदेश" जैसी कोई चीज़ नहीं है। अगर कुछ ग्लोबल सिंक्रनाइज़ेशन तंत्र था तो कफका स्केल नहीं करेगा ... –

संबंधित मुद्दे