2015-07-21 6 views
5

स्पार्क स्ट्रीमिंग का उपयोग करते समय, क्या डीएस स्ट्रीम में प्रत्येक आरडीडी के पहले n तत्व प्राप्त करना संभव है? असली दुनिया में, मेरी धारा में कई भूगर्भित घटनाएं होती हैं, और मैं 100 (या जो भी) लेना चाहता हूं जो आगे की प्रक्रिया के लिए किसी दिए गए बिंदु के सबसे नज़दीक हैं, लेकिन एक साधारण उदाहरण जो दिखाता है कि मैं क्या करने की कोशिश कर रहा हूं है कुछ की तरह:क्या स्पार्क स्ट्रीमिंग में प्रत्येक आरडीडी के पहले एन तत्व प्राप्त करना संभव है?

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.dstream.ConstantInputDStream 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

object take { 
    def main(args: Array[String]) { 

    val data = 1 to 10 

    val sparkConf = new SparkConf().setAppName("Take"); 
    val streamingContext = new StreamingContext(sparkConf, Seconds(1)) 

    val rdd = streamingContext.sparkContext.makeRDD(data) 
    val stream = new ConstantInputDStream(streamingContext, rdd) 

    // In the real world, do a bunch of stuff which results in an ordered RDD 

    // This obviously doesn't work 
    // val filtered = stream.transform { _.take(5) } 

    // In the real world, do some more processing on the DStream 

    stream.print() 

    streamingContext.start() 
    streamingContext.awaitTermination() 
    } 
} 

मैं समझता हूँ कि मैं शीर्ष n परिणामों को वापस चालक को काफी आसानी से खींच सकता है, लेकिन यह कुछ मैं इस मामले में क्या करने के लिए के रूप में मैं RDD पर आगे की प्रक्रिया करने की ज़रूरत नहीं है चाहता हूँ इसे फ़िल्टर करने के बाद।

उत्तर

7

यह क्यों काम नहीं कर रहा है? मुझे लगता है कि आपका उदाहरण ठीक है।

  1. आप प्रत्येक घटना
  2. क्रमबद्ध डेटा के अपने राशि के लिए अनुकूलित विभाजन की संख्या के साथ दूरी के अनुसार घटनाओं के लिए दूरी की गणना करना चाहिए
  3. प्रत्येक विभाजन से पहले 100 घटनाओं ले लो (ताकि आप शफ़ल करेंगे प्रारंभिक डेटा का एक छोटा सा हिस्सा), केवल एक विभाजन तो सभी डेटा एक ही डाटासेट
  4. में फेरबदल किया गया है के साथ फिर से क्रमबद्ध लौटे संग्रह sparkContext.parallelize के साथ एक नया RDD (डेटा)
  5. बनाने के पहले 100 घटनाओं ले लो , यह आपका शीर्ष 100
  6. है

क्रमबद्ध करने के लिए कोड चरण 2 और 4 में समान है, तो आप केवल विभाजनों की संख्या को बदलते हैं।

चरण 1 को डीस्ट्रीम पर निष्पादित किया जाता है, आरडीडी पर चरण 2 से 5 को एक ट्रांसफॉर्म ऑपरेशन में निष्पादित किया जाता है।

+0

यह चरण 3 है कि मैं संघर्ष कर रहा हूं - प्रत्येक विभाजन से पहले 100 घटनाओं को कैसे लेना है। 'ले लो() 'काम नहीं करता क्योंकि यह आरडीडी नहीं लौटाता है। –

+0

आप इसे स्पार्ककॉन्टेक्स्ट.परेललाइज (डेटा) के साथ आरडीडी के रूप में पैराराइलाइज़ कर सकते हैं। –

+0

आह! यही वह था जो मैं याद कर रहा था। क्या आप इसे अपने उत्तर में संपादित कर सकते हैं और फिर मैं सम्मान करूँगा? –

संबंधित मुद्दे