2016-05-20 12 views
5

स्ट्रीमिंग मैं अपाचे स्पार्क 1.6.1 स्ट्रीमिंग एक जावा अनुप्रयोग है कि दो की/मान डेटा धाराओं जुड़ जाता है और HDFS के उत्पादन लिखते हैं लिखने के लिए उपयोग कर रहा हूँ ले जाने के लिए। दो डेटा स्ट्रीम में के/वी तार होते हैं और टेक्स्टफाइलस्ट्रीम() का उपयोग कर स्पार्क में एचडीएफएस से समय-समय पर निगमित होते हैं।कैसे स्पार्क में कई बैच के अंतराल पर डेटा धाराओं

दो डेटा धाराओं सिंक्रनाइज़ नहीं कर रहे हैं, जिसका मतलब है कि कुछ सुझाव दिए गए उस समय t0 पर stream1 में हैं समय t1, या इसके विपरीत पर stream2 में दिखाई दे सकते हैं। इसलिए, मेरा लक्ष्य दो धाराओं में शामिल होने और गणना "बचे हुए" कुंजी है, जो अगले बैच के अंतराल में शामिल होने के ऑपरेशन के लिए विचार किया जाना चाहिए है।

बेहतर यह स्पष्ट करने के लिए, निम्नलिखित कलन विधि को देखो:

variables: 
stream1 = <String, String> input stream at time t1 
stream2 = <String, String> input stream at time t1 
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0 
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0 

operations at time t1: 
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2) 
write out_stream to HDFS 
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2) 
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2) 

मैं स्पार्क असफल स्ट्रीमिंग के साथ इस एल्गोरिथ्म को लागू करने की कोशिश की है। प्रारंभ में, मैं (यह केवल एक धारा है, लेकिन कोड दूसरी धारा उत्पन्न करने के लिए समान है) इस तरह से बचे हुए चाबी के लिए दो खाली स्ट्रीम को बना:

JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context 
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>(); 
q.add(empty_rdd); 
JavaDStream<String> empty_dstream = jssc.queueStream(q); 
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() { 
           @Override 
           public scala.Tuple2<String, String> call(String s) { 
            return new scala.Tuple2(s, s); 
           } 
           }); 

बाद में, इस खाली धारा एकीकृत है (यानी, संघ()) धारा 1 के साथ और अंत में, शामिल होने के बाद, मैं स्ट्रीम 1 से बचे हुए कुंजी जोड़ता हूं और विंडो() कॉल करता हूं। धारा 2 के साथ भी यही होता है।

समस्या यह है कि बाएं_keys_s1 और left_keys_s2 उत्पन्न करने वाले ऑपरेशन क्रियाओं के बिना परिवर्तन होते हैं, जिसका अर्थ है कि स्पार्क कोई आरडीडी प्रवाह ग्राफ नहीं बनाता है, इसलिए, उन्हें कभी निष्पादित नहीं किया जाता है। जो मुझे अभी मिल रहा है वह एक ऐसा हिस्सा है जो केवल उन अभिलेखों को आउटपुट करता है जिनकी चाबियां धारा 1 और स्ट्रीम 2 में एक ही समय अंतराल में होती हैं।

तुम लोगों स्पार्क के साथ सही ढंग से इस लागू करने के लिए कोई सुझाव है?

धन्यवाद, मार्को

उत्तर

1

यह एक RDD जहाँ हम उन मूल्यों को आयोजित कर रहे हैं के लिए एक संदर्भ रखकर अगले करने के लिए एक बैच से ले जाने से अधिक करने के लिए मूल्यों संभव हो जाना चाहिए।

queueDStream का उपयोग करके धाराओं को मर्ज करने का प्रयास न करें, इसके बजाय एक उत्परिवर्तनीय आरडीडी संदर्भ घोषित करें जो प्रत्येक स्ट्रीमिंग अंतराल पर अपडेट हो सकता है।

इस स्ट्रीमिंग का काम में, हम एक RDD 100 पूर्णांकों carring के साथ शुरू:

यह एक उदाहरण है। प्रत्येक अंतराल, 10 यादृच्छिक संख्या उत्पन्न होते हैं और उन प्रारंभिक 100 पूर्णांकों के लिए substracted कर रहे हैं। यह प्रक्रिया तब तक जारी है जब तक कि 100 तत्वों के साथ प्रारंभिक आरडीडी खाली न हो। यह उदाहरण दिखाता है कि तत्वों को एक अंतराल से अगले तक कैसे ले जाएं।

Removing 100 ints by generating random numbers

मुझे आशा है कि यह आप पूरा USECASE को लागू करने के लिए पर्याप्त मार्गदर्शन देता है:

import scala.util.Random 
    import org.apache.spark.streaming.dstream._ 

    val ssc = new StreamingContext(sparkContext, Seconds(2)) 

    var targetInts:RDD[Int] = sc.parallelize(0 until 100) 

    var loops = 0 

    // we create an rdd of functions that generate random data. 
    // evaluating this RDD at each interval will generate new random data points. 
    val randomDataRdd = sc.parallelize(1 to 10).map(_ =>() => Random.nextInt(100)) 

    val dstream = new ConstantInputDStream(ssc, randomDataRdd) 

    // create values from the random func rdd 

    dataDStream.foreachRDD{rdd => 
         loops += 1 
         targetInts = targetInts.subtract(rdd) 
         if (targetInts.isEmpty) {println(loops); ssc.stop(false)} 
         } 


    ssc.start() 

इस उदाहरण चल रहा है और targetInts.count के खिलाफ loops की साजिश रचने निम्नलिखित चार्ट देता है।

संबंधित मुद्दे