5

मैं काफ्का 0.9 में नया हूं और जावा में एक अजीब व्यवहार को महसूस किया है जिसे मैंने उपभोक्ता (KafkaConsumer) लागू किया है।काफ्का उपभोक्ता के मतदान() विधि को अवरुद्ध कर दिया गया है

काफ्का ब्रोकर अंबाड़ी बाहरी मशीन में स्थित है।

यहां तक ​​कि आप एक निर्माता को लागू कर सकते हैं और बाहरी ब्रोकर को संदेश भेजना शुरू कर सकते हैं, मुझे कोई संकेत नहीं है कि जब उपभोक्ता घटनाओं (चुनाव) को पढ़ने की कोशिश करता है, तो यह अटक जाता है।

मुझे पता है कि निर्माता अच्छी तरह से काम कर रहा है, क्योंकि मैं कंसोल उपभोक्ता (जो स्थानीय स्तर पर स्थानीय रूप से काम कर रहा है) के माध्यम से संदेश का उपभोग कर सकता है। लेकिन जब मैं जावा उपभोक्ता निष्पादित करता हूं, कुछ भी नहीं होता है, बस अटक जाता है। कोड सकता डिबगिंग मैं देख रहा हूँ कि यह poll() लाइन पर अवरुद्ध हो जाता है:

ConsumerRecords<String, String> records = consumer.poll(100); 

टाइमआउट कुछ नहीं करता है, वैसे। इससे कोई फर्क नहीं पड़ता कि आप 0, 100 या 1000 एमएस डालते हैं, उपभोक्ता इस लाइन में अवरुद्ध हो जाता है और समय-समय पर अपवाद नहीं करता है।

मैं वैकल्पिक संपत्तियों की सभी प्रकार, इस तरह के रूप advertised.host.name, advertised.listener की कोशिश की, ... और इतने पर, शून्य भाग्य के साथ।

किसी भी मदद की अत्यधिक सराहना की जाएगी। अग्रिम में धन्यवाद!

+0

क्या आप संदेशों को 'kafka-console-consumer.sh' का उपयोग करके अलग-अलग तरीके से उपभोग करने में सक्षम हैं? –

+0

हाँ, मैं हूं। मशीन से जो अंबारी होस्ट करती है, मैं कंसोल उपभोक्ता –

+0

के माध्यम से संदेश का उपभोग कर सकता हूं और जिस मशीन पर आप अपना उपभोक्ता चलाते हैं उसके बारे में क्या? क्या आपने वहां कंसोल उपभोक्ता को आजमाया था? –

उत्तर

1

कारण वह मशीन हो सकती है जहां आपका उपभोक्ता कोड चल रहा है, वह ज़ूकीपर से कनेक्ट करने में असमर्थ है। मशीन पर वही उपभोक्ता कोड चलाने का प्रयास करें जहां आपका कफका स्थापित है (मैंने कोशिश की और मेरे लिए काम किया)। मैंने server.properties फ़ाइल में निम्न गुणों का उल्लेख करके समस्या हल की: advertised.host.name="ip address which you want to expose" // मेरे मामले में यह ec2 मशीन का सार्वजनिक आईपी है, मेरे पास एक ही ec2 पर kafka और zookeeper स्थापित है। advertised.port=9092 ConsumerRecords<String, String> records = consumer.poll(100); उपरोक्त कथन का मतलब यह नहीं है कि उपभोक्ता 100 एमएस के बाद समय निकाल देगा, यह मतदान अवधि है। 100 एमएस में जो भी डेटा कैप्चर करता है उसे रिकॉर्ड संग्रह में पढ़ा जाता है।

संबंधित मुद्दे