2015-03-11 22 views
5

क्यों StatefulNetworkWordCount.scala उदाहरण कॉल कुख्यात updateStateByKey() फ़ंक्शन, जो केवल बजाय साथ पैरामीटर के रूप में एक समारोह लेने के लिए माना जाता है सोच के साथ updateStateByKey कॉल:स्पार्क उदाहरण स्ट्रीमिंग अतिरिक्त पैरामीटर

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, 
    new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD) 

क्यों की जरूरत (और यह कैसे संसाधित हो जाता है - यह एक विभाजनकर्ता, एक बूलियन, और एक आरडीडी पास करने के लिए UpdateStateByKey()?) के हस्ताक्षर में नहीं है?

धन्यवाद, मैट

उत्तर

4

यह इसलिए क्योंकि:

  1. आप अलग अलग स्पार्क रिलीज शाखा देखें: https://github.com/apache/spark/blob/branch-1.3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala। स्पार्क 1.2 में यह कोड केवल updateStateByKey पैरामीटर के रूप में एक ही फ़ंक्शन प्राप्त करने के साथ था, जबकि 1.3 में उन्होंने इसे
  2. updateStateByKey के विभिन्न संस्करण 1.2 और 1.3 दोनों में मौजूद हैं। लेकिन 1.2 में 4 मानकों के साथ कोई संस्करण है वहाँ, यह 1.3 में केवल पेश किया गया:

    /** 
    * Return a new "state" DStream where the state for each key is updated by applying 
    * the given function on the previous state of the key and the new values of each key. 
    * org.apache.spark.Partitioner is used to control the partitioning of each RDD. 
    * @param updateFunc State update function. Note, that this function may generate a different 
    * tuple with a different key than the input key. Therefore keys may be removed 
    * or added in this way. It is up to the developer to decide whether to 
    * remember the partitioner despite the key being changed. 
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new 
    * DStream 
    * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. 
    * @param initialRDD initial state value of each key. 
    * @tparam S State type 
    */ 
    def updateStateByKey[S: ClassTag](
        updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], 
        partitioner: Partitioner, 
        rememberPartitioner: Boolean, 
        initialRDD: RDD[(K, S)] 
    ): DStream[(K, S)] = { 
        new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, 
        rememberPartitioner, Some(initialRDD)) 
    } 
    
    : https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

यहाँ कोड है

संबंधित मुद्दे