2016-03-09 20 views
6

हम .9 के लिए हमारी काफ्का कार्यान्वयन के उन्नयन और consumer.I उपभोक्ता के लिए नीचे दिए गए कोड का उपयोग कर रहा बनाने के लिए नए उपभोक्ता जावा एपीआई का उपयोग कर और हम लाइन एक के रूप में उपभोक्ता के लिए विषय की स्थापना का उपयोग कर रहे रहे हैं और लाइन बी हमारी सेवा का आह्वान है जो हमें प्राप्त संदेशों को संसाधित करता है। अब समस्या यह है कि अगर हमारे संदेश प्रसंस्करण में 30 सेकंड लगते हैं तो हमें अपवाद मिल रहा है। 47::काफ्का .9 में नवीनीकृत नए उपभोक्ता एपीआई के साथ

Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("group.id", "test-group"); 
      props.put("auto.offset.reset", "earliest"); 
      props.put("heartbeat.interval.ms", "1000"); 
      props.put("receive.buffer.bytes", 10485760); 
      props.put("fetch.message.max.bytes", 5242880); 
      props.put("enable.auto.commit", false); 
    //with partition assigned to consumer 


      KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props); 
      // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0); 
      //consumer.assign(Arrays.asList(partition0)); 
      //assign topic to consumer without partition 
//LINE A 
      consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp()); 
      List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
      while (true) { 
       try { 
        ConsumerRecords<Object, Object> records = consumer.poll(1000); 
        consumeFromQueue(records);//LINE B 
        consumer.commitSync(); 

       } catch (CommitFailedException e) { 
        e.printStackTrace(); 
        System.out.println("CommitFailedException"); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        System.out.println("Exception in while consuming messages"); 
       } 

अपवाद

2016-03-03 10 है 35.095 जानकारी 6448 --- [पूछना-अनुसूचक-3] o.a.k.c.c.internals.AbstractCoordinator: समन्वयक अंकन 2147483647 मर चुका है। 2016-03-03 10: 47: 35.096 ERROR 6448 --- [ask-scheduler-3] oakccinternals.ConsumerCoordinator: त्रुटि ILLEGAL_GENERATION समूह टेस्ट-ग्रुप CommitFailedException org.apache.kafka.clients के लिए ऑफसेट करने के दौरान त्रुटि हुई। उपभोक्ता.कॉमिटफेल अपवाद: समूह रीबैलेंस पर org.apache.kafka.clients.consumer.internals पर कॉमिट पूरा नहीं किया जा सकता है। कॉन्स्यूमरकॉर्डिनेटर $ ऑफसेट कॉमिट्रॉस्सेन्डरहैंडलरहैंडल (ConsumerCoordinator.java:5252) org.apache.kafka.clients.consumer पर। internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle (ConsumerCoordinator.java:493) org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:665) org.apache.kafka.clients पर पर। consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:644) org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess (RequestFuture.java:167) org.apache.kafka.clients.consumer.internals पर। RequestFuture.fireSuccess (RequestFuture.java:133) org.apache.kafka.clients.consumer.internals.RequestFuture.complete (RequestFuture.java:107) org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $ पर RequestFutureCompletionHandler.onComplete (ConsumerNetworkClient.java:380) org.apache.kafka.clients.NetworkClient.poll (NetworkClient.javas74) org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll (ConsumerNetworkClient पर। जावा: 320) org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetwo rkClient.java2213)

ऑफसेट करने के दौरान ऊपर अपवाद आता है। कोई भी सुझाव आपको धन्यवाद देगा

उत्तर

6

ऐसा इसलिए होता है क्योंकि नया उपभोक्ता एकल थ्रेडेड है, और उपभोक्ता समूह के साथ दिल की धड़कन रखने का एकमात्र तरीका यह है कि समूह समन्वयक 30 सेकंड के बाद ऑफसेट को मतदान या कर रहा है आपके उपभोक्ता को मृत के रूप में चिह्नित कर रहा है और समूह के पुनर्वसन के लिए बुला रहा है। इस स्थिति के लिए आप या तो request.timeout.ms बढ़ा सकते हैं या 2 धागे के बीच उपभोग और प्रसंस्करण के काम को विभाजित कर सकते हैं।

+0

धन्यवाद द्वारा ग्राहक के लिए लौट आए, मैं 70000 करने के लिए "request.timeout.ms" जोड़ने की कोशिश की, यह मुझे दिया एक ही अपवाद हालांकि संदेश प्रसंस्करण 30000 लिया। –

+0

यह सुनिश्चित नहीं है कि यह कोई प्रभाव क्यों नहीं ले रहा है। मैंने "session.timeout.ms" को 70000 पर सेट करने का भी प्रयास किया लेकिन मुझे अपवाद दिया कि "org.apache.kafka.common.errors.ApiException: सत्र टाइमआउट एक स्वीकार्य सीमा के भीतर नहीं है।" मैं आपके दूसरे सुझाव के बारे में सोच रहा था, लेकिन अगर थ्रेड प्रसंस्करण में कुछ चीज गलत हो जाती है तो मैं इसे कैसे संभालेगा? नए संदेश के रूप में इसे फिर से जोड़ना और अपवाद से पहले संदेश द्वारा किए गए परिवर्तनों को वापस करना? –

+0

group.max.session.timeout.ms को बढ़ाएं – Nautilus

0

आप एक कुछ उपयुक्त सीमा आपके सबसे बड़ा संदेश से भी बड़ा है, लेकिन बहुत कम है कि आप प्रति पोल कम संदेशों मिल जाएगा है कि करने के लिए

max.partition.fetch.bytes 

की स्थापना करके चुनाव द्वारा दिया संदेश() की संख्या को सीमित कर सकता है।

काफ्का 0.10.x समर्थन हासिल है स्पष्ट रूप से सीमित करने के लिए संदेशों की संख्या उत्तर के लिए स्थापित करने

max.poll.records 
संबंधित मुद्दे