2016-03-03 12 views
6

हाई लेवल उपभोक्ता एपीआई एक समय में एक संदेश पढ़ रहा प्रतीत होता है।क्या कफका में बैच उपभोक्ता है?

यह उपभोक्ताओं के लिए काफी समस्याग्रस्त हो सकता है अगर वे उन संदेशों को सोलर या लोचदार-खोज जैसे अन्य डाउनस्ट्रीम उपभोक्ताओं को संसाधित करना और सबमिट करना चाहते हैं क्योंकि वे एक समय में एक के बजाय थोक में संदेश रखना पसंद करते हैं।

उन संदेशों को स्मृति में भी बैच करना मुश्किल नहीं है क्योंकि कफका में ऑफसेट्स को केवल तभी समन्वयित करने की आवश्यकता होगी जब बैच पहले से ही किया गया हो, अन्यथा बिना किसी डाउनस्ट्रीम संदेशों (जैसे सोलर या ईएस) के साथ क्रैश कफका-उपभोक्ता इसके ऑफ़सेट अपडेट पहले से ही और इसलिए ढीले संदेश होंगे।

उपभोक्ता एक बार से अधिक संदेशों का उपभोग कर सकता है अगर यह नीचे की ओर गए संदेशों करने के बाद, लेकिन संदेश ऑफसेट अद्यतन करने से पहले दुर्घटनाओं।

काफ्का बैच में संदेशों की खपत है, तो कोड/प्रलेखन के लिए कुछ संकेत दिए गए बहुत सराहना की जाएगी।

धन्यवाद!

+0

काफ्का के किन संस्करणों रहे हैं तुम पूछ रहे हो मुझे लगता है कि अगर आप हाई लेवल कंज्यूमर के बारे में बात कर रहे हैं तो यह 0.8.2 या इससे पहले है। – morganw09dev

उत्तर

3

मुझे बैच उपभोक्ता के बारे में पता नहीं है। लेकिन यहां तक ​​कि अगर आपकी कोई मुख्य समस्या बनी रहती है। डेटा सफलतापूर्वक अग्रेषित करने के बाद आप ऑफसेट करना चाहते हैं। इसे प्राप्त करने का एक तरीका यह है कि संपत्ति auto.commit.enable = false सेट करके उपभोक्ता की ऑटो प्रतिबद्धता को बंद करना है। ट्रेडऑफ निश्चित रूप से है कि आपको अपने ऑफसेट को कब प्रतिबद्ध करना है, इस पर ध्यान रखना होगा।

उपभोक्ता संपत्तियों यहाँ की एक पूरी प्रलेखन खोजें: https://kafka.apache.org/documentation.html#consumerconfigs

कैसे manualy को जावा-डॉक (https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) से चोरी ऑफसेट करने से पर एक अच्छा उदाहरण:

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "false"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("session.timeout.ms", "30000"); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("foo", "bar")); 
final int minBatchSize = 200; 
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     buffer.add(record); 
    } 
    if (buffer.size() >= minBatchSize) { 
     insertIntoDb(buffer); 
     consumer.commitSync(); 
     buffer.clear(); 
    } 
} 
+0

मैं ऑटो प्रतिबद्धता के आपके स्पष्टीकरण से सहमत हूं। लेकिन जहां तक ​​आपका कोड जाता है, उपभोक्ता रिकार्ड एक काफ्का 0.9 वर्ग है, जबकि उसका सवाल ऐसा लगता है कि वह पूर्व 0.9 उपभोक्ताओं के बारे में पूछ रहा है। हालांकि वह स्पष्ट रूप से यह नहीं बताता है। – morganw09dev

+0

उपरोक्त कोड के साथ समस्या है। – user2250246

+1

यदि ऑफसेट करने से पहले उपभोक्ता क्रैश हो जाता है, तो संदेशों को फिर से चलाया जाएगा। मेरे पास प्रारंभिक लेनदेन() और अंत ट्रांज़ेक्शन() के डीबी समतुल्य नहीं है। – user2250246

संबंधित मुद्दे