2016-01-14 13 views
20

अपाचे काफ्का जावा क्लाइंट (0.9) का उपयोग करके, मैं Kafka Producer class का उपयोग करके ब्रोकर को रिकॉर्ड की एक लंबी श्रृंखला भेजने की कोशिश कर रहा हूं।अपाचे काफ्का क्लाइंट "बैच समाप्त हो गया" अपवाद कब फेंकता है?

असीमित send method थोड़ी देर के लिए तुरंत लौटता है, फिर प्रत्येक कॉल पर थोड़े समय के लिए अवरुद्ध करना शुरू कर देता है। लगभग तीस सेकंड के बाद, क्लाइंट अपवाद फेंकना शुरू करता है (TimeoutException), संदेश "बैच समाप्त हो गया" के साथ।

इस अपवाद को किस परिस्थिति को फेंकने का कारण बनता है?

उत्तर

30

यह अपवाद इंगित करता है कि आप भेजे जाने से तेज़ दर पर रिकॉर्ड कतारबद्ध कर रहे हैं।

जब आप send विधि को कॉल करते हैं, तो ProducerRecord ब्रोकर को भेजने के लिए एक आंतरिक बफर में संग्रहीत किया जाएगा। एक बार ProducerRecord को बार-बार वापस लौटा दिया गया है, भले ही इसे भेजा गया हो।

रिकॉर्ड्स को प्रति संदेश परिवहन ओवरहेर्ड को कम करने और थ्रूपुट बढ़ाने के लिए ब्रोकर को भेजने के लिए बैचों में समूहित किया गया है।

एक बार रिकॉर्ड बैच जोड़ा जाता है, तो उस बैच को भेजने के लिए एक समय सीमा होती है ताकि यह सुनिश्चित किया जा सके कि इसे निर्दिष्ट अवधि के भीतर भेजा गया है। यह निर्माता कॉन्फ़िगरेशन पैरामीटर, request.timeout.ms द्वारा नियंत्रित होता है, जो तीस सेकंड तक डिफ़ॉल्ट होता है।

यदि बैच टाइमआउट सीमा से अधिक समय तक कतारबद्ध किया गया है, तो अपवाद फेंक दिया जाएगा। उस बैच में रिकॉर्ड्स प्रेषण कतार से हटा दिए जाएंगे।

कॉन्फ़िगरेशन पैरामीटर का उपयोग करते हुए टाइमआउट सीमा को बढ़ाकर, क्लाइंट को समाप्त होने से पहले लंबे समय तक बैचों को कतारबद्ध करने की अनुमति होगी।

+0

में वृद्धि करना पड़ा, मैंने नीचे आपकी टिप्पणी का उत्तर दिया है, अगर आपको कोई सुझाव है तो मुझे बताएं। –

+0

मुझे आश्चर्य है कि 'batch.size' को 0 (या 1 और मानक मान के बीच का मान) सेट करना वास्तव में समस्या को बेहतर ढंग से हल करेगा? –

+0

हाय @ जेम्स थॉमस, "इंगित करता है कि आप रिकॉर्ड किए जाने से तेज़ दर पर रिकॉर्ड कतारबद्ध कर रहे हैं", अगर मैं कतारबद्ध नहीं करना चाहता हूं तो क्या होगा? हमारे उत्पादन वातावरण में बहुत से ट्रैफिक होंगे, हम जितनी जल्दी हो सके डेटा भेजना चाहते हैं। हम इसे समाप्त नहीं करना चाहते हैं। हमने linger.ms को डिफॉल्ट के रूप में सेट किया है, फिर भी यह समस्या प्राप्त हो रही है। आपने कहा कि बढ़ते अनुरोध .timeout.ms बैचिंग अवधि बढ़ाएंगे। –

3

दलाल को भेजने से पहले समय को नियंत्रित करने वाला पैरामीटर linger.ms है। इसका डिफ़ॉल्ट मान 0 है (कोई देरी नहीं)।

+0

यह मूल प्रश्न से स्पष्ट नहीं हो सकता है कि मेरे साथ क्या हो रहा था उदाहरण के लिए, मैंने इसे स्पष्ट करने के लिए अब और विवरण देने की कोशिश की है। पूरी जानकारी के लिए नीचे टिप्पणी देखें। यह एक मुद्दा था कि मैं अपने अपलोड बैंडविड्थ की तुलना में रिकॉर्ड क्विकिंग कर रहा था। –

22

मुझे यह अपवाद पूरी तरह से अलग संदर्भ में मिला।

मैंने एक ज़ूकीपर वीएम, ब्रोकर वीएम और निर्माता/उपभोक्ता वीएम का मिनी क्लस्टर सेट किया है। मैंने सर्वर पर सभी निरंतर बंदरगाहों को खोला (9 0 9 2) और ज़ूकीपर (2181) पर और फिर उपभोक्ता/प्रकाशक वीएम से ब्रोकर को एक संदेश प्रकाशित करने का प्रयास किया। मुझे ओपी द्वारा उल्लिखित अपवाद मिला, लेकिन चूंकि मैंने अभी तक केवल एक ही संदेश प्रकाशित किया है (या कम से कम मैंने कोशिश की है), समाधान टाइमआउट या बैच आकार को बढ़ाने के लिए नहीं हो सकता है। इसलिए मैंने इस मेलिंग सूची को खोजा और पाया कि मेरे पास उपभोक्ता/निर्माता वीएम (क्लोज़ेड चैनल एक्सेप्शन) के भीतर संदेशों का उपभोग करने की कोशिश करते समय एक ही समस्या का वर्णन किया गया था: http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config इस मेलिंग सूची में अंतिम पोस्ट वास्तव में समस्या को हल करने का वर्णन करता है।

लंबी कहानी संक्षेप में, अगर आप का सामना दोनों ChannelClosedException और Batch Expired अपवाद, आप की संभावना server.config फ़ाइल में निम्न के लिए इस लाइन को बदलने और दलाल पुनः आरंभ करना:

advertised.host.name=<broker public IP address> 

यदि ऐसा नहीं है सेट करें, यह host.name प्रॉपर्टी (जो शायद न तो सेट नहीं है) पर वापस आ जाता है और फिर InetAddress जावा क्लास के कैनोलिक होस्ट नाम पर वापस आ जाता है, जो अंततः पाठ्यक्रम के सही नहीं है और इस प्रकार रिमोट नोड्स को भ्रमित कर रहा है।

+4

मैं इस सज्जन के उत्तर की पुष्टि कर सकता हूं – x4k3p

+0

श्रोताओं = PLAINTEXT: // domain_name: 9092 पोर्ट = 9092 host.name = localhost advertised.host.name = domain_name.i स्थानीय रूप से संदेश भेज सकता है लेकिन जब मैंने कफका सर्वर लाइव किया! मुझे यह अपवाद मिल रहा है "org.apache.kafka.common.errors.TimeoutException: बैच की समय सीमा समाप्त हो गई java.util.concurrent.ExecutionException"। मैं गलत कहां जा रहा हूं –

+0

अगर कोई और देख रहा है तो कॉन्फ़िगरेशन फ़ाइल '0.12.2.1' के लिए' /config/server.properties' के अंतर्गत है। – Edd

-1

जब आप उपभोक्ता सेट ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG को सत्य बनाते हैं।

1

मैं काफ्का जावा क्लाइंट संस्करण 0.11.0.0 का उपयोग कर रहा हूं। मैंने लगातार बड़े संदेश बनाने में विफलता में एक ही पैटर्न को देखना शुरू कर दिया। यह कुछ संदेशों के लिए गुजर रहा था, और कुछ अन्य लोगों के लिए असफल रहा। (हालांकि दोनों पारित और असफल संदेश एक ही आकार के थे)। मेरे मामले में, प्रत्येक संदेश का आकार लगभग 60 केबी था, जो 16kB के काफ्का के डिफ़ॉल्ट batch.size से कहीं अधिक है, मेरे linger.ms को 0 के डिफ़ॉल्ट पर सेट किया गया था। यह त्रुटि हो रही है उत्पादक क्लाइंट सर्वर से एक सफल प्रतिक्रिया प्राप्त करने से पहले समय निकाल रहा है। मूल रूप से, मेरे कोड में, यह कॉल समय समाप्त हो रहा था: kafkaProd.send(pr).get()। इसे ठीक करने के लिए, मुझे निर्माता क्लाइंट के डिफ़ॉल्ट request.timeout.ms को 60000

संबंधित मुद्दे