मेरे पास डेटा की लाखों पंक्तियां हैं। स्पार्क स्ट्रीमिंग का उपयोग कर एक सप्ताह या एक दिन के भीतर इन सभी का विश्लेषण करना संभव है? डेटा राशि के संदर्भ में स्ट्रीमिंग की सीमा क्या है? मुझे यकीन नहीं है कि ऊपरी सीमा क्या है और जब मुझे उन्हें अपने डेटाबेस में रखना चाहिए क्योंकि स्ट्रीम शायद उन्हें संभाल नहीं सकता है। मेरे पास अलग-अलग समय विंडोज़ 1,3, 6 घंटे इत्यादि भी हैं जहां मैं डेटा को अलग करने के लिए विंडो ऑपरेशंस का उपयोग करता हूं। एक समय मेंडेटा राशि के मामले में स्ट्रीमिंग को चमकाने की सीमा क्या है?
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,300)
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("abc","channels")
topic = 'abc.crawled_articles'
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"}
category = 'abc.crawled_article'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams)
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x))
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x))
#axes topic integration the article and the axes
axes_topic = 'abc.crawled_axes'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).map(lambda x:(str(x['id']),x)).map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']}))
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint()
#join
statistics = article_join_stream.window(1*60*60,5*60).cogroup(category_join_stream.window(1*60*60,60)).cogroup((axes_join_stream.window(24*60*60,5*60)))
statistics.transform(joinstream).pprint()
ssc.start() # Start the computation ssc.awaitTermination()
ssc.awaitTermination()
यहां कई प्रश्न हैं, यदि आप स्पष्ट रूप से उन्हें अलग करते हैं तो यह जवाब देने में सहायता करेगा। साथ ही, यदि आप इस कोड को चित्रित करने के लिए पर्याप्त छोटे नमूने में शामिल कोड को कम करते हैं तो यह सहायक होगा – etov