2011-08-24 15 views
6

में सम्मिलित डेटा स्ट्रीमिंग में मेरे पास कई मोंगोडीबी संग्रह हैं जो विभिन्न स्ट्रीमिंग स्रोतों से कई JSON दस्तावेज़ लेते हैं। दूसरे शब्दों में वहां कई प्रक्रियाएं हैं जो लगातार मोंगोडीबी संग्रह के सेट में डेटा डाल रही हैं।मोंगो डीबी रीयल-टाइम (या रीयल-टाइम के पास)

मुझे डाउनस्ट्रीम अनुप्रयोगों में मोंगोडीबी के डेटा को स्ट्रीम करने का एक तरीका चाहिए।

App Stream1 --> 
App Stream2 -->  MONGODB  ---> Aggregated Stream 
App Stream3 --> 

या इस:: तो मैं एक प्रणाली है कि धारणात्मक इस तरह दिखता है चाहता हूँ

App Stream1 -->     ---> MongoD Stream1 
App Stream2 -->  MONGODB  ---> MongoD Stream2 
App Stream3 -->     ---> MongoD Stream3 

सवाल है कि कैसे मैं लगातार मतदान पर/डेटाबेस क्वेरी बिना मोंगो से बाहर डेटा स्ट्रीम करते हैं?

स्पष्ट प्रश्न का जवाब होगा "आप उन ऐप स्ट्रीमिंग प्रक्रियाओं को क्यों नहीं बदलते हैं जैसे कि रब्बीट, ज़ीरो या एक्टिवएमक्यू जैसे कतार में संदेश भेजने के लिए, जो उन्हें आपके मोंगो स्ट्रीमिंग प्रक्रियाओं और मोंगो को एक बार इस तरह भेजते हैं":

    MONGODB 
        /|\ 
        | 
App Stream1 -->  |   ---> MongoD Stream1 
App Stream2 --> SomeMQqueue ---> MongoD Stream2 
App Stream3 -->    ---> MongoD Stream3 

एक आदर्श दुनिया में हाँ है कि अच्छा होगा, लेकिन हम मोंगो जरूरत है सुनिश्चित करें कि संदेशों पहले सहेजे जाते हैं, डुप्लिकेट से बचने के लिए और सुनिश्चित करें कि आईडी सब उत्पन्न कर रहे हैं आदि मोंगो हठ के रूप में बीच में बैठने के लिए है परत।

तो मैं इन डाउन स्ट्रीम ऐप्स में एक मोंगो संग्रह (ग्रिडएफएस आदि का उपयोग नहीं) से संदेश कैसे स्ट्रीम करूं? विचारों का मूल विद्यालय केवल नए दस्तावेजों के लिए मतदान करना है और डेटाबेस में संग्रहीत JSON दस्तावेज़ों में एक और फ़ील्ड जोड़कर इसे एकत्रित किया गया प्रत्येक दस्तावेज़, एक एसक्यूएल तालिका में एक प्रक्रिया ध्वज की तरह है जो एक संसाधित समय टिकट मुद्रित करता है। अर्थात। दस्तावेज़ों के लिए प्रत्येक 1 सेकंड पोल जहां संसाधित == नल .... संसाधित = अब() .... दस्तावेज़ अपडेट करें।

क्या कोई neater/अधिक computationally कुशल विधि है?

एफवाईआई - ये सभी जावा प्रक्रियाएं हैं।

चीयर्स!

उत्तर

3

यदि आप किसी कैप्ड संग्रह (या संग्रह) पर लिख रहे हैं, तो आप स्ट्रीम पर नए डेटा को धक्का देने के लिए tailablecursor का उपयोग कर सकते हैं, या संदेश कतार पर जहां इसे स्ट्रीम किया जा सकता है। हालांकि यह एक गैर-कैप्ड संग्रह के लिए काम नहीं करेगा हालांकि।

+0

लिंक के लिए धन्यवाद। अफसोस की बात है कि एक मैसेजिंग सेवा के लिए कैप्ड संग्रह का उपयोग न करें, खराब सुविधा नहीं। प्रसंस्कृत ध्वज और मतदान पर एक इंडेक्स की तरह लगता है एकमात्र विकल्प है ... यदि कोई इंडेक्स आइटम शून्य है तो क्या यह अभी भी इंडेक्स में संदर्भित है या अभी भी संक्षिप्त संग्रह स्कैन के लिए पूछताछ करता है? – NightWolf

+1

या मैं खराब हूं कि हमारे पास कैश की तरह एक निश्चित आकार के कार्य पर एक कैप्ड संग्रह हो सकता है, फिर वस्तुओं को एक खरीद 1 खींचें और उन्हें एक सामान्य संग्रह में वापस रखें। सवाल तब बन जाता है कि हम ऐप रन के बीच अपना स्टेटस कर्सर कैसे सहेजते हैं? मुझे लगता है कि हम सिर्फ एक मोंगो ऑटो जेनरेट _id फ़ील्ड का उपयोग करते हैं और उस आईडी फ़ील्ड से अधिक कुछ भी चुनते हैं ... क्या सभी मोंगो ने बढ़ते क्रम में _ID उत्पन्न किए हैं? – NightWolf

+1

इंडेक्स 'शून्य' के लिए प्रविष्टियां स्टोर करते हैं। यदि आप एक कैप्ड संग्रह को तैयार कर रहे हैं, तो आपको पिछली प्रविष्टि को स्टोर करने की आवश्यकता है (आप इसे स्टोर कर सकते हैं, हालांकि आप चाहते हैं कि एक और मोंगो संग्रह ठीक काम करेगा), और उसके बाद उस तत्व पर '$ min का उपयोग करके अपना ट्यूटोरियल कर्सर शुरू करें) 'और' फिर से शुरू करने के लिए छोड़ें)। देखें http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max – dcrosta

संबंधित मुद्दे