2013-02-07 26 views
7

मैंने कफका वेबसाइट पर प्रलेखन पढ़ा लेकिन पूर्ण न्यूनतम उदाहरण (निर्माता -> कफका -> उपभोक्ता) को लागू करने की कोशिश करने के बाद यह मुझे स्पष्ट नहीं है कि "उपभोक्ता राज्य", ऑफसेट को संभालने की जरूरत है।अपाचे काफ्का: उपभोक्ता राज्य

कुछ जानकारी

  1. मैं highlevel एपीआई (जावा)
  2. मेरे उपभोक्ता एक मुख्य, मूल रूप से एक ही "त्वरित प्रारंभ" काफ्का पेज
  3. पर पाया जा सकता है कि के साथ एक सरल वर्ग है उपयोग कर रहा हूँ
  4. मैं Zookeeper
  5. उपयोग कर रहा हूँ मैं अब एक दलाल

उपयोग कर रहा हूँ, प्रलेखन का कहना है कि highlevel एपीआई उपभोक्ता stor es अपने राज्य चिड़ियाघर संचालक का उपयोग कर तो मैं ऑफसेट और इसलिए उपभोक्ता के राज्य

  • के बीच
      काफ्का दलाल पुन: प्रारंभ होता बनाए रखा जाना होगा उम्मीद करेंगे
    • उपभोक्ता पुन: प्रारंभ होता

    लेकिन दुर्भाग्य से यह नहीं है: प्रत्येक समय मैं ब्रोकर या उपभोक्ता को पुनरारंभ करता हूं, सभी संदेश फिर से वितरित किए जाते हैं। अब, शायद इन बेवकूफ सवाल कर रहे हैं लेकिन

    1. काफ्का के मामले में पुन: प्रारंभ: मैं समझ गया कि उपभोक्ता पर निर्भर है अपने राज्य तो शायद रखने के लिए जब दलाल (री) शुरू होता है फिर से डिलीवर सभी (!) संदेश और उपभोक्ता निर्णय लेता है कि क्या उपभोग करना है ... क्या यह सही है? यदि हां, तो क्या होगा यदि मेरे पास 10.0000.0000 संदेश हैं?

    2. जेवीएम उपभोक्ता के मामले में पुन: प्रारंभ करें: यदि राज्य को ज़ूकीपर पर रखा जाता है तो संदेश फिर से वितरित क्यों होते हैं? क्या यह संभव है कि नए जेवीएम में एक अलग उपभोक्ता "पहचान" है? और इस मामले में, मैं पिछली पहचान कैसे बांध सकता हूं?

  • उत्तर

    2

    ऐसा लगता है कि मैं एक खराब पाठक रहा हूं ... यह सब कॉन्फ़िगरेशन पृष्ठ में है। विशेष रूप से मेरे दोनों प्रश्नों को ध्वज "autooffset.reset" सेट करके हल किया गया था जो "सबसे छोटे" पर डिफ़ॉल्ट होता है और इसलिए वर्णित प्रभावों का कारण बनता है।

    अब, "सबसे बड़ा" मूल्य के रूप में, उपभोक्ता और दलाल के पुनरारंभ के मामले में चीजें अपेक्षाकृत काम कर रही हैं, क्योंकि ऑफसेट हमेशा सबसे बड़ा होता है।

    4

    हां, उपभोक्ता अपने राज्य को बनाए रखने के लिए ज़िम्मेदार है, और जावा उच्च स्तरीय उपभोक्ता अपने राज्य को ज़ूकीपर में बचाता है।

    अधिकतर आपने groupId कॉन्फ़िगरेशन प्रॉपर्टी निर्दिष्ट नहीं की है। उस स्थिति में कफका यादृच्छिक groupId उत्पन्न करता है।

    यह भी संभव है कि आपने autocommit.enable कॉन्फ़िगरेशन प्रॉपर्टी बंद कर दी हो।

    काफ़का कॉन्फ़िगरेशन का पूरा संदर्भ इस पृष्ठ पर पाया जा सकता है: http://kafka.apache.org/configuration.html"उच्च स्तरीय उपभोक्ता के लिए महत्वपूर्ण कॉन्फ़िगरेशन गुण" शीर्षक।

    4

    मूल सवाल का जवाब देने: का उपयोग कर ग्रुप में मदद करता है "प्रारंभ से से सभी संदेशों को फिर से लेने वाली" से बचने स्थिति

    अगर आप ग्रुप आप सभी संदेशों को पल से कतार में बनाया गया था मिल जाएगा बदल (या कफका लॉग प्रतिधारण नीति के आधार पर अंतिम डेटा शुद्ध)

    इसे कफका-कंसोल-उपभोक्ता "- से शुरूआत" ध्वज (जो auto.offset.reset विकल्प सेट करता है) के साथ भ्रमित न करें नीचे विकल्प 1 और 2 के बीच चयन करने के लिए:

    1) अंतिम संदेश खपत के पल से नए संदेश उपभोग करें (एन ओटी समय की शुरुआत से जब कफका कतार मूल रूप से बनाई गई थी):

    props.put ("auto.offset.reset", "सबसे छोटा");

    2) पल ग्राहक JVM शुरू कर दिया है से नए संदेशों का उपभोग (इस मामले में आप लापता कतार पर डाल दिया, जबकि ग्राहक नीचे और नहीं था संदेशों कतार को सुनने) का जोखिम:

    props.put ("auto.offset.reset", "सबसे बड़ी");


    ओर ध्यान दें: नीचे केवल ऊपरी तौर पर मूल प्रश्न

    एक और अधिक उन्नत उपयोग स्थिति के लिए

    से संबंधित है - कुछ समय से शुरू संदेशों को पुनः चलाने आप प्रोग्राम के उपभोक्ता सेट ऑफसेट करने के लिए कोशिश कर रहे हैं - दाएं ब्रोकर/विभाजन से पुनः चलाने के लिए सबसे छोटा ऑफसेट खोजने के लिए https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example में दिखाए गए अनुसार SimpleConsumer API का उपयोग करने की आवश्यकता होगी। जो अनिवार्य रूप से ज़ूकीपर को अपने स्वयं के FindLeader तर्क से बदल रहा है। बहुत मुश्किल

    इस उपयोग के मामले के लिए (कुछ उपयोगकर्ता द्वारा निर्दिष्ट समय से शुरू होने वाले संदेशों का विज्ञापन-प्रसार) हमने निर्णय लिया है कि संदेशों के स्थानीय कैश को स्टोर करने और काफ़का ऑफ़सेट प्रबंधन एपीआई (जो कि एक अच्छा पुन: कार्यान्वित करने की आवश्यकता होगी) के बजाय स्थानीय रूप से ऑफ़सेट प्रबंधित करने का निर्णय लेता है। SimpleConsumer के साथ zookeeper कार्यक्षमता का हिस्सा)।

    आईई। एक बार पोस्टमैन के रूप में काफ्का का इलाज करें, एक बार संदेश वितरित होने पर यह स्थानीय मेलबॉक्स पर जाता है और यदि हमें अतीत में किसी निश्चित ऑफसेट पर वापस जाने की आवश्यकता होती है और कहें, तो संदेशों को दोबारा चलाएं (जो पहले ही खा चुके हैं) उदा। उपभोक्ता ऐप त्रुटि के मामले में, हम सही वितरण आदेश को समझने के लिए "पोस्ट ऑफिस" (काफ्का दलाल) पर वापस नहीं जाते हैं, लेकिन इसे स्थानीय रूप से प्रबंधित करें।

    पक्ष नोट के अंत

    +0

    आप कैसे आप काफ्का से ऑफसेट स्थानीय स्तर के बजाय प्रबंधन पर विस्तार से बता सकते हैं? जैसे कि आप भेजे गए प्रत्येक संदेश के लिए ऑफ़सेट निर्धारित करते हैं और गणना करते हैं, तो उपभोग किया जाए। – David

    +0

    एक बार खपत - संदेश आईडी के रूप में वर्तमान टाइमस्टैंप जोड़ें, और बास्करी ब्लॉब के रूप में स्टोर संदेश (इसे एवरो प्रारूप में भेजा गया है और हम इस बिंदु पर इसे deserialize नहीं करते हैं) hsql (डिस्क पर दृढ़ता के साथ) में, या आप apache फीनिक्स और संग्रह का उपयोग कर सकते हैं यह दो कॉलम आईडी (टाइमस्टैम्प) के साथ बाइनरी प्रारूप में है, संदेश (VARBINARY) – alex

    +0

    लेकिन यह संदेश ऑफ़सेट से कैसे संबंधित है? काफ्का ऑफ़सेट मान संदेश का टाइमस्टैम्प या बाइनरी एन्कोडिंग नहीं है या हैश है या नहीं? मैं अभी भी काफ्का के लिए नया हूं, इसलिए मेरी अज्ञानता को क्षमा करें। – David

    संबंधित मुद्दे