2016-07-22 11 views
5

मैं स्पार्क स्ट्रीमिंग का उपयोग कर जेएमएस कतार (क्यूपीआईडी) पर प्राप्त कुछ एक्सएमएल डेटा को संसाधित करने की कोशिश कर रहा हूं। एक्सएमएल को डीस्ट्रीम के रूप में प्राप्त करने के बाद मैं उन्हें डेटाफ्रेम में परिवर्तित कर देता हूं ताकि मैं पहले से लोड किए गए डेटाफ्रेम के रूप में उनके कुछ स्थिर डेटा के साथ उनसे जुड़ सकूं। लेकिन डीस्ट्रीम पर फोरैच Rdd विधि के लिए एपीआई दस्तावेज के अनुसार: इसे ड्राइवर पर निष्पादित किया जाता है, तो इसका मतलब यह है कि सभी प्रसंस्करण तर्क केवल चालक पर चलेंगे और श्रमिक/निष्पादकों को वितरित नहीं होंगे।चालक पर निष्पादित किया गया foreachRDD है?

API दस्तावेज़

foreachRDD(func)

एक समारोह लागू होता है, समारोह, प्रत्येक RDD के लिए धारा से उत्पन्न है कि सबसे सामान्य उत्पादन ऑपरेटर। यह फ़ंक्शन प्रत्येक आरडीडी में बाहरी सिस्टम में डेटा को धक्का देना चाहिए, जैसे फ़ाइलों को आरडीडी सहेजना, या नेटवर्क पर इसे नेटवर्क पर लिखना। नोट कि फंक्शन func स्ट्रीमिंग अनुप्रयोग चलाने वाले ड्राइवर प्रक्रिया में निष्पादित किया गया है, और आमतौर पर इसमें RDD क्रियाएं होंगी जो स्ट्रीमिंग आरडीडी की गणना को मजबूर करेगी।

उत्तर

6

तो इसका मतलब यह है सभी संसाधन तर्क पर ही चालक चलेंगे और न श्रमिकों/निष्पादकों को वितरित किया जाता है।

नहीं है, समारोह में ही ड्राइवर पर रन है, लेकिन भूल नहीं है कि यह एक RDD पर चल रही है। RDD, , map, filter आदि पर उपयोग किए जाने वाले आंतरिक फ़ंक्शंस अभी भी कार्यकर्ता नोड्स पर चलेंगे। यह नहीं होगा क्योंकि सभी डेटा ड्राइवर पर नेटवर्क पर वापस भेजे जाएंगे, जब तक कि आप collect जैसी विधियों को कॉल न करें।

+0

खैर, यह तो भ्रामक है , "नहीं, यह फ़ंक्शन स्वयं चालक पर चलता है, लेकिन यह न भूलें कि यह आरडीडी पर चल रहा है", मान लें कि 'rdd.foreachRDD (func)' कहा जाता है, और यह 'func' फ़ंक्शन डेटा को रेडिस के माध्यम से लिखता है वैश्विक var 'redis_client', जिसका अर्थ है 'func'' redis_client' को संदर्भित करता है, इसलिए प्रश्न है: 'foreachRDD' कॉल में कोई अपवाद उठाया जाएगा, क्योंकि' redis_client' serializable नहीं है। – avocado

+0

@loganecolss मैं सहमत हूं, निष्पादन के अर्थशास्त्र जटिल हैं। 'rdd.foreachRDD' निष्पादकों पर 'func' निष्पादित करता है। यदि बंद करने के माध्यम से 'func' * कैप्चर *' redis_client' कैप्चर करता है, तो आपको 'टास्कनॉटसेरियलज़ेबल' अपवाद मिलेगा। यदि 'func_client' का उदाहरण 'func' के अंदर आवंटित किया गया है, तो आप ठीक होंगे। –

+0

@YuvalItzchakov आपकी टिप्पणी का उद्धरण: _rdd.foreachRDD निष्पादकों पर func निष्पादित करता है ._ [डॉक्स] से [https://spark.apache.org/docs/latest/streaming-programming-guide.html#output- ऑपरेशंस-ऑन-डीस्ट्रीम), 'foreachRDD' ** चालक ** पर 'func' निष्पादित करता है। 'foreach',' foreachPartition' ** निष्पादक ** पर चलता है हालांकि। –

1

यह स्पष्ट करने के लिए यदि आप निम्नलिखित चलाने के लिए, आप ड्राइवर की stdout पर "बंदर" देखेंगे,:,

myDStream.foreachRDD { rdd => 
    println("monkey") 
} 

आप निम्नलिखित चलाते हैं, तो ड्राइवर का stdout पर "बंदर" देखेंगे और फिल्टर काम जो कुछ निष्पादकों rdd पर किया जाएगा भर में वितरित किया जाता है:

myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
} 

के सरलीकरण जोड़ते हैं कि myDStream लें वाई को कभी भी एक आरडीडी प्राप्त होता है, और यह आरडीडी विभाजन के एक सेट में फैल गया है जिसे हम PartitionSetA पर कॉल करेंगे जो MachineSetB पर मौजूद है जहां ExecutorSetC चल रहे हैं।यदि आप निम्न चलाते हैं, तो आपको ड्राइवर के स्टडआउट पर "बंदर" दिखाई देगा, आपको ExecutorSetC में सभी निष्पादकों के स्टडआउट पर "कछुए" दिखाई देगा ("कछुआ" प्रत्येक विभाजन के लिए एक बार दिखाई देगा - कई विभाजन मशीन पर हो सकते हैं जहां एक प्रबंधक चल रहा है), और दोनों फिल्टर और इसके अलावा आपरेशन के काम ExecutorSetC भर में किये जायेंगे:

myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
    rdd.foreachPartition { partition => 
    println("turtle") 
    val x = 1 + 1 
    } 
} 

एक और नोट करने के लिए बात यह है कि निम्नलिखित कोड में, y हो जाएंगे भर में भेजा जा रहा है प्रत्येक rdd के लिए ड्राइवर से ExecutorSetC पर नेटवर्क:

val y = 2 
myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
    rdd.foreachPartition { partition => 
    println("turtle") 
    val x = 1 + 1 
    val z = x + y 
    } 
} 

इस ओवरहेड से बचने के लिए, आप प्रसारण चर का उपयोग कर सकते हैं, जो ड्राइवर से मूल्य को केवल एक बार निष्पादकों को भेजता है। उदाहरण के लिए:

val y = 2 
val broadcastY = sc.broadcast(y) 
myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
    rdd.foreachPartition { partition => 
    println("turtle") 
    val x = 1 + 1 
    val z = x + broadcastY.value 
    } 
} 

इस तरह की वस्तुओं है कि आसानी से एक बार instantiated serializable नहीं हैं, आप निम्न ब्लॉग पोस्ट देख सकते हैं के रूप में प्रसारण चर, के रूप में और अधिक जटिल बातों पर भेजने के लिए: https://allegro.tech/2015/08/spark-kafka-integration.html

संबंधित मुद्दे