2016-11-23 12 views
5

मैं सोच रहा था कि स्पार्क mapPartitions कार्यक्षमता बनाम क्षणिक आलसी मूल्य का उपयोग करने के विभिन्न क्या हैं।
चूंकि प्रत्येक विभाजन मूल रूप से एक अलग नोड पर चल रहा है क्योंकि क्षणिक आलसी वैल का एक उदाहरण प्रत्येक नोड (इसे किसी ऑब्जेक्ट में मानते हुए) पर बनाया जाएगा।स्पार्क नक्शा पार्टिशन बनाम क्षणिक आलसी मूल्य

उदाहरण के लिए:

class NotSerializable(v: Int) { 
    def foo(a: Int) = ??? 
} 

object OnePerPartition { 
    @transient lazy val obj: NotSerializable = new NotSerializable(10) 
} 

object Test extends App{ 
    val conf = new SparkConf().setMaster("local[2]").setAppName("test") 
    val sc = new SparkContext(conf) 

    val rdd: RDD[Int] = sc.parallelize(1 to 100000) 

    rdd.map(OnePerPartition.obj.foo) 

    // ---------- VS ---------- 

    rdd.mapPartitions(itr => { 
     val obj = new NotSerializable(10) 
     itr.map(obj.foo) 
    }) 
} 

एक पूछ सकते हैं क्यों तुम भी यह चाहते हैं ...
मैं किसी भी सामान्य संग्रह कार्यान्वयन (RDD पर मेरे तर्क को चलाने के लिए एक सामान्य कंटेनर धारणा बनाना चाहेंगे, List , scalding pipe, आदि)
उनमें से सभी को "मानचित्र" की धारणा है, लेकिन mapPartitionspark के लिए अद्वितीय है।

उत्तर

2

सबसे पहले आपको transientlazy की आवश्यकता नहीं है। object आवरण का उपयोग करते हुए इस काम बनाने के लिए पर्याप्त है और आप वास्तव में के रूप में लिख सकते हैं:

object OnePerExecutor { 
    val obj: NotSerializable = new NotSerializable(10) 
} 

वहाँ वस्तु आवरण और mapPartitions अंदर NotSerializable आरंभ में एक बुनियादी अंतर है। यह:

rdd.mapPartitions(iter => { 
    val ns = NotSerializable(1) 
    ??? 
}) 

प्रति विभाजन एक NotSerializable उदाहरण बनाता है।

दूसरी ओर से ऑब्जेक्ट रैपर, प्रत्येक निष्पादक JVM के लिए एक NotSerializable उदाहरण बनाता है। नतीजतन यह उदाहरण:

  • एकाधिक विभाजनों को संसाधित करने के लिए उपयोग किया जा सकता है।
  • एकाधिक निष्पादक धागे द्वारा एक साथ उपयोग किया जा सकता है।
  • जीवनकाल से अधिक फ़ंक्शन कॉल है जहां इसका उपयोग किया जाता है।

इसका मतलब है कि यह थ्रेड सुरक्षित होना चाहिए और किसी विधि विधि को दुष्प्रभाव मुक्त होना चाहिए।

संबंधित मुद्दे