2016-02-18 7 views
6

मैं एक पुनरावृत्ति गणना के परिणाम एकत्र करने के लिए एक आरडीडी बनाना चाहता हूं।एक पुनरावृत्ति गणना के परिणाम एकत्र करने के लिए आरडीडी बनाना

मैं एक पाश (या किसी भी वैकल्पिक) का उपयोग कर सकते कैसे निम्नलिखित कोड को बदलने के लिए:

import org.apache.spark.mllib.random.RandomRDDs._  

val n = 10 

val step1 = normalRDD(sc, n, seed = 1) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong) 
val result2 = result1.zip(step3) 

... 

val step50 = normalRDD(sc, n, seed = (step49.max).toLong) 
val result49 = result48.zip(step50) 

(एन कदम RDDs बनाने और अंत में एक साथ तो ज़िप करने भी लंबे 50 RDDs के रूप में ठीक हो जाएगा बीज = सम्मान करने के लिए बनाई गई हैं iteratively (चरण (n-1) .max) हालत)

+0

मैं scalaz से Stream.unfold' प्रयोग करेंगे 'एक उत्पन्न करने के लिए चरणों की धारा, और फिर इसे ज़िप अपने और/या स्कैनराइट के साथ .. – Reactormonk

उत्तर

6

एक पुनरावर्ती समारोह काम करेगा:

/** 
* The return type is an Option to handle the case of a user specifying 
* a non positive number of steps. 
*/ 
def createZippedNormal(sc : SparkContext, 
         numPartitions : Int, 
         numSteps : Int) : Option[RDD[Double]] = { 

    @scala.annotation.tailrec 
    def accum(sc : SparkContext, 
      numPartitions : Int, 
      numSteps : Int, 
      currRDD : RDD[Double], 
      seed : Long) : RDD[Double] = { 
    if(numSteps <= 0) currRDD 
    else { 
     val newRDD = normalRDD(sc, numPartitions, seed) 
     accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) 
    } 
    } 

    if(numSteps <= 0) None 
    else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) 
} 
+0

पूंछ रिकर्स आपको आरडीडी वंश से ढेर उड़ाने से बचाएगा :) – zero323

+0

@ zero323 सहमत। हालांकि, यह मुद्दा प्रश्न की आवश्यकताओं के अनुरूप है। किसी भी उत्तर में एक ही समस्या का सामना करना पड़ेगा। –

+0

बस यह इंगित करना चाहता था कि आप दृश्यों के पीछे एक पुनरावर्ती डेटा संरचना बना रहे हैं जो पूंछ अनुकूलित नहीं होगा। और कुछ नहीं :) और वास्तव में आप इसे हल कर सकते हैं और चेकपॉइंट्स का उपयोग करके समस्या से बच सकते हैं। यह एक ज़िप के बिना भी सुलभ है :) – zero323

संबंधित मुद्दे