में अलग स्ट्रीमिंग प्रश्नों को निष्पादित करना मैं दो अलग-अलग विंडो के साथ स्ट्रीम को एकत्र करने और इसे कंसोल में प्रिंट करने की कोशिश कर रहा हूं। हालांकि केवल पहली स्ट्रीमिंग क्वेरी मुद्रित की जा रही है। tenSecsQ
कंसोल में मुद्रित नहीं है।स्पार्क संरचित स्ट्रीमिंग
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCountWindowed")
.config("spark.master", "local[*]")
.getOrCreate();
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load();
Dataset<Row> words = lines
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
.toDF("word", "timestamp");
// 5 second window
Dataset<Row> fiveSecs = words
.groupBy(
functions.window(words.col("timestamp"), "5 seconds"),
words.col("word")
).count().orderBy("window");
// 10 second window
Dataset<Row> tenSecs = words
.groupBy(
functions.window(words.col("timestamp"), "10 seconds"),
words.col("word")
).count().orderBy("window");
5 और 10s समेकित धाराओं के लिए ट्रिगर स्ट्रीमिंग क्वेरी। 10 एस स्ट्रीम के लिए आउटपुट मुद्रित नहीं है। केवल 5 एस को कंसोल
// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
.queryName("5_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
.queryName("10_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
tenSecsQ.awaitTermination();
असल में, मुझे नहीं पता कि सॉकेट स्ट्रीम कैसे काम करती है लेकिन मेरे लिए लगता है कि आपकी पहली स्पार्क स्ट्रीम सॉकेट स्ट्रीम से सभी डेटा पढ़ती है और दूसरे के लिए कुछ भी नहीं रहता है। –