मैं काफ्का का अध्ययन करने वाला एक नया छात्र हूं और मैंने कई उपभोक्ताओं को समझने के साथ कुछ मौलिक मुद्दों में भाग लिया है कि लेख, दस्तावेज इत्यादि अब तक बहुत उपयोगी नहीं हैं।मैं काफ़का में कई उपभोक्ताओं का उपयोग कैसे करूं?
एक चीज जिसे मैंने करने की कोशिश की है, वह अपना उच्च स्तर काफ़का निर्माता और उपभोक्ता लिखता है और उन्हें एक साथ चलाता है, एक विषय पर 100 सरल संदेश प्रकाशित करता है और मेरा उपभोक्ता उन्हें पुनर्प्राप्त करता है। मैंने इसे सफलतापूर्वक करने में कामयाब रहा है, लेकिन जब मैं एक दूसरे उपभोक्ता को उसी विषय से उपभोग करने का प्रयास करता हूं जो संदेशों को अभी प्रकाशित किया गया था, तो उसे कोई संदेश नहीं मिला।
यह मेरी समझ थी कि प्रत्येक विषय के लिए, आप उपभोक्ता अलग उपभोक्ता समूहों से प्राप्त कर सकते हैं और इनमें से प्रत्येक उपभोक्ता समूह को किसी विषय पर उत्पादित संदेशों की पूरी प्रति प्राप्त होगी। क्या ये सही है? यदि नहीं, तो मेरे लिए एकाधिक उपभोक्ताओं को स्थापित करने का उचित तरीका क्या होगा?
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
इसके अलावा, मैंने देखा है कि मूल रूप से मैं केवल एक ही विभाजन के साथ एक विषय 'परीक्षण' के लिए ऊपर की खपत का परीक्षण किया गया था: यह उपभोक्ता वर्ग है कि मैं अब तक लिखा है है। जब मैंने एक मौजूदा उपभोक्ता समूह में एक और उपभोक्ता समूह 'टेस्ट ग्रुप' कहा, तो इसने एक कफका विद्रोह शुरू किया जिसने सेकंड की परिमाण में, मेरी खपत की विलम्ब को एक महत्वपूर्ण राशि से धीमा कर दिया। मैंने सोचा था कि यह पुनर्विक्रय के साथ एक मुद्दा था क्योंकि मेरे पास केवल एक विभाजन था, लेकिन जब मैंने 6 विभाजनों के साथ एक नया विषय 'एकाधिक विभाजन' बनाया, तो इसी तरह के मुद्दे सामने आए जहां एक ही उपभोक्ता समूह में अधिक उपभोक्ताओं को जोड़ने से विलंबता के कारण हुए। मैंने चारों ओर देखा है और लोग मुझे बता रहे हैं कि मुझे एक बहु थ्रेडेड उपभोक्ता का उपयोग करना चाहिए - क्या कोई उस पर प्रकाश डाल सकता है?
काफ़का '0.8.1' के लिए उच्च स्तर के उपभोक्ता [यहां] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) का एक शानदार उदाहरण है। – chrsblck
@chrsblck लिंक के लिए धन्यवाद।मैंने वास्तव में जांच की है कि पहले और शायद इसे समझ में नहीं आया था और साथ ही मैं कर सकता था - क्या आप शायद थोड़ा सा समझा सकते हैं कि यह उदाहरण धागे का उपयोग कैसे करता है? मैं पूरी तरह समझ नहीं पा रहा हूं कि वे इस समय क्या कर रहे हैं। –
एक तरीका है किसी दिए गए विषय के लिए विभाजन के समान धागे की संख्या। लेख से - धाराओं की सूची 'सूची> streams = उपभोक्ताMap.get (विषय);' ... फिर प्रत्येक थ्रेड को एक विभाजन 'execor.submit असाइन करें (नया उपभोक्ताटेस्ट (धारा, धागा संख्या)) '। –
chrsblck