2017-08-10 23 views
6

में अलग स्ट्रीमिंग प्रश्नों को निष्पादित करना मैं दो अलग-अलग विंडो के साथ स्ट्रीम को एकत्र करने और इसे कंसोल में प्रिंट करने की कोशिश कर रहा हूं। हालांकि केवल पहली स्ट्रीमिंग क्वेरी मुद्रित की जा रही है। tenSecsQ कंसोल में मुद्रित नहीं है।स्पार्क संरचित स्ट्रीमिंग

SparkSession spark = SparkSession 
    .builder() 
    .appName("JavaStructuredNetworkWordCountWindowed") 
    .config("spark.master", "local[*]") 
    .getOrCreate(); 

Dataset<Row> lines = spark 
    .readStream() 
    .format("socket") 
    .option("host", host) 
    .option("port", port) 
    .option("includeTimestamp", true) 
    .load(); 

Dataset<Row> words = lines 
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) 
    .toDF("word", "timestamp"); 

// 5 second window 
Dataset<Row> fiveSecs = words 
    .groupBy(
     functions.window(words.col("timestamp"), "5 seconds"), 
     words.col("word") 
    ).count().orderBy("window"); 

// 10 second window 
Dataset<Row> tenSecs = words 
    .groupBy(
      functions.window(words.col("timestamp"), "10 seconds"), 
      words.col("word") 
    ).count().orderBy("window"); 

5 और 10s समेकित धाराओं के लिए ट्रिगर स्ट्रीमिंग क्वेरी। 10 एस स्ट्रीम के लिए आउटपुट मुद्रित नहीं है। केवल 5 एस को कंसोल

// Start writeStream() for 5s window 
StreamingQuery fiveSecQ = fiveSecs.writeStream() 
    .queryName("5_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

// Start writeStream() for 10s window 
StreamingQuery tenSecsQ = tenSecs.writeStream() 
    .queryName("10_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

tenSecsQ.awaitTermination(); 
+0

असल में, मुझे नहीं पता कि सॉकेट स्ट्रीम कैसे काम करती है लेकिन मेरे लिए लगता है कि आपकी पहली स्पार्क स्ट्रीम सॉकेट स्ट्रीम से सभी डेटा पढ़ती है और दूसरे के लिए कुछ भी नहीं रहता है। –

उत्तर

5

में मुद्रित किया गया है, मैं इस प्रश्न की जांच कर रहा हूं।

सारांश: संरचित स्ट्रीमिंग में प्रत्येक क्वेरी source डेटा का उपभोग करती है। सॉकेट स्रोत परिभाषित प्रत्येक क्वेरी के लिए एक नया कनेक्शन बनाता है। इस मामले में देखा गया व्यवहार इसलिए है क्योंकि nc केवल पहले कनेक्शन में इनपुट डेटा प्रदान कर रहा है।

अब से, सॉकेट कनेक्शन पर एकाधिक एकत्रीकरण को परिभाषित करना संभव नहीं है जब तक कि हम यह सुनिश्चित न कर सकें कि कनेक्टेड सॉकेट स्रोत प्रत्येक कनेक्शन को एक ही डेटा को खुलासा करता है।


मैंने स्पार्क मेलिंग सूची पर इस प्रश्न पर चर्चा की। डाटाबेस डेवलपर शिक्सियोन झू ने उत्तर दिया:

स्पार्क प्रत्येक क्वेरी के लिए एक कनेक्शन बनाता है। आपके द्वारा देखा गया व्यवहार इसलिए है क्योंकि "nc -lk" कैसे काम करता है। यदि आप टीसीपी कनेक्शन की जांच के लिए netstat का उपयोग करते हैं, तो आप देखेंगे कि दो प्रश्न शुरू करते समय दो कनेक्शन हैं। हालांकि, "एनसी" इनपुट केवल एक कनेक्शन के लिए आगे।

मैं एक छोटा सा प्रयोग को परिभाषित करते हुए इस व्यवहार सत्यापित: पहले, मैं एक SimpleTCPWordServer कि प्रत्येक कनेक्शन के लिए यादृच्छिक शब्दों उद्धार खुला और एक बुनियादी संरचित स्ट्रीमिंग काम है कि दो प्रश्नों वाणी बनाया।

val lines = spark 
    .readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", "9999") 
    .option("includeTimestamp", true) 
    .load() 

val q1 = lines.writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("5 seconds")) 
    .start() 

val q2 = lines.withColumn("foo", lit("foo")).writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("7 seconds")) 
    .start() 

StructuredStreaming केवल एक धारा का उपभोग होता है, तो हम दोनों प्रश्नों के द्वारा दिया एक ही शब्द देखना चाहिए: उन दोनों के बीच फर्क सिर्फ इतना है कि 2 क्वेरी एक अतिरिक्त निरंतर स्तंभ को परिभाषित करता है इसके उत्पादन अंतर करने के लिए है। यदि प्रत्येक क्वेरी एक अलग धारा का उपभोग करती है, तो हमारे पास प्रत्येक क्वेरी द्वारा रिपोर्ट किए गए अलग-अलग शब्द होंगे।

यह मनाया उत्पादन होता है:

------------------------------------------- 
Batch: 0 
------------------------------------------- 
+--------+-------------------+ 
| value|   timestamp| 
+--------+-------------------+ 
|champion|2017-08-14 13:54:51| 
+--------+-------------------+ 

+------+-------------------+---+ 
| value|   timestamp|foo| 
+------+-------------------+---+ 
|belong|2017-08-14 13:54:51|foo| 
+------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+-------+-------------------+---+ 
| value|   timestamp|foo| 
+-------+-------------------+---+ 
| agenda|2017-08-14 13:54:52|foo| 
|ceiling|2017-08-14 13:54:52|foo| 
| bear|2017-08-14 13:54:53|foo| 
+-------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+----------+-------------------+ 
|  value|   timestamp| 
+----------+-------------------+ 
| breath|2017-08-14 13:54:52| 
|anticipate|2017-08-14 13:54:52| 
| amazing|2017-08-14 13:54:52| 
| bottle|2017-08-14 13:54:53| 
| calculate|2017-08-14 13:54:53| 
|  asset|2017-08-14 13:54:54| 
|  cell|2017-08-14 13:54:54| 
+----------+-------------------+ 

हम स्पष्ट रूप से देख सकते हैं कि प्रत्येक क्वेरी के लिए धाराओं अलग हैं। ऐसा लगता है कि socket source द्वारा दिए गए डेटा पर एकाधिक समेकन को परिभाषित करना संभव नहीं है जब तक कि हम गारंटी न दें कि टीसीपी बैकएंड सर्वर प्रत्येक खुले कनेक्शन में बिल्कुल वही डेटा प्रदान करता है।

संबंधित मुद्दे