2016-11-24 11 views
7

मेरे पास 2 स्रोत हैं जो विभिन्न स्रोतों से सटीक सामग्री को स्ट्रीम कर रहे हैं, इसलिए स्रोतों में से कोई एक विफल होने पर मुझे उच्च उपलब्धता हो सकती है। मैं कफका स्ट्रीम 0.10.1.0 का उपयोग करके 2 विषयों को 1 आउटपुट विषय में विलय करने का प्रयास कर रहा हूं, जैसे कि मुझे असफलताओं पर कोई संदेश याद नहीं है और सभी स्रोत ऊपर होने पर कोई डुप्लिकेट नहीं है।एकाधिक समान कफका स्ट्रीम विषयों को विलय करना

केस्ट्रीम की leftJoin विधि का उपयोग करते समय, कोई भी विषय कोई समस्या (द्वितीयक विषय) के साथ नीचे जा सकता है, लेकिन जब प्राथमिक विषय नीचे चला जाता है, आउटपुट विषय पर कुछ भी नहीं भेजा जाता है। यह, क्योंकि होने की Kafka Streams developer guide के अनुसार लगता है

KStream-KStream leftJoin हमेशा प्राथमिक स्ट्रीम से आने वाले रिकॉर्ड से प्रेरित है

इसलिए अगर कोई रिकॉर्ड प्राथमिक स्ट्रीम से आ रही हैं यह, माध्यमिक धारा से रिकॉर्ड का उपयोग नहीं करेंगे भले ही वे मौजूद हों। एक बार जब प्राथमिक स्ट्रीम ऑनलाइन वापस आती है, तो आउटपुट सामान्य रूप से फिर से शुरू होता है।

मैं भी outerJoin (जो डुप्लिकेट रिकॉर्ड कहते हैं) एक KTable और groupByKey में रूपांतरण के बाद डुप्लिकेट से छुटकारा पाने के,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

उपयोग करने की कोशिश की है, लेकिन मैं अभी भी एक समय में एक बार डुप्लिकेट मिलता है। मैं अक्सर आउटपुट स्ट्रीम को भेजने के लिए KTable प्राप्त करने के लिए commit.interval.ms=200 का उपयोग कर रहा हूं।

एकाधिक समान इनपुट विषयों से सटीक-बार आउटपुट प्राप्त करने के लिए इस विलय तक पहुंचने का सबसे अच्छा तरीका क्या होगा?

+0

सामान्य रूप से, मैं समस्या को हल करने के लिए प्रोसेसर एपीआई की सिफारिश करता हूं। आप वर्तमान 'ट्रंक' संस्करण पर स्विच करने का भी प्रयास कर सकते हैं (सुनिश्चित नहीं है कि यह आपके लिए संभव है)। जॉइन को फिर से काम किया गया है, और इससे आपकी समस्या हल हो सकती है: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics नई जॉइन सेमेन्टिक्स काफका '0.10.2' में शामिल किया जाएगा जो लक्षित रिलीज दिनांक जनवरी 2017 (https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan) है। –

+0

@ MatthiasJ.Sax मैंने ट्रंक पर स्विच किया और ऐसा लगता है कि 'leftJoin' अब KStream-KStream में शामिल होने के लिए' बाहरी जोइन 'जैसा व्यवहार करता है, इसलिए मुझे लगता है कि मैं 10.1 सेमेन्टिक्स पर वापस जाऊंगा। जो मैं अब कोशिश कर रहा हूं वह एक नकली धारा बनाना है जो नल का उत्पादन करता है जिसे मैं बाएं जोइन में प्राथमिक के रूप में उपयोग करता हूं, प्राथमिक के रूप में उपयोग किया जाता है, और द्वितीयक के साथ बाएं जोइन में विलय का उपयोग करता है। मुझे आशा है कि इसके परिणामस्वरूप प्राथमिक धारा में हमेशा मूल्य होंगे, भले ही मेरा प्राथमिक डाउन हो (जैसा कि मैं पहले बाएं जॉइन से शून्य प्राप्त करूंगा)। –

+0

नया 'बाएं जोइन' पुराना 'बाहरी जोइन' के रूप में दोनों तरफ से ट्रिगर करता है (मुझे लगता है कि इसका मतलब है "बाएं जोइन अब बाहरी जोइन की तरह व्यवहार करता है"?) - यह पुराने 'बाएं जोइन' की तुलना में एसक्यूएल सेमेन्टिक्स के करीब है - लेकिन 'बाएं जोइन' अभी भी 'बाहरी जॉइन' से अलग है: यदि दाएं हाथ की तरफ ट्रिगर होता है और इसमें कोई भागीदार नहीं मिलता है, तो यह रिकॉर्ड छोड़ देता है और कोई परिणाम उत्सर्जित नहीं होता है । –

उत्तर

5

किसी भी तरह के जॉइन का उपयोग करने से आपकी समस्या का समाधान नहीं होगा, क्योंकि आप हमेशा गायब नतीजे (कुछ स्ट्रीम स्टॉल के मामले में आंतरिक-शामिल) या null के साथ "डुप्लिकेट" के साथ समाप्त हो जाएंगे (बाएं-जुड़ें या बाहरी-जुड़ें मामले दोनों धाराएं ऑनलाइन हैं)। काफ्का स्ट्रीम में सेमेन्टिक्स में शामिल होने के विवरण के लिए https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics देखें।

इस प्रकार, मैं प्रोसेसर एपीआई कि आप KStreamprocess(), transform(), या transformValues() का उपयोग कर डीएसएल के साथ मिश्रण-एवं-मिलान कर सकते हैं उपयोग करने के लिए सिफारिश करेंगे। अधिक जानकारी के लिए How to filter keys and value with a Processor using Kafka Stream DSL देखें।

डुप्लिकेट-फ़िल्टरिंग गलती-सहिष्णु बनाने के लिए आप अपने प्रोसेसर (How to add a custom StateStore to the Kafka Streams DSL processor?) में एक कस्टम स्टोर भी जोड़ सकते हैं।

संबंधित मुद्दे