स्ट्रीमिंग मैं अपाचे स्पार्क 1.6.1 स्ट्रीमिंग एक जावा अनुप्रयोग है कि दो की/मान डेटा धाराओं जुड़ जाता है और HDFS के उत्पादन लिखते हैं लिखने के लिए उपयोग कर रहा हूँ ले जाने के लिए। दो डेटा स्ट्रीम में के/वी तार होते हैं और टेक्स्टफाइलस्ट्रीम() का उपयोग कर स्पार्क में एचडीएफएस से समय-समय पर निगमित होते हैं।कैसे स्पार्क में कई बैच के अंतराल पर डेटा धाराओं
दो डेटा धाराओं सिंक्रनाइज़ नहीं कर रहे हैं, जिसका मतलब है कि कुछ सुझाव दिए गए उस समय t0 पर stream1 में हैं समय t1, या इसके विपरीत पर stream2 में दिखाई दे सकते हैं। इसलिए, मेरा लक्ष्य दो धाराओं में शामिल होने और गणना "बचे हुए" कुंजी है, जो अगले बैच के अंतराल में शामिल होने के ऑपरेशन के लिए विचार किया जाना चाहिए है।
बेहतर यह स्पष्ट करने के लिए, निम्नलिखित कलन विधि को देखो:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
मैं स्पार्क असफल स्ट्रीमिंग के साथ इस एल्गोरिथ्म को लागू करने की कोशिश की है। प्रारंभ में, मैं (यह केवल एक धारा है, लेकिन कोड दूसरी धारा उत्पन्न करने के लिए समान है) इस तरह से बचे हुए चाबी के लिए दो खाली स्ट्रीम को बना:
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
बाद में, इस खाली धारा एकीकृत है (यानी, संघ()) धारा 1 के साथ और अंत में, शामिल होने के बाद, मैं स्ट्रीम 1 से बचे हुए कुंजी जोड़ता हूं और विंडो() कॉल करता हूं। धारा 2 के साथ भी यही होता है।
समस्या यह है कि बाएं_keys_s1 और left_keys_s2 उत्पन्न करने वाले ऑपरेशन क्रियाओं के बिना परिवर्तन होते हैं, जिसका अर्थ है कि स्पार्क कोई आरडीडी प्रवाह ग्राफ नहीं बनाता है, इसलिए, उन्हें कभी निष्पादित नहीं किया जाता है। जो मुझे अभी मिल रहा है वह एक ऐसा हिस्सा है जो केवल उन अभिलेखों को आउटपुट करता है जिनकी चाबियां धारा 1 और स्ट्रीम 2 में एक ही समय अंतराल में होती हैं।
तुम लोगों स्पार्क के साथ सही ढंग से इस लागू करने के लिए कोई सुझाव है?
धन्यवाद, मार्को