2016-01-15 9 views
15

मैं नया mongodb RDD प्रत्येक बार जब मैं foreachRDD के अंदर प्रवेश करता हूं तो अपडेट करना चाहता हूं।स्पार्क स्ट्रीमिंग: foreachRDD अपडेट करें मेरे मोंगो आरडीडी

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected]) 

किसी भी विचार:

mydstream 
    .foreachRDD(rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     // ssc is my StreamingContext 
     val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }) 

यह मैं एक त्रुटि दे देंगे: हालांकि मैं क्रमबद्धता मुद्दे हैं?

+0

'स्पार्ककॉन्टेक्स्ट' धारावाहिक नहीं है, इसलिए आप किसी भी परिवर्तन या क्रिया विधियों के अंदर उपयोग नहीं कर सकते हैं, आपको केवल ड्राइवर वर्ग में उपयोग करना होगा। – Shankar

+0

क्या कोई विशिष्ट कारण है कि आप foreachRDD विधि के अंदर सूची को आरडीडी में परिवर्तित क्यों कर रहे हैं? – Shankar

उत्तर

7

आप rdd.context है कि या तो एक SparkContext या एक SparkStreamingContext (यदि RDD एक DStream है) देता है उपयोग करने के लिए कोशिश कर सकते हैं।

mydstream foreachRDD { rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) }) 

वास्तव में, ऐसा लगता है कि RDD भी एक .sparkContext विधि है। मैं ईमानदारी से अंतर नहीं जानता, शायद वे उपनाम हैं?()।

2

मेरी समझ में आपको जोड़ना होगा कि क्या आपके पास "serializable" ऑब्जेक्ट नहीं है, तो आपको foreachPartition के माध्यम से इसे पास करने की आवश्यकता है ताकि आप अपनी प्रसंस्करण चलाने से पहले प्रत्येक नोड पर डेटाबेस से कनेक्शन बना सकें।

mydstream.foreachRDD(rdd => { 
     rdd.foreachPartition{ 
      val mongoClient = MongoClient("localhost", 27017) 
      val db = mongoClient(mongoDatabase) 
      val coll = db(mongoCollection) 
      // ssc is my StreamingContext 
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }}) 
+0

यह काम नहीं करेगा, क्योंकि एसएससी धारावाहिक नहीं है। –

+0

आप rdd.foreachPartition 'val ssc = StreamingContext.getOrCreate (checkpointdirectory, functionToCreateContext _) से पहले foreachRDD के अंदर अपना एसएससी बनाने का प्रयास कर सकते हैं। – Rami

संबंधित मुद्दे