2015-10-28 14 views
5

स्पार्क डीस्ट्रीम में mapPartition एपीआई है, जबकि फ्लिंक DataStream एपीआई नहीं है। क्या कोई ऐसा व्यक्ति है जो कारण की व्याख्या करने में मदद कर सकता है। मैं क्या करना चाहता हूं फ्लिंक पर स्पार्क reduceByKey के समान एपीआई को कार्यान्वित करना है।अपाचे फ्लिंक डेटास्ट्रीम एपीआई में नक्शा नहीं है पार्टिशन रूपांतरण

उत्तर

5

फ्लिंक स्ट्रीम स्ट्रीमिंग मॉडल स्पार्क स्ट्रीमिंग से काफी अलग है जो मिनी बैचों के आसपास केंद्रित है। स्पार्क स्ट्रीमिंग में प्रत्येक मिनी बैच को डेटा के सीमित सेट पर नियमित बैच प्रोग्राम की तरह निष्पादित किया जाता है, जबकि फ्लिंक डेटास्ट्रीम प्रोग्राम लगातार रिकॉर्ड को संसाधित करते हैं।

फ्लिंक के डेटासेट एपीआई में, MapPartitionFunction में दो पैरामीटर हैं। फ़ंक्शन के परिणामस्वरूप इनपुट और कलेक्टर के लिए एक पुनरावर्तक। एक फ्लिंक डेटास्ट्रीम प्रोग्राम में MapPartitionFunction कभी भी पहले फ़ंक्शन कॉल से वापस नहीं आएगा, क्योंकि इटरेटर रिकॉर्ड की एक अंतहीन धारा को फिर से चालू करेगा। हालांकि, फ्लिंक के आंतरिक स्ट्रीम प्रोसेसिंग मॉडल की आवश्यकता है कि उपयोगकर्ता फ़ंक्शन चेकपॉइंट फ़ंक्शन स्थिति के लिए वापस आएं। इसलिए, डेटास्ट्रीम एपीआई mapPartition परिवर्तन प्रदान नहीं करता है।

स्पार्क स्ट्रीमिंग के reduceByKey जैसी कार्यक्षमता को कार्यान्वित करने के लिए, आपको स्ट्रीम पर एक कीड विंडो को परिभाषित करने की आवश्यकता है। विंडोज धाराओं को विघटित करता है जो कुछ छोटे बैचों के समान होता है लेकिन खिड़कियां अधिक लचीलापन प्रदान करती हैं। चूंकि खिड़की सीमित आकार का है, इसलिए आप विंडो को reduce पर कॉल कर सकते हैं।

यह दिखाई दे सकता है जैसे:

yourStream.keyBy("myKey") // organize stream by key "myKey" 
      .timeWindow(Time.seconds(5)) // build 5 sec tumbling windows 
      .reduce(new YourReduceFunction); // apply a reduce function on each window 

DataStream documentation दिखाता है कि कैसे विभिन्न खिड़की प्रकार को परिभाषित करने के लिए और सभी उपलब्ध कार्यों बताते हैं।

नोट: डाटास्ट्रीम एपीआई को हाल ही में फिर से काम किया गया है। उदाहरण नवीनतम संस्करण (0.10-SNAPSHOT) मानता है जिसे अगले दिनों में 0.10.0 के रूप में रिलीज़ किया जाएगा।

+0

ऐसा लगता है कि आपके द्वारा प्रदान की गई 'कमबीकी' का समाधान 'कमीबीकी' के अलावा स्पार्क में 'ग्रुपबीकी' जैसा ही है। https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html –

+0

नहीं, फ्लिंक की 'कम करें() 'स्पार्क की' कमबीकी 'की तरह लागू होती है जो एक समूह पर एक जोड़ी-कम कार्य को कम करती है। हालांकि समूह परिभाषा थोड़ा अलग है, क्योंकि फ्लिंक मिनी बैचों में विंडो और स्पार्क कुंजी-मूल्य जोड़े का उपयोग करता है। फ्लिंक में स्पार्क के 'ग्रुपबीकी' के लिए कोई प्रत्यक्ष समतुल्य नहीं है, क्योंकि इसका तात्पर्य है कि पूर्ण समूह को स्मृति में भौतिक रूप से पूरा करने की आवश्यकता है, जिससे आउटऑफमेमरी एरर्स का कारण बन सकता है और जेवीएम को मार सकता है। फ्लिंक स्ट्रीम किए गए इटरेटर का उपभोग करने के लिए 'groupReduce()' प्रदान करता है। –

+0

मुझे लगता है कि फ्लिंक की कमी() संयोजन को लागू करती है। क्या ऐसा ही कारण है कि Flink DataStream में समूह विभाजन को MapPartition के रूप में कम नहीं किया गया है? –

0

अपने इनपुट धारा मान लिया जाये कि एक विभाजन डेटा है (स्ट्रिंग कहना)

val new_number_of_partitions = 4 

//below line partitions your data, you can broadcast data to all partitions 
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions) 

//flexibility for mapping 
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{ 
    // var local_val_to_different_part : Type = null 
    var myTaskId : Int = null 

    //below function is executed once for each mapper function (one mapper per partition) 
    override def open(config: Configuration): Unit = { 
    myTaskId = getRuntimeContext.getIndexOfThisSubtask 
    //do whatever initialization you want to do. read from data sources.. 
    } 

    def map(value: String): (String, Int) = { 
    (value, myTasKId) 
    } 
}) 

val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print 
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2)) 
//.countWindow will first wait for a certain number of records for perticular key 
// and then apply the function 

Flink स्ट्रीमिंग का शुद्ध स्ट्रीमिंग (नहीं batched एक) है। Iterate एपीआई पर एक नज़र डालें।

संबंधित मुद्दे