2016-02-29 2 views
5

मेरे पास डेटा की लाखों पंक्तियां हैं। स्पार्क स्ट्रीमिंग का उपयोग कर एक सप्ताह या एक दिन के भीतर इन सभी का विश्लेषण करना संभव है? डेटा राशि के संदर्भ में स्ट्रीमिंग की सीमा क्या है? मुझे यकीन नहीं है कि ऊपरी सीमा क्या है और जब मुझे उन्हें अपने डेटाबेस में रखना चाहिए क्योंकि स्ट्रीम शायद उन्हें संभाल नहीं सकता है। मेरे पास अलग-अलग समय विंडोज़ 1,3, 6 घंटे इत्यादि भी हैं जहां मैं डेटा को अलग करने के लिए विंडो ऑपरेशंस का उपयोग करता हूं। एक समय मेंडेटा राशि के मामले में स्ट्रीमिंग को चमकाने की सीमा क्या है?

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc,300) 
sqlContext = SQLContext(sc) 
channels = sc.cassandraTable("abc","channels") 
topic = 'abc.crawled_articles' 
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"} 

category = 'abc.crawled_article' 
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams) 
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x)) 


article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) 
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x)) 

#axes topic integration the article and the axes 
axes_topic = 'abc.crawled_axes' 
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams) 
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).map(lambda x:(str(x['id']),x)).map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']})) 
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint() 

#join 
statistics = article_join_stream.window(1*60*60,5*60).cogroup(category_join_stream.window(1*60*60,60)).cogroup((axes_join_stream.window(24*60*60,5*60))) 
statistics.transform(joinstream).pprint() 

ssc.start() # Start the computation ssc.awaitTermination() 
ssc.awaitTermination() 
+1

यहां कई प्रश्न हैं, यदि आप स्पष्ट रूप से उन्हें अलग करते हैं तो यह जवाब देने में सहायता करेगा। साथ ही, यदि आप इस कोड को चित्रित करने के लिए पर्याप्त छोटे नमूने में शामिल कोड को कम करते हैं तो यह सहायक होगा – etov

उत्तर

1

एक::

नीचे मेरी कोड प्राप्त करें

  • यह [समय की एक निश्चित राशि] के भीतर [पंक्तियों की कुछ बड़ी संख्या] का विश्लेषण करना संभव है?

आम तौर पर, हाँ - स्पार्क आप कई मशीनों के लिए बाहर पैमाने पर करने, इसलिए सिद्धांत रूप में आप और एक बड़े क्लस्टर शुरू करने के लिए सक्षम होना चाहिए अपेक्षाकृत कम समय में डेटा के बहुत सारे की कमी (यह मानते हुए हम घंटों बात कर रहे हैं या अनुमति देता है दिन, सेकंड या उससे कम नहीं, जो ओवरहेड के कारण समस्याग्रस्त हो सकता है)।

विशेष रूप से, लाखों रिकॉर्डों पर आपके प्रश्नों में दिखाए गए प्रसंस्करण की तरह मुझे उचित समय में व्यवहार्य लगता है (यानी एक बेहद बड़े क्लस्टर का उपयोग किए बिना)।

  • डेटा की मात्रा के मामले में स्पार्क स्ट्रीमिंग की सीमा क्या है?

मुझे नहीं पता, लेकिन आपको इसे पाने में कठिनाई होगी। बहुत बड़े तैनाती के उदाहरण हैं, उदाहरण के लिए ebay में ("दैनिक 30TB औसत से अधिक मीट्रिक")। इसके अलावा, FAQ देखें, जो 8000 मशीनों के क्लस्टर का उल्लेख करता है और डेटा के प्रसंस्करण पीबी का उल्लेख करता है।

  • परिणाम [किसी प्रकार के भंडारण] में कब लिखा जाना चाहिए?

स्पार्क स्ट्रीमिंग का basic model के अनुसार, डेटा सूक्ष्म बैचों में संसाधित किया जाता है। यदि आपका डेटा वास्तव में एक धारा है (यानी कोई निश्चित अंत नहीं है), तो सबसे सरल दृष्टिकोण प्रत्येक आरडीडी (यानी, माइक्रोबैच) के प्रसंस्करण परिणामों को स्टोर करना होगा।

यदि आपका डेटा स्ट्रीम नहीं है, उदा। आप समय-समय पर स्थैतिक फ़ाइलों का एक समूह संसाधित कर रहे हैं, आपको शायद स्ट्रीम भाग छोड़ने पर विचार करना चाहिए (उदाहरण के लिए केवल बैच प्रोसेसर के रूप में स्पार्क का उपयोग करना)।

चूंकि आपका प्रश्न कुछ घंटों के विंडो आकार का उल्लेख करता है, मुझे संदेह है कि आप बैच विकल्प पर विचार करना चाहेंगे।

  • मैं अलग-अलग समय में एक ही डेटा को कैसे संसाधित कर सकता हूं?

आप स्पार्क स्ट्रीमिंग का उपयोग कर रहे हैं, तो आप (जैसे mapWithState का प्रयोग करके) कई राज्यों बनाये रख सकता है - हर बार खिड़की के लिए एक।

एक और विचार (कोड में सरल, ओप के मामले में अधिक जटिल) - आप एक ही स्ट्रीम से पढ़ने, अपनी स्वयं की खिड़की के साथ कई क्लस्टर शुरू कर सकते हैं।

यदि आप बैच-प्रोसेसिंग हैं, तो आप अलग-अलग समय विंडोज़ के साथ एक ही ऑपरेशन को कई बार चला सकते हैं, उदा। कई विंडो आकारों के साथ reduceByWindow

संबंधित मुद्दे