2017-07-13 14 views
9

मैं स्प्रिंग-काफ्का संस्करण 1.2.1 का उपयोग कर रहा हूं और, जब काफ्का सर्वर डाउन/पहुंच योग्य नहीं है, तो एसिंक्रोनस कॉल एक समय के लिए ब्लॉक भेजता है। ऐसा लगता है कि टीसीपी टाइमआउट है। कोड कुछ इस तरह है:स्प्रिंग काफ्का एसिंक्रोनस कॉल ब्लॉक

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message); 
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { 
    @Override 
    public void onSuccess(SendResult<K, V> result) { 
     ... 
    } 

    @Override 
    public void onFailure(Throwable ex) { 
     ... 
    } 
}); 

मैं वसंत-काफ्का कोड पर एक बहुत शीघ्रता से अवलोकन कर लिया है और यह एक भविष्य के लिए एक कॉलबैक बातचीत का अनुवाद, बस काफ्का क्लाइंट लाइब्रेरी के लिए साथ काम पारित करने के लिए लगता है ऑब्जेक्ट इंटरैक्शन। कफका क्लाइंट लाइब्रेरी को देखते हुए, कोड अधिक जटिल हो जाता है और मैंने इसे समझने में समय नहीं लगाया, लेकिन मुझे लगता है कि यह एक ही थ्रेड में रिमोट कॉल (मेटाडाटा, कम से कम?) बना सकता है।

उपयोगकर्ता के रूप में, मुझे वसंत-काफ्का विधियों की उम्मीद थी जो भविष्य में वापस लौटने के लिए भविष्य लौट आए, भले ही रिमोट काफ्का सर्वर पहुंच योग्य न हो।

मेरी समझ गलत है या यदि यह एक बग है तो कोई पुष्टि होगी। मैं इसे अपने अंत में अब के अंत में असंकालिक बना रहा।

एक और समस्या यह है कि वसंत-काफ्का दस्तावेज, शुरुआत में, यह सिंक्रोनस और एसिंक्रोनस भेजने के तरीके प्रदान करता है। मुझे कोई भी तरीका नहीं मिला जो वायदा वापस नहीं करता है, शायद दस्तावेज़ीकरण को अद्यतन करने की आवश्यकता है।

यदि आवश्यक हो तो मुझे कोई और विवरण प्रदान करने में खुशी होगी। धन्यवाद।

उत्तर

1

बस सुनिश्चित करने के लिए। क्या आपके पास @EnableAsync एनोटेशन लागू है? मैं कहना चाहता हूं कि भविष्य <>

+0

आपकी प्रतिक्रिया के लिए धन्यवाद। नहीं, मैं इस एनोटेशन का उपयोग नहीं कर रहा हूं, दस्तावेज़ीकरण में इसके बारे में कुछ भी नहीं था। मैं कोशिश करूँगा और आपको बता दूंगा कि क्या यह समस्या हल करता है। –

+0

@EnableAsync का उपयोग करके दुर्भाग्य से कुछ भी नहीं बदला =/ –

4

कॉन्फ़िगरेशन क्लास पर @EnableAsync एनोटेशन के अतिरिक्त, @Async एनोटेशन को विधि पर उपयोग करने की आवश्यकता है, तो आप इस कोड को आमंत्रित करते हैं।

http://www.baeldung.com/spring-async

यहाँ कुछ कोड fragements। काफ्का निर्माता config:

@EnableAsync 
@Configuration 
public class KafkaProducerConfig { 

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); 

    @Value("${kafka.brokers}") 
    private String servers; 

    @Bean 
    public Map<String, Object> producerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     return props; 
    } 

    @Bean 
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) { 
     return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper)); 
    } 

    @Bean 
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) { 
     return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper)); 
    } 

    @Bean 
    public Producer producer() { 
     return new Producer(); 
    } 
} 

और निर्माता ही:

public class Producer { 

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); 

    @Autowired 
    private KafkaTemplate<String, GenericMessage> kafkaTemplate; 

    @Async 
    public void send(String topic, GenericMessage message) { 
     ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message); 
     future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() { 

      @Override 
      public void onSuccess(final SendResult<String, GenericMessage> message) { 
       LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); 
      } 

      @Override 
      public void onFailure(final Throwable throwable) { 
       LOGGER.error("unable to send message= " + message, throwable); 
      } 
     }); 
    } 
} 
+0

आपकी प्रतिक्रिया के लिए धन्यवाद। नहीं, मैं इन एनोटेशन का उपयोग नहीं कर रहा हूं, दस्तावेज में उनके बारे में कुछ भी नहीं था। मैं दोनों कोशिश करूँगा और आपको बता दूंगा कि क्या यह समस्या हल करता है। –

+0

दुर्भाग्य से EnableAsync का उपयोग करके कुछ भी नहीं बदला। साथ ही, लिंक से मैं समझता हूं कि यह वसंत-काफ्का लाइब्रेरी है जो असिंक एनोटेशन का उपयोग करनी चाहिए, क्योंकि यह मुझे भविष्य की वस्तु प्रदान करती है। –

+0

मैं आपसे सहमत हूं, मेरे लिए यह समझ में नहीं आता है कि आप वायदा प्रदान करते हैं लेकिन मुझे वैसे भी टिप्पणियां देना पड़ता है। हमारे मामले में उन दो एनोटेशनों को रखकर यह एक आकर्षण की तरह काम करता है। मैं कुछ कोड टुकड़े जोड़ने में प्रतिक्रिया संपादित कर दूंगा। –

संबंधित मुद्दे