मैं एक आवेदन है कि सदस्यता काफ्का से विषय जोड़ और गतिशील रूप से हटाया जा करने की अनुमति देता का निर्माण कर रहा हूँ। जब एक विषय सदस्यता जोड़ा जाता है मैं एक बैच कार्य हर घंटे कि नए संदेशों के सब हो जाता है और उन्हें किसी अन्य डेटासंग्रह में धक्का चलाने के लिए चाहता था।काफ्का - सबसे आसान तरीका नवीनतम ऑफसेट हो जाओ करने के लिए
जो मैं समझना चाहता हूं वह है कि किसी विषय की वर्तमान ऑफसेट कैसे प्राप्त करें। जैसे ही सब्सक्रिप्शन जोड़ा जाता है, मैं चाहता हूं कि अगले बैच नौकरी सदस्यता के अनुमानित समय से सभी संदेश प्राप्त करें।
उदाहरण के तौर पर, कल्पना करें कि मेरे पास "टॉपिका" नामक एक विषय है जो लगातार संदेश प्राप्त कर रहा है। यदि मैं 7.15 बजे एक सदस्यता जोड़ता हूं, जब बैच नौकरी 8 बजे चलती है तो मैं 7.15 बजे से सभी संदेशों को बैच अप लेना चाहता हूं। मैं अनुमान के समय के लिए खुश हूं - 7.10, 7.20 इत्यादि। 5 या 10 मिनट दोनों तरफ मुझे कोई चिंता नहीं होती है।
तो मेरा इरादा समाधान एक सब्सक्रिप्शन के पल में किसी विषय की वर्तमान ऑफसेट प्राप्त करना है। मैंने साधारण उपभोक्ता को देखा है, लेकिन मैं इस मूल उपयोग मामले के लिए क्लस्टर प्रबंधन के सभी पहलुओं में शामिल नहीं होना चाहता हूं।
मैं भी उच्च स्तरीय उपभोक्ता देखा है। मैं ऐसा कुछ कर सकता था:
consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset
इस दृष्टिकोण के साथ मुझे क्या चिंता है कि "सिर" का आह्वान वास्तव में एक धारा है। तो मेरा मानना है कि यह अगले संदेश की प्रतीक्षा कर देगा। अवरुद्ध करना समस्याग्रस्त है क्योंकि यह अगले संदेश आने तक अन्य सब्सक्रिप्शन को कतारबद्ध कर सकता है।
मैं बाद दृष्टिकोण को लागू करने के लिए कुछ समय बिताने के लिए खुश हूँ, लेकिन अगर वहाँ एक आसान तरीका है कि मुझे आवश्यकता नहीं है त्रुटि प्रवण समवर्ती कोड लिखने के लिए है, तो मैं नहीं बल्कि मेरा समय बर्बाद मत चाहते हैं।
मुझे ऑफसेट के बाद से सभी लॉग प्राप्त करने का भी एक तरीका चाहिए।
यह एक विशेष विभाजन के लिए उच्च पानी के निशान होगा। मुझे लगता है कि वह "नवीनतम संदेश" {partitionId, offsetId} जानकारी के बारे में पूछ रहा है। – arviman
मुझे लगता है कि वैश्विक "नवीनतम संदेश" जैसी कोई चीज़ नहीं है। अगर कुछ ग्लोबल सिंक्रनाइज़ेशन तंत्र था तो कफका स्केल नहीं करेगा ... –