2016-10-06 29 views
5

here प्रदान किए गए दस्तावेज के अनुसार, मैं same documentation में उल्लिखित श्रोता में संदेश प्राप्त करने के लिए एक पीओसी पर कोशिश कर रहा हूं, नीचे मैंने कॉन्फ़िगरेशन लिखा है।वसंत एकीकरण काफ्का उपभोक्ता श्रोता संदेश प्राप्त नहीं

@Configuration 
public class KafkaConsumerConfig { 

    public static final String TEST_TOPIC_ID = "record-stream"; 

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}") 
    private String topic; 

    @Value("${kafka.address:localhost:9092}") 
    private String brokerAddress; 


    /* 
     @Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
     KafkaMessageListenerContainer<String, String> container) { 
     KafkaMessageDrivenChannelAdapter<String, String> 
     kafkaMessageDrivenChannelAdapter = new 
     KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); 
     kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return 
     kafkaMessageDrivenChannelAdapter; } 

     @Bean public QueueChannel received() { return new QueueChannel(); } 
    */ 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(3); 
     factory.getContainerProperties().setPollTimeout(30000); 
     return factory; 

    } 

    /* 
    * @Bean public KafkaMessageListenerContainer<String, String> container() 
    * throws Exception { ContainerProperties properties = new 
    * ContainerProperties(this.topic); // set more properties return new 
    * KafkaMessageListenerContainer<>(consumerFactory(), properties); } 
    */ 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest 
                     // smallest 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

} 

और श्रोता के रूप में नीचे है,

@Service 
public class Listener { 

    private Logger log = Logger.getLogger(Listener.class); 


    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory") 
    public void process(String message/* , Acknowledgment ack */) { 
     Gson gson = new Gson(); 
     Record record = gson.fromJson(message, Record.class); 

     log.info(record.getId() + " " + record.getName()); 
     // ack.acknowledge(); 
    } 

} 

हालांकि मैं एक ही विषय पर संदेश उत्पादन कर रहा हूँ और इस उपभोक्ता एक ही विषय पर काम कर रहा है, श्रोता को क्रियान्वित नहीं है।

मैं काफ्का 0.10.0.1 चला रहा हूं, और यहां मेरा वर्तमान पोम है। यह उपभोक्ता कई कमांड लाइन नमूने के विपरीत वसंत बूट वेब एप्लिकेशन के रूप में काम कर रहा है।

<dependencies> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-web</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-kafka</artifactId> 
      <version>2.1.0.RELEASE</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-java-dsl</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>com.google.code.gson</groupId> 
      <artifactId>gson</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 

    </dependencies> 

मैं समय की एक अच्छी रकम खर्च कर दिया है क्यों यह श्रोता जब विषय संदेश हैं हिट नहीं हो रही है यह पता लगाने की, यह क्या मैं गलत कर रहा हूँ है।

मुझे पता है कि मैं एक चैनल का उपयोग कर संदेश प्राप्त कर सकता हूं (मैंने कोड में उस कॉन्फ़िगरेशन हिस्से को टिप्पणी में टिप्पणी की है), लेकिन यहां समरूपता संभालती है।

क्या इस प्रकार का कार्यान्वयन एसिंक संदेश खपत के साथ संभव है।

उत्तर

3

आपको @Configuration के साथ @EnableKafka जोड़ना होगा।

add जल्द ही कुछ विवरण देगा।

इस बीच:

@Configuration 
@EnableKafka 
public class KafkaConsumerConfig { 
+0

मैंने सोचा था कि EnableIntegration मेरी निर्माता में है कि की देखभाल कर रही है, मैं EnableKafka जरूरत नहीं है। मुझे इसे आज़माएं और –

+0

'@ सक्षम करें एकीकरण' वसंत एकीकरण के लिए है, लेकिन हम यहां वसंत काफ्का के बारे में बात करते हैं। वे पूरी तरह अलग स्वतंत्र परियोजनाएं हैं। देखो '@ कफका लिस्टनर' वसंत एकीकरण काफ्का के बाहर है। याद रखें कि '@ EnableJms',' @ EnableRabbit' और कई अन्य '@ सक्षम ...' हैं। केवल '@ सक्षम इंटीग्रेशन' उन सभी के बारे में परवाह नहीं कर सकता है। यह इसकी ज़िम्मेदारी नहीं है। –

+0

अंतर्दृष्टि के लिए बहुत बहुत धन्यवाद, आपका आकर्षण एक आकर्षण की तरह काम करता है। इसे ध्यान में रखेगा। –

संबंधित मुद्दे