2016-02-26 30 views
7

के बाद कई उपभोक्ताओं (काफ्का 0.9 जावा एपीआई का उपयोग कर) बना सकते हैं और प्रत्येक थ्रेड शुरू कर दिया, मैं निम्नलिखित अपवादकाफ्का CommitFailedException उपभोक्ता अपवाद

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance 
class com.messagehub.consumer.Consumer is shutting down. 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546) 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487) 
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681) 
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654) 
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350) 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157) 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352) 
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936) 
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905) 

और उसके बाद सामान्य रूप से लेने वाली संदेश शुरू हो रही है, मुझे पता है कि क्या करना चाहते हैं इसे ठीक करने के लिए इस अपवाद का कारण बन रहा है।

+0

ह्यूगो, क्या आप अभी भी इस समस्या का सामना कर रहे हैं? क्या आप कुछ और जानकारी प्रदान कर सकते हैं? – Nautilus

+0

हां @ नॉटिलस, मुझे अभी भी यह समस्या है। मेरे पास 3 उपभोक्ता हैं, सभी एक ही उपभोक्ता समूह में, मेरे पास 20 विभाजन वाले विषय हैं, जहां से डेटा पढ़ा जाना चाहिए। यह अपवाद यादृच्छिक रूप से होता है, फिर भी उपभोक्ता विषय/विभाजन से डेटा पढ़ सकते हैं, हालांकि यह अपवाद ट्रिगर किया गया है। –

+0

उपभोक्ता केवल डेटा उपभोग कर रहे हैं या वे इसे संसाधित कर रहे हैं?मैं आपके स्टैकट्रैस पर देखता हूं कि अपवाद तब होता है जब आप ऑफसेट को सिंक करने की कोशिश कर रहे हैं, क्या आप वर्णन कर सकते हैं कि संदेशों की खपत और ऑफसेट की प्रतिबद्धता के बीच क्या होता है? मुझे लगता है कि यह संभव है कि आपका उपभोक्ता समन्वयक के साथ दिल की धड़कन खो रहा है। – Nautilus

उत्तर

-1

त्रुटियों के रूप में इसके उपभोक्ता समूह पुन: संतुलित मुद्दे। क्या आप हमें बता सकते हैं, कितने विभाजन विषय बनाया गया था? और कितने उपभोक्ता चल रहे हैं? क्या वे एक ही समूह के हैं?

+0

मेरे पास उस सदस्य के अधिक विभाजन हैं, इसलिए सदस्य को एकाधिक विभाजन से संदेश प्राप्त नहीं करना चाहिए? मेरे पास 20 विभाजन हैं, और 3 सदस्य चल रहे हैं, वे सभी एक ही उपभोक्ता समूह से संबंधित हैं। –

10

निम्नलिखित मानकों बदलाव करने भी प्रयास करें:

  • heartbeat.interval.ms - यह बताता है काफ्का मिलीसेकेंड की निर्धारित राशि इंतजार इससे पहले कि यह विचार करना उपभोक्ता "मृत" पर विचार किया जाएगा
  • max.partition.fetch.bytes - यह मतदान के दौरान प्राप्त होने वाले संदेशों की मात्रा (ऊपर) तक सीमित होगा।

मैंने देखा है कि पुनर्संतुलन होती है तो उपभोक्ता दिल की धड़कन का समय समाप्त होने से पहले काफ्का के लिए प्रतिबद्ध नहीं है। यदि संदेश संसाधित होने के बाद प्रतिबद्धता होती है, तो उन्हें संसाधित करने के लिए समय की मात्रा इन मानकों को निर्धारित करेगी। इसलिए, संदेशों की संख्या में कमी और दिल की धड़कन के समय में वृद्धि से पुनर्वसन से बचने में मदद मिलेगी।

और अधिक विभाजन का उपयोग करने पर भी विचार करें, इसलिए प्रति सर्वेक्षण कम संदेश के साथ भी आपके डेटा को संसाधित करने वाले अधिक थ्रेड होंगे।

मैंने परीक्षण करने के लिए इस छोटे से आवेदन को लिखा था। आशा करता हूँ की ये काम करेगा।

https://github.com/ajkret/kafka-sample

अद्यतन

काफ्का अब 0.10.x संदेशों की संख्या प्राप्त नियंत्रित करने के लिए एक नया पैरामीटर प्रदान करता है: - max.poll.records - रिकॉर्ड की अधिकतम संख्या में लौटे मतदान के लिए एक भी कॉल()।

अद्यतन

काफ्का ठहराव कतार के लिए एक रास्ता प्रदान करता है। जबकि कतार रुक गई है, आप संदेशों को एक अलग थ्रेड में संसाधित कर सकते हैं, जिससे आप दिल की धड़कन भेजने के लिए KafkaConsumer.poll() पर कॉल कर सकते हैं। फिर प्रसंस्करण के बाद KafkaConsumer.resume() पर कॉल करें। इस तरह आप दिल की धड़कन न भेजने के कारण की समस्याओं को कम कर देते हैं विद्रोह। यहां आप क्या कर सकते हैं इसकी एक रूपरेखा है:

while(true) { 
    ConsumerRecords records = consumer.poll(Integer.MAX_VALUE); 
    consumer.commitSync(); 

    consumer.pause(); 
    for(ConsumerRecord record: records) { 

     Future<Boolean> future = workers.submit(() -> { 
      // Process 
      return true; 
     }); 


     while (true) { 
      try { 
       if (future.get(1, TimeUnit.SECONDS) != null) { 
        break; 
       } 
      } catch (java.util.concurrent.TimeoutException e) { 
       getConsumer().poll(0); 
      } 
     } 
    } 

    consumer.resume(); 
} 
+0

संस्करण 0.10.x में अब एक नया पैरामीटर है, * max.poll.records *, max.partition.fetch.bytes के बजाय उपयोग करने के लिए। – ajkret

+0

मैंने आपके दृष्टिकोण को रोकें और फिर से शुरू करें, लेकिन फिर भी मुझे एक ही त्रुटि मिलती है। एकमात्र अंतर यह है कि मैं रोकथाम() के बाद commit सिंक() को कॉल कर रहा हूं और फिर से शुरू करने से पहले(), क्योंकि रिकॉर्ड केवल संसाधित होने पर मुझे केवल प्रतिबद्ध करने की आवश्यकता है। कोई आइडिया है कि मैं क्या गलत कर रहा हूँ? –

+0

@ mav3n: मैं एक ही मुद्दे को पूरा कर रहा हूं। Session.timeout.ms और max.poll.records को बढ़ाने का प्रयास किया लेकिन बिना किसी सफलता के। क्या आपको यह करने का तरीका मिला है? –

संबंधित मुद्दे