2015-09-10 12 views
6

मेरे पास RDD[String], wordRDD है। मेरे पास एक ऐसा फ़ंक्शन भी है जो एक स्ट्रिंग/शब्द से आरडीडी [स्ट्रिंग] बनाता है। मैं wordRDD में प्रत्येक स्ट्रिंग के लिए एक नया आरडीडी बनाना चाहता हूं। यहाँ मेरी प्रयास कर रहे हैं:आरडीडी से आरडीडी का संग्रह कैसे बनाएं?

1) विफल क्योंकि स्पार्क नेस्टेड RDDs का समर्थन नहीं करता:

var newRDD = wordRDD.map(word => { 
    // execute myFunction() 
    (new MyClass(word)).myFunction() 
}) 

2) (विफल गुंजाइश मुद्दे को संभवतः कारण):

var newRDD = sc.parallelize(new Array[String](0)) 
val wordArray = wordRDD.collect 
for (w <- wordArray){ 
    newRDD = sc.union(newRDD,(new MyClass(w)).myFunction()) 
} 

मेरे आदर्श परिणाम होगा

// input RDD (wordRDD) 
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...) 

// myFunction behavior 
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl') 

// after executing myFunction() on each word in wordRDD: 
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...) 

मैं एक प्रासंगिक सवाल यहां पाया: की तरह लग रहे Spark when union a lot of RDD throws stack overflow error, लेकिन यह पता नहीं था कि मेरी मुद्दा।

उत्तर

3

उपयोग flatMapRDD[String] प्राप्त करने के लिए के रूप में आप की इच्छा।

var allWords = wordRDD.flatMap { word => 
    (new MyClass(word)).myFunction().collect() 
} 
+1

यह समानांतर में कैसे चल रहा है? 'WordRDD.map' के भीतर जो कुछ भी होता है उसे क्लस्टर पर निष्पादित किया जाता है। इस प्रकार, आंतरिक 'संग्रह' को चल रहे नौकरी के भीतर से एक नई स्पार्क नौकरी ट्रिगर करना पड़ता है। मुझे संदेह है कि यह वितरित नहीं होगा। –

+0

वह आरडीडी के बजाए सरणी लौटने के लिए फ़ंक्शन को भी बदल सकता है, लेकिन सवाल ने वास्तविक फ़ंक्शन निर्दिष्ट नहीं किया था। –

+0

लेकिन उसका वर्णन कहता है कि उसके पास एक कार्य है, मुझे लगता है कि यह 'मेरा कार्य' है जो एक स्ट्रिंग/शब्द से 'आरडीडी [स्ट्रिंग]' बनाता है। –

3

आप RDD के भीतर से RDD नहीं बना सकते हैं।

हालांकि, यह आपके समारोह myFunction: String => RDD[String], जो किसी अन्य समारोह modifiedFunction: String => Seq[String] जिनके अनुसार इसे एक RDD भीतर से इस्तेमाल किया जा सकता में इनपुट जहां एक पत्र निकाल दिया जाता है, से सभी शब्द उत्पन्न करता पुनर्लेखन के लिए संभव है। इस तरह, यह आपके क्लस्टर पर समानांतर में भी निष्पादित किया जाएगा। modifiedFunction होने के बाद आप wordRDD.flatMap(modifiedFunction) पर कॉल करके सभी शब्दों के साथ अंतिम RDD प्राप्त कर सकते हैं।

महत्वपूर्ण बिंदु flatMap (map करने और flatten परिवर्तनों) का उपयोग करने के लिए है:

def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 

    val input = sc.parallelize(Seq("apple", "ananas", "banana")) 

    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...) 
    val result = input.flatMap(modifiedFunction) 
} 

def modifiedFunction(word: String): Seq[String] = { 
    word.indices map { 
    index => word.substring(0, index) + word.substring(index+1) 
    } 
} 
संबंधित मुद्दे