2017-06-07 9 views
8

में वॉटरमार्केड एकत्रीकरण क्वेरी के लिए खाली आउटपुट मैं स्पार्क 2.2.0-आरसी 1 का उपयोग करता हूं।परिशिष्ट मोड

मैं एक काफ्का topic जो मैं एक चल वॉटरमार्क एकत्रीकरण की क्वेरी रहा हूँ, एक 1 minute वॉटरमार्क के साथ, append उत्पादन मोड के साथ console के लिए बाहर दे रही है मिल गया है।

import org.apache.spark.sql.types._ 
val schema = StructType(StructField("time", TimestampType) :: Nil) 
val q = spark. 
    readStream. 
    format("kafka"). 
    option("kafka.bootstrap.servers", "localhost:9092"). 
    option("startingOffsets", "earliest"). 
    option("subscribe", "topic"). 
    load. 
    select(from_json(col("value").cast("string"), schema).as("value")) 
    select("value.*"). 
    withWatermark("time", "1 minute"). 
    groupBy("time"). 
    count. 
    writeStream. 
    outputMode("append"). 
    format("console"). 
    start 

मैं काफ्का topic में निम्न डेटा धक्का कर रहा हूँ:

{"time":"2017-06-07 10:01:00.000"} 
{"time":"2017-06-07 10:02:00.000"} 
{"time":"2017-06-07 10:03:00.000"} 
{"time":"2017-06-07 10:04:00.000"} 
{"time":"2017-06-07 10:05:00.000"} 

और मैं निम्नलिखित उत्पादन हो रही है:

scala> ------------------------------------------- 
Batch: 0 
------------------------------------------- 
+----+-----+                  
|time|count| 
+----+-----+ 
+----+-----+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+----+-----+                  
|time|count| 
+----+-----+ 
+----+-----+ 

------------------------------------------- 
Batch: 2 
------------------------------------------- 
+----+-----+                  
|time|count| 
+----+-----+ 
+----+-----+ 

------------------------------------------- 
Batch: 3 
------------------------------------------- 
+----+-----+                  
|time|count| 
+----+-----+ 
+----+-----+ 

------------------------------------------- 
Batch: 4 
------------------------------------------- 
+----+-----+                  
|time|count| 
+----+-----+ 
+----+-----+ 

इस अपेक्षित व्यवहार है?

+0

स्पार्क 2.1 के साथ एक ही समस्या है। मैं डिस्क से एक स्ट्रीम पढ़ रहा हूं, और '.withWatermark' और groupBy (विंडो (...)) का उपयोग कर रहा हूं - कोई डेटा नहीं निकाला जा रहा है। वॉटरमार्किंग डेटा के बिना सामान्य रूप से संसाधित किया जा रहा है। –

+0

ऐसा लगता है कि यह एक बग है ... https://issues.apache.org/jira/browse/SPARK-20065। – himanshuIIITian

+0

@ रायनराल क्या आप वॉटरमार्किंग के बिना "पूर्ण" मोड का उपयोग कर रहे हैं और वॉटरमार्किंग के साथ "संलग्न" मोड कर रहे हैं? – zsxwing

उत्तर

7

काफ्का को अधिक डेटा धक्का देना स्पार्क को कुछ आउटपुट करने के लिए ट्रिगर करना चाहिए। वर्तमान व्यवहार आंतरिक कार्यान्वयन की वजह से पूरी तरह से है।

जब आप कुछ डेटा दबाते हैं, तो StreamingQuery चलाने के लिए एक बैच उत्पन्न करेगा। जब यह बैच खत्म हो जाता है, तो यह इस बैच में अधिकतम ईवेंट समय याद रखेगा। फिर अगले बैच में, क्योंकि आप append मोड का उपयोग कर रहे हैं, स्ट्रीमिंगक्वायर स्टेटस्टोर से पुराने मानों को बेदखल करने और इसे आउटपुट करने के लिए अधिकतम ईवेंट समय और वॉटरमार्क का उपयोग करेगा। इसलिए आपको आउटपुट देखने के लिए कम से कम दो बैचों को उत्पन्न करने की ज़रूरत है।

+0

प्रतिक्रिया के लिए धन्यवाद! लेकिन मैंने 4 बैचों को जन्म दिया लेकिन अभी भी खाली आउटपुट मिला। मैंने अपने प्रश्न को सभी 4 बैचों के साथ अपडेट किया है। – himanshuIIITian

+0

मैं देखता हूं। एक और मुद्दा यह है कि आपके जेसन मान अमान्य हैं। यह '{" समय "होना चाहिए:" 2017-06-07 10: 01: 00.000 "}'। यदि आप "पूर्ण" मोड का उपयोग करते हैं, तो आप इसे केवल आउटपुट आउटपुट देखेंगे। – zsxwing

+0

जेसन मान को सही करने के बाद, मैं आउटपुट को 'पूर्ण' मोड में देख सकता हूं लेकिन 'एपेंड' मोड में नहीं। क्यूं कर ? – himanshuIIITian

2

यहाँ मेरी सर्वश्रेष्ठ अनुमान दिया है: के बाद वॉटरमार्क बीत चुका है

संलग्न मोड केवल डेटा आउटपुट (जैसे इस मामले में 1 मिनट बाद)। आपने ट्रिगर (उदा। .trigger(Trigger.ProcessingTime("10 seconds")) सेट नहीं किया है, इसलिए डिफ़ॉल्ट रूप से यह जितनी जल्दी हो सके बैच आउटपुट करता है। तो पहले मिनट के लिए आपके सभी बैचों खाली होना चाहिए, और एक मिनट के बाद पहले बैच में कुछ सामग्री होनी चाहिए।

एक और संभावना यह है कि आप groupBy(window("time", "[window duration]")) के बजाय groupBy("time") का उपयोग कर रहे हैं। मेरा मानना ​​है कि वॉटरमार्क का उपयोग समय खिड़कियों या मानचित्र ग्रुप विथस्टेट के साथ किया जाना है, इसलिए मैं इस मामले में बातचीत कैसे काम करता हूं।

+0

मुझे समय परिवर्तक द्वारा समूहबद्ध करने का प्रयास करते समय भी एक ही समस्या हो रही थी। लेकिन इसे विंडो (समय, ...) में बदलना मेरी समस्या को ठीक कर दिया – Paul

संबंधित मुद्दे