2016-06-05 4 views
6

मेरे स्पार्क स्ट्रीमिंग एप्लिकेशन में, मैं बैकएंड (लोचदार खोज) से पुनर्प्राप्त एक शब्दकोश के आधार पर एक मान मैप करना चाहता हूं। मैं समय-समय पर शब्दकोश को रीफ्रेश करना चाहता हूं, अगर इसे बैकएंड में अपडेट किया गया हो। यह लॉगस्टैश फ़िल्टर के आवधिक रीफ्रेश क्षमता का अनुवाद करने के समान होगा। मैं इसे स्पार्क के साथ कैसे प्राप्त कर सकता हूं (उदा। किसी भी तरह से हर 30 सेकंड में आरडीडी को अलग करता है)?स्पार्क स्ट्रीमिंग: समय-समय पर कैश किए गए आरडीडी को रीफ्रेश कैसे करें?

उत्तर

5

मुझे ऐसा करने का सबसे अच्छा तरीका आरडीडी को फिर से बनाना और इसके लिए एक परिवर्तनीय संदर्भ बनाए रखना है। स्पार्क स्ट्रीमिंग स्पार्क के शीर्ष पर एक मूल शेड्यूलिंग फ्रेमवर्क है। आरडीडी समय-समय पर ताज़ा होने के लिए हम शेड्यूलर पर पिग-बैक कर सकते हैं। उसके लिए, हमने एक खाली DStream कि हम केवल ताज़ा ऑपरेशन के लिए अनुसूची का उपयोग करें:

def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data 
val dstream = ??? // our data stream 

// a dstream of empty data 
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval)) 

var referenceData = getData() 
referenceData.cache() 
refreshDstream.foreachRDD{_ => 
    // evict the old RDD from memory and recreate it 
    referenceData.unpersist(true) 
    referenceData = getData() 
    referenceData.cache() 
} 

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData)) 
... etc ... 

अतीत में, मैं भी केवल cache() और unpersist() interleaving कोई परिणाम के साथ (यह केवल एक बार ताज़ा करता है) के साथ की कोशिश की है। आरडीडी को पुनर्निर्मित करने से सभी वंशावली हटा दी जाती है और नए डेटा का एक साफ भार प्रदान करता है।

+0

क्या ConstantInputDStream के लिए जावा विकल्प है? – user2100493

+0

क्या यह गारंटी है कि संदर्भ डेटा रीफ्रेश जॉब्स (GetData() द्वारा ट्रिगर) हमेशा व्यापार से पहले होता हैडेटाडेट्स नौकरियां निर्धारित की जाती हैं? क्या हमारे पास एक परिदृश्य होगा जहां rdd.join (contextData) नौकरियां निर्धारित की जाती हैं जब संदर्भ डेटा रीफ्रेश हो रहा है ?? – Cheeko

+0

@maasg 'getData() 'कॉल शेड्यूल कैसे करें? प्रश्न से, 'हर 30 सेकंड'? –

संबंधित मुद्दे