2016-08-10 10 views
9

मैं डेटा स्ट्रीमिंग एप्लिकेशन पर काम कर रहा हूं और मैं इस परियोजना के लिए अपाचे फ्लिंक का उपयोग करने की संभावना की जांच कर रहा हूं। इसका मुख्य कारण यह है कि यह अच्छी उच्च स्तरीय स्ट्रीमिंग संरचनाओं का समर्थन करता है, जो जावा 8 के स्ट्रीम एपीआई के समान ही है।अपाचे फ्लिंक में डेटाबेस से रिकॉर्ड की स्थिति को कैसे देखें और अपडेट करें?

मुझे ऐसी घटनाएं मिलेंगी जो डेटाबेस में एक विशिष्ट रिकॉर्ड से मेल खाती हैं, और मैं इन घटनाओं को संसाधित करने में सक्षम होना चाहता हूं (संदेश ब्रोकर जैसे कि खरगोश एमक्यू या काफ्का से आना) और अंततः डेटाबेस में रिकॉर्ड अपडेट करना और संसाधित/रूपांतरित घटनाओं को एक और सिंक (शायद एक और संदेश दलाल) पर धक्का दें।

एक विशिष्ट रिकॉर्ड से संबंधित घटनाओं को आदर्श रूप से एफआईएफओ ऑर्डरिंग में संसाधित करने की आवश्यकता होगी (हालांकि एक टाइमस्टैम्प होगा जो आदेश घटनाओं से भी पता लगाने में मदद करता है), लेकिन विभिन्न अभिलेखों से संबंधित घटनाओं को समानांतर में संसाधित किया जा सकता है। मैं रिकॉर्ड द्वारा धारा विभाजन के लिए keyBy() निर्माण का उपयोग करने की योजना बना रहा था।

प्रक्रिया की आवश्यकता है जो रिकॉर्डिंग के बारे में डेटाबेस में वर्तमान जानकारी पर निर्भर करता है। हालांकि, मैं इस तरह के रिकॉर्ड के लिए डेटाबेस से पूछताछ करने के लिए एक उदाहरण या अनुशंसित दृष्टिकोण खोजने में असमर्थ हूं, इस घटना को समृद्ध करने के लिए कि इसे संसाधित करने के लिए आवश्यक अतिरिक्त जानकारी के साथ संसाधित किया जा रहा है। > KeyBy() आईडी पर प्राप्त - -> रिकॉर्ड पर प्रसंस्करण चरणों को पूरा - -> आईडी के लिए इसी डेटाबेस से रिकॉर्ड को पुनः प्राप्त> धक्का

:

पाइपलाइन मैं मन में है इस प्रकार है बाहरी कतार में संसाधित घटना और डेटाबेस रिकॉर्ड

डेटाबेस रिकॉर्ड को अद्यतन करने की आवश्यकता होगी क्योंकि कोई अन्य एप्लिकेशन डेटा से पूछताछ करेगा।

इस पाइपलाइन के बाद कोई अतिरिक्त अनुकूलन हो सकता है। उदाहरण के लिए कोई एक प्रबंधित स्थिति में (अपडेटेड) रिकॉर्ड कैश कर सकता है ताकि उसी रिकॉर्ड पर अगली घटना को किसी अन्य डेटाबेस क्वेरी की आवश्यकता न हो। हालांकि, अगर एप्लिकेशन को किसी विशिष्ट रिकॉर्ड के बारे में पता नहीं है तो उसे डेटाबेस से पुनर्प्राप्त करने की आवश्यकता होगी।

अपाचे फ्लिंक में इस तरह के परिदृश्य के लिए उपयोग करने का सबसे अच्छा तरीका क्या है?

उत्तर

4

उदाहरण के लिए समृद्ध फ़ंक्शन को बढ़ाकर आप डेटाबेस को देख सकते हैं। एक RichFlatMap समारोह, अपने open() विधि में एक बार डेटाबेस कनेक्शन को प्रारंभ और उसके बाद flatMap() विधि में प्रत्येक घटना पर कार्रवाई:

public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> { 

    // Declare DB coonection and query statements 

    @Override 
    public void open(Configuration parameters) throws Exception { 
     // Initialize Database connection 
     // Prepare Query statements 
    } 

    @Override 
    public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception { 
     // look up the Database, update record, enrich event 
     out.collect(enrichedEvent);   
    } 
}) 

और इस प्रकार है तो आप DatabaseMapper उपयोग कर सकते हैं:

stream.keyby(id) 
     .flatmap(new DatabaseMapper()) 
     .addSink(..); 

आप here पा सकते हैं Redis से कैश डेटा का उपयोग कर एक उदाहरण।

+1

धन्यवाद। तो क्या होता है जब फ्लिंक एक वितरित क्लस्टर मोड में चलता है। क्या यह क्लस्टर के प्रत्येक नोड से कनेक्शन स्थापित करता है? – jbx

+0

आपके द्वारा सेट की गई समानता के आधार पर आपके पास कई 'फ्लैटमैप' उदाहरण हो सकते हैं। प्रत्येक नोड पर एक से अधिक ऑपरेटर उदाहरण हो सकते हैं (कॉन्फ़िगर किए गए कार्य स्लॉट की संख्या के आधार पर)। प्रत्येक समांतर उदाहरण के लिए, 'खुली() 'विधि को बिल्कुल एक बार कहा जाता है और प्रत्येक आने वाली घटना के लिए' flatmap() 'कहा जाता है। –

+1

ठीक है धन्यवाद। मुझे जाना होगा किसी भी मौके से क्या आप जानते हैं कि फ्लिंक (या इसके विपरीत) के भीतर वसंत को एकीकृत करना संभव है? निर्भरता इंजेक्शन, इकाई प्रबंधक ऑटो-वायरिंग इत्यादि जैसी चीजें हैं।यह उपयोग करने में बहुत उपयोगी होगा, लेकिन मुझे उन लोगों के बारे में ऑनलाइन जानकारी नहीं मिल रही है जिन्होंने इसे किया (और यदि कोई समस्या है)। – jbx

संबंधित मुद्दे