मेरे स्पार्क स्ट्रीमिंग एप्लिकेशन में, मैं बैकएंड (लोचदार खोज) से पुनर्प्राप्त एक शब्दकोश के आधार पर एक मान मैप करना चाहता हूं। मैं समय-समय पर शब्दकोश को रीफ्रेश करना चाहता हूं, अगर इसे बैकएंड में अपडेट किया गया हो। यह लॉगस्टैश फ़िल्टर के आवधिक रीफ्रेश क्षमता का अनुवाद करने के समान होगा। मैं इसे स्पार्क के साथ कैसे प्राप्त कर सकता हूं (उदा। किसी भी तरह से हर 30 सेकंड में आरडीडी को अलग करता है)?स्पार्क स्ट्रीमिंग: समय-समय पर कैश किए गए आरडीडी को रीफ्रेश कैसे करें?
6
A
उत्तर
5
मुझे ऐसा करने का सबसे अच्छा तरीका आरडीडी को फिर से बनाना और इसके लिए एक परिवर्तनीय संदर्भ बनाए रखना है। स्पार्क स्ट्रीमिंग स्पार्क के शीर्ष पर एक मूल शेड्यूलिंग फ्रेमवर्क है। आरडीडी समय-समय पर ताज़ा होने के लिए हम शेड्यूलर पर पिग-बैक कर सकते हैं। उसके लिए, हमने एक खाली DStream कि हम केवल ताज़ा ऑपरेशन के लिए अनुसूची का उपयोग करें:
def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream
// a dstream of empty data
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))
var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ =>
// evict the old RDD from memory and recreate it
referenceData.unpersist(true)
referenceData = getData()
referenceData.cache()
}
val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...
अतीत में, मैं भी केवल cache()
और unpersist()
interleaving कोई परिणाम के साथ (यह केवल एक बार ताज़ा करता है) के साथ की कोशिश की है। आरडीडी को पुनर्निर्मित करने से सभी वंशावली हटा दी जाती है और नए डेटा का एक साफ भार प्रदान करता है।
संबंधित मुद्दे
- 1. स्पार्क स्ट्रीमिंग में आरडीडी विभाजन
- 2. स्पार्क स्ट्रीमिंग डीस्ट्रीम आरडीडी फ़ाइल नाम
- 3. स्पार्क स्ट्रीमिंग: foreachRDD अपडेट करें मेरे मोंगो आरडीडी
- 4. कैश किए गए संस्करण
- 5. स्पार्क: आरडीडी
- 6. स्पार्क आरडीडी
- 7. स्पार्क आरडीडी
- 8. स्पार्क स्ट्रीमिंग डेटा को स्पार्क डेटाफ्रेम
- 9. स्पार्क आरडीडी
- 10. स्पार्क आरडीडी
- 11. स्पार्क त्रुटि आरडीडी प्रकार आरडीडी
- 12. स्पार्क-स्ट्रीमिंग संदर्भ में एचडीएफएस में एक आरडीडी लिखें
- 13. अपाचे स्पार्क आरडीडी
- 14. स्पार्क/स्कैला: आरडीडी को फंक्शन
- 15. स्पार्क आरडीडी यूनियन
- 16. स्पार्क स्ट्रीमिंग
- 17. स्पार्क आरडीडी फोरैच
- 18. स्पार्क-स्ट्रीमिंग
- 19. कैश किए गए फ़ीड
- 20. अपाचे स्पार्क आरडीडी स्प्लिट "|"
- 21. कैश किए गए सीएसएस मुद्दों को पीड़ित किए बिना वेब ऐप कैसे अपडेट करें?
- 22. स्पार्क उदाहरण स्ट्रीमिंग अतिरिक्त पैरामीटर
- 23. स्पार्क स्ट्रीमिंग
- 24. आरडीडी को कई आरडीडी में डुप्लिकेट कैसे करें?
- 25. स्पार्क आरडीडी- मानचित्र बनाम नक्शापार्टिशन
- 26. डेटाफ्रेम से आरडीडी तक [लेबल किए गए पॉइंट]
- 27. स्पार्क स्ट्रीमिंग UpdateStateByKey
- 28. अद्यतन कैश किए गए टेम्पलेट्स
- 29. अपाचे स्पार्क स्ट्रीमिंग
- 30. काफ्का स्ट्रीमिंग + स्पार्क स्ट्रीमिंग + मशीन लर्निंग
क्या ConstantInputDStream के लिए जावा विकल्प है? – user2100493
क्या यह गारंटी है कि संदर्भ डेटा रीफ्रेश जॉब्स (GetData() द्वारा ट्रिगर) हमेशा व्यापार से पहले होता हैडेटाडेट्स नौकरियां निर्धारित की जाती हैं? क्या हमारे पास एक परिदृश्य होगा जहां rdd.join (contextData) नौकरियां निर्धारित की जाती हैं जब संदर्भ डेटा रीफ्रेश हो रहा है ?? – Cheeko
@maasg 'getData() 'कॉल शेड्यूल कैसे करें? प्रश्न से, 'हर 30 सेकंड'? –