5

select कथन प्रत्येक बैच मुद्रित क्यों किया जाता है लेकिन hello world केवल एक बार?स्ट्रक्चरर्ड स्ट्रीमिंग में केवल एक बार साइड इफेक्ट्स (println) क्यों बदलता है?

import org.apache.spark.sql.types._ 
val schema = StructType(
    StructField("id", LongType, nullable = false) :: 
    StructField("name", StringType, nullable = false) :: 
    StructField("score", DoubleType, nullable = false) :: Nil) 

val in: DataFrame = sparkSession.readStream 
.schema(schema) 
.format("csv") 
.option("header", false) 
.option("maxFilesPerTrigger", 1) 
.option("delimiter", ";") 
.load("s3://xxxxxxxx") 

val input: DataFrame = in.select("*") 
.transform { ds => 
    println("hello world") // <-- Why is this printed out once? 
    ds 
} 

import org.apache.spark.sql.streaming.StreamingQuery 
val query: StreamingQuery = input.writeStream 
    .format("console") 
    .start 

उत्तर

6

स्पार्क 2.1.0-SNAPSHOT यहाँ (आज का निर्माण किया है), लेकिन मेरा मानना ​​है कि यह 2.0 और अब के बीच नहीं बदला।

$ ./bin/spark-submit --version 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /___/ .__/\_,_/_/ /_/\_\ version 2.1.0-SNAPSHOT 
     /_/ 

Branch master 
Compiled by user jacek on 2016-09-30T07:08:39Z 
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a 
Url https://github.com/apache/spark.git 
Type --help for more information. 

स्पार्क के Structured Streaming में, अपने स्ट्रीमिंग का आवेदन एक मात्र इनपुट डेटा स्रोतों से ही शारीरिक क्वेरी योजना लागू करने के लिए चाल है।

कृपया ध्यान दें कि भौतिक क्वेरी योजना आपके Dataset (और जितना अधिक मैं स्पार्क एसक्यूएल के साथ हूं, उतना ही मुझे प्रश्न और डेटासेट के बीच कोई अंतर नहीं दिखता है - वे इन दिनों बस अदला-बदले में हैं)।

जब आप एक संरचित क्वेरी का वर्णन करते हैं (भले ही यह एक बंद या स्ट्रीमिंग क्वेरी होने जा रहा हो) यह पार्सिंग, विश्लेषण, अनुकूलन और आखिरकार भौतिक योजना का उत्पादन करने के 4 चरणों के माध्यम से जाता है। आप explain(extended = true) विधि का उपयोग कर इसकी समीक्षा कर सकते हैं।

scala> input.explain(extended = true) 
== Parsed Logical Plan == 
StreamingRelation DataSource([email protected],json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17] 

== Analyzed Logical Plan == 
id: bigint, name: string, score: double 
StreamingRelation DataSource([email protected],json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17] 

== Optimized Logical Plan == 
StreamingRelation DataSource([email protected],json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17] 

== Physical Plan == 
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17] 

चरणों आलसी और केवल एक बार निष्पादित कर रहे हैं।

एक बार जब आप शारीरिक योजना प्राप्त कर लेते हैं, तो चरणों को फिर से निष्पादित नहीं किया जाएगा। आपकी Dataset पाइपलाइन पहले से ही गणना की गई है और केवल गायब टुकड़ा पाइप के माध्यम से बहने वाला डेटा है।

यही कारण है कि आप केवल एक बार "हैलो वर्ल्ड" देखते हैं - जब स्ट्रीमिंग क्वेरी योजना को भौतिक योजना बनाने के लिए "निष्पादित" किया गया था। इसे एक बार निष्पादित किया गया था और स्रोत Dataset (और केवल Dataset) को संसाधित करने के लिए अनुकूलित किया गया था, इसलिए किसी भी दुष्प्रभाव पहले ही ट्रिगर किए गए थे)।

एक दिलचस्प मामला। इसे यहां लाने के लिए बहुत कुछ है!

संबंधित मुद्दे