2016-12-30 13 views
7

स्पार्क दस्तावेज के अनुसार केवल आरडीडी क्रियाएं स्पार्क नौकरी को ट्रिगर कर सकती हैं और जब किसी पर कार्रवाई की जाती है तो परिवर्तनों का आलसी मूल्यांकन किया जाता है।सॉर्ट द्वारा परिवर्तन स्पार्क नौकरी क्यों ट्रिगर करता है?

मुझे लगता है कि sortBy परिवर्तन फ़ंक्शन तुरंत लागू होता है और इसे स्पार्कयूआई में नौकरी ट्रिगर के रूप में दिखाया जाता है। क्यूं कर?

उत्तर

3

sortBysortByKey का उपयोग करके कार्यान्वित किया गया है जो RangePartitioner (JVM) या विभाजन फ़ंक्शन (पायथन) पर निर्भर करता है। जब आप sortBy/sortByKey विभाजनकर्ता (विभाजन कार्य) को उत्सुकता से प्रारंभ करते हैं और विभाजन सीमाओं की गणना करने के लिए आरडीडी इनपुट नमूने प्रारंभ करते हैं। जो काम आप देखते हैं वह इस प्रक्रिया से मेल खाता है।

वास्तविक सॉर्टिंग केवल तभी किया जाता है जब आप नव निर्मित RDD या उसके वंशजों पर कोई कार्रवाई निष्पादित करते हैं।

1

स्पार्क दस्तावेज़ीकरण के अनुसार केवल क्रिया स्पार्क में नौकरी ट्रिगर करती है, जब उस पर कार्रवाई की जाती है तो परिवर्तनों का आलसी मूल्यांकन किया जाता है।

सामान्य में तुम सही हो, लेकिन जैसा कि आप सिर्फ अनुभव किया है, वहाँ कुछ अपवाद हैं और sortBy उन के बीच में है (zipWithIndex के साथ)।

वास्तव में, स्पार्क के जीआईआरए में इसकी सूचना मिली और विल फिक्स रिज़ॉल्यूशन के साथ बंद हुआ। SPARK-1021 sortByKey() launches a cluster job when it shouldn't देखें।

आप काम (और बाद में वेब UI में) चल रहा DAGScheduler प्रवेश के साथ सक्षम देख सकते हैं:

scala> sc.parallelize(0 to 8).sortBy(identity) 
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions 
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25) 
INFO DAGScheduler: Parents of final stage: List() 
INFO DAGScheduler: Missing parents: List() 
DEBUG DAGScheduler: submitStage(ResultStage 1) 
DEBUG DAGScheduler: missing: List() 
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents 
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1) 
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25) 
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4) 
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s 
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0 
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s 
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25 
संबंधित मुद्दे