15

स्पार्क 2.0.1 के साथ शुरू हो रहा है मुझे कुछ प्रश्न हैं। मैं प्रलेखन का एक बहुत पढ़ा लेकिन अभी तक पर्याप्त उत्तर नहीं मिल सके:स्पार्क 2.0 डेटासेट बनाम डेटाफ्रेम

  • क्या
    • df.select("foo")
    • df.select($"foo")
  • के बीच अंतर है मैं सही ढंग से समझ कर कि
    • myDataSet.map(foo.someVal) टाइपएफ़ और वाई है RDD में परिवर्तित नहीं होगा, लेकिन डेटासेट प्रतिनिधित्व/कोई अतिरिक्त ओवरहेड (2.0.0 के लिए प्रदर्शन के अनुसार प्रदर्शन)
  • अन्य सभी आदेश उदा। चयन करें, .. सिर्फ वाक्य रचनात्मक चीनी हैं। वे टाइपएफ़ नहीं हैं और इसके बजाय मानचित्र का उपयोग किया जा सकता है। मैप स्टेटमेंट के बिना मैं df.select("foo") टाइप-सुरक्षित कैसे कर सकता हूं?
    • मुझे नक्शा के बजाय यूडीएफ/यूएडीएफ का उपयोग क्यों करना चाहिए (मानते हुए नक्शा डेटासेट प्रतिनिधित्व में रहता है)?
+0

एक परियोजना है कि जबकि कुशल निष्पादन पथ पर रहने स्पार्क के लिए अधिक प्रकार सुरक्षा प्रदान करना है नहीं है ] (https://github.com/typelevel/frameless) –

उत्तर

11
  1. df.select("foo") और df.select($"foo") के बीच अंतर हस्ताक्षर है। पूर्व में कम से कम एक String, बाद में एक शून्य या अधिक Columns लगता है। इससे परे कोई व्यावहारिक अंतर नहीं है।
  2. myDataSet.map(foo.someVal) प्रकार की जाँच करता है, लेकिन रूप में किसी भी Dataset आपरेशन वस्तुओं की RDD, और DataFrame संचालन की तुलना का उपयोग करता है, वहाँ एक महत्वपूर्ण ओवरहेड है। को एक सरल उदाहरण पर एक नज़र लेते हैं:

    case class FooBar(foo: Int, bar: String) 
    val ds = Seq(FooBar(1, "x")).toDS 
    ds.map(_.foo).explain 
    
    == Physical Plan == 
    *SerializeFromObject [input[0, int, true] AS value#123] 
    +- *MapElements <function1>, obj#122: int 
        +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar 
         +- LocalTableScan [foo#117, bar#118] 
    

    आप देख सकते हैं इस कार्य योजना लागू करके सभी क्षेत्रों के लिए उपयोग की आवश्यकता है और DeserializeToObject गया है।

  3. नहीं। सामान्य अन्य विधियों में सिंटैक्टिक चीनी नहीं है और एक महत्वपूर्ण भिन्न निष्पादन योजना उत्पन्न करती है। उदाहरण के लिए:

    ds.select($"foo").explain 
    
    == Physical Plan == 
    LocalTableScan [foo#117] 
    

    योजना से पहले ही स्तंभ सीधे उपयोग कर सकते हैं पता चला की तुलना में। यह एपीआई की इतनी सीमा नहीं है बल्कि परिचालन अर्थशास्त्र में एक अंतर का परिणाम है।

  4. मैप स्टेटमेंट के बिना मैं df.select ("foo") टाइप-सुरक्षित कैसे कर सकता हूं?

    ऐसा कोई विकल्प नहीं है। जबकि टाइप कॉलम आप एक और स्थिर Dataset टाइप किया में स्थिर Dataset को बदलने के लिए अनुमति देते हैं:

    ds.select($"bar".as[Int]) 
    

    वहाँ सुरक्षित टाइप नहीं कर रहे हैं।टाइप सुरक्षित अनुकूलित ऑपरेशंस, like typed aggregations, लेकिन इस प्रयोगात्मक एपीआई को शामिल करने के कुछ अन्य प्रयास हैं।

  5. कारण है कि मैं एक नक्शा

    यह पूरी तरह से आप पर निर्भर है के बजाय एक यूडीएफ/UADF उपयोग करना चाहिए। स्पार्क में प्रत्येक वितरित डेटा संरचना अपने फायदे और नुकसान प्रदान करती है (उदाहरण के लिए देखें Spark UDAF with ArrayType as bufferSchema performance issues)।

व्यक्तिगत रूप से, मैं स्थिर Dataset टाइप किया पाते हैं होने के लिए कम से कम उपयोगी:

  • ही (Dataset[Row] के रूप में अनुकूलन की सीमा हालांकि वे भंडारण प्रारूप और कुछ कार्य योजना लागू करके यह अनुकूलन का हिस्सा प्रदान न करें कोड पीढ़ी या ऑफ-हीप स्टोरेज से पूरी तरह से लाभ नहीं होता है) और न ही DataFrame की सभी विश्लेषणात्मक क्षमताओं तक पहुंच।

  • टाइप किए गए परिवर्तन ब्लैक बॉक्स हैं, और ऑप्टिमाइज़र के लिए प्रभावी रूप से विश्लेषण बाधा बनाते हैं। उदाहरण के चयन के लिए (फिल्टर) नहीं किया जा टाइप किया परिवर्तन पर धक्का जा सकता है:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain 
    
    == Physical Plan == 
    *Filter (foo#133 = 1) 
    +- *Filter <function1>.apply 
        +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
         +- Exchange hashpartitioning(foo#133, 200) 
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
          +- LocalTableScan [foo#133, bar#134] 
    

    की तुलना में:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain 
    
    == Physical Plan == 
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
    +- Exchange hashpartitioning(foo#133, 200) 
        +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
         +- *Filter (foo#133 = 1) 
         +- LocalTableScan [foo#133, bar#134] 
    

    यह प्रभावों विधेय पुशडाउन या प्रक्षेपण पुशडाउन तरह की सुविधा है।

  • RDDs के रूप में लचीला नहीं है क्योंकि केवल छोटे प्रकार के प्रकारों को मूल रूप से समर्थित किया गया है।

  • Encoders के साथ "प्रकार सुरक्षा" विवाद योग्य है जब Datasetas विधि का उपयोग करके परिवर्तित किया जाता है। चूंकि डेटा आकार को हस्ताक्षर का उपयोग करके एन्कोड नहीं किया जाता है, इसलिए एक कंपाइलर केवल Encoder के अस्तित्व को सत्यापित कर सकता है।

संबंधित प्रश्न: [typelevel/फ्रेमलेस:

संबंधित मुद्दे