2017-01-05 7 views
7

ए (पायथन) उदाहरण से मेरा अनुकरण साफ़ कर देगा।पिछली पंक्तियों (पीईएसपार्क डेटाफ्रेम)

movierecord = spark.createDataFrame([("Alice", 1, ["Avatar"]),("Bob", 2, ["Fargo", "Tron"]),("Alice", 4, ["Babe"]), ("Alice", 6, ["Avatar", "Airplane"]), ("Alice", 7, ["Pulp Fiction"]), ("Bob", 9, ["Star Wars"])],["name","unixdate","movies"]) 

स्कीमा और dataframe ऊपर देखो द्वारा परिभाषित किया गया इस प्रकार है::

root 
|-- name: string (nullable = true) 
|-- unixdate: long (nullable = true) 
|-- movies: array (nullable = true) 
| |-- element: string (containsNull = true) 

+-----+--------+------------------+ 
|name |unixdate|movies   | 
+-----+--------+------------------+ 
|Alice|1  |[Avatar]   | 
|Bob |2  |[Fargo, Tron]  | 
|Alice|4  |[Babe]   | 
|Alice|6  |[Avatar, Airplane]| 
|Alice|7  |[Pulp Fiction] | 
|Bob |9  |[Star Wars]  | 
+-----+--------+------------------+ 

मैं चाहता हूँ चलो कहते हैं कि इस प्रकार मैं, लोग हैं, जो कुछ तारीखों पर कुछ फ़िल्मों में देखा की एक चिंगारी dataframe करते हैं उपरोक्त से एक नया डेटाफ्रेम कॉलम उत्पन्न करने के लिए सभी पिछली प्रत्येक उपयोगकर्ता द्वारा देखी गई फिल्में, डुप्लिकेट के बिना ("पिछली" यूनिक्सडेट फ़ील्ड के बिना)। तो इसे इस तरह दिखना चाहिए:

+-----+--------+------------------+------------------------+ 
|name |unixdate|movies   |previous_movies   | 
+-----+--------+------------------+------------------------+ 
|Alice|1  |[Avatar]   |[]      | 
|Bob |2  |[Fargo, Tron]  |[]      | 
|Alice|4  |[Babe]   |[Avatar]    | 
|Alice|6  |[Avatar, Airplane]|[Avatar, Babe]   | 
|Alice|7  |[Pulp Fiction] |[Avatar, Babe, Airplane]| 
|Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
+-----+--------+------------------+------------------------+ 

मैं इसे एक अच्छे तरीके से कैसे कार्यान्वित कर सकता हूं?

उत्तर

5

एसक्यूएल केवलआदेश वस्तुओं की संरक्षण के बिना:

  • आवश्यक आयात:

    import pyspark.sql.functions as f 
    from pyspark.sql.window import Window 
    
  • विंडो परिभाषा:

    w = Window.partitionBy("name").orderBy("unixdate") 
    
  • पूर्ण समाधान:

    (movierecord 
        # Flatten movies 
        .withColumn("previous_movie", f.explode("movies")) 
        # Collect unique 
        .withColumn("previous_movies", f.collect_set("previous_movie").over(w)) 
        # Drop duplicates for a single unixdate 
        .groupBy("name", "unixdate") 
        .agg(f.max(f.struct(
         f.size("previous_movies"), 
         f.col("movies").alias("movies"), 
         f.col("previous_movies").alias("previous_movies") 
        )).alias("tmp")) 
        # Shift by one and extract 
        .select(
         "name", "unixdate", "tmp.movies", 
         f.lag("tmp.previous_movies", 1).over(w).alias("previous_movies"))) 
    
  • परिणाम:

    +-----+--------+------------------+------------------------+ 
    |name |unixdate|movies   |previous_movies   | 
    +-----+--------+------------------+------------------------+ 
    |Bob |2  |[Fargo, Tron]  |null     | 
    |Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
    |Alice|1  |[Avatar]   |null     | 
    |Alice|4  |[Babe]   |[Avatar]    | 
    |Alice|6  |[Avatar, Airplane]|[Babe, Avatar]   | 
    |Alice|7  |[Pulp Fiction] |[Babe, Airplane, Avatar]| 
    +-----+--------+------------------+------------------------+ 
    

एसक्यूएल एक अजगर यूडीएफ आदेश संरक्षण:

  • आयात:

    import pyspark.sql.functions as f 
    from pyspark.sql.window import Window 
    from pyspark.sql import Column 
    from pyspark.sql.types import ArrayType, StringType 
    
    from typing import List, Union 
    
    # https://github.com/pytoolz/toolz 
    from toolz import unique, concat, compose 
    
  • यूडीएफ: के रूप में पहले

    def flatten_distinct(col: Union[Column, str]) -> Column: 
        def flatten_distinct_(xss: Union[List[List[str]], None]) -> List[str]: 
         return compose(list, unique, concat)(xss or []) 
        return f.udf(flatten_distinct_, ArrayType(StringType()))(col) 
    
  • विंडो परिभाषा।

  • पूरा समाधान:

    (movierecord 
        # Collect lists 
        .withColumn("previous_movies", f.collect_list("movies").over(w)) 
        # Flatten and drop duplicates 
        .withColumn("previous_movies", flatten_distinct("previous_movies")) 
        # Shift by one 
        .withColumn("previous_movies", f.lag("previous_movies", 1).over(w)) 
        # For presentation only 
        .orderBy("unixdate")) 
    
  • परिणाम:

    +-----+--------+------------------+------------------------+ 
    |name |unixdate|movies   |previous_movies   | 
    +-----+--------+------------------+------------------------+ 
    |Alice|1  |[Avatar]   |null     | 
    |Bob |2  |[Fargo, Tron]  |null     | 
    |Alice|4  |[Babe]   |[Avatar]    | 
    |Alice|6  |[Avatar, Airplane]|[Avatar, Babe]   | 
    |Alice|7  |[Pulp Fiction] |[Avatar, Babe, Airplane]| 
    |Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
    +-----+--------+------------------+------------------------+ 
    

प्रदर्शन:

मेरा मानना ​​है कि इस को हल करने के कोई कारगर तरीका की कमी को देखते हुए ।न केवल अनुरोधित आउटपुट के लिए एक महत्वपूर्ण डेटा डुप्लिकेशंस की आवश्यकता होती है (डेटा टंगस्टन प्रारूप में फिट करने के लिए बाइनरी एन्कोड किया जाता है, इसलिए आपको संभावित संपीड़न मिलती है लेकिन ऑब्जेक्ट पहचान ढीली होती है) लेकिन महंगे समूहिंग और सॉर्टिंग सहित स्पार्क कंप्यूटिंग मॉडल को महंगे कई ऑपरेशन भी मिलते हैं।

यह ठीक होना चाहिए यदि previous_movies का आकार सीमाबद्ध और छोटा है लेकिन सामान्य रूप से व्यवहार्य नहीं होगा।

डेटा डुप्लिकेशंस किसी उपयोगकर्ता के लिए एकल, आलसी इतिहास रखकर संबोधित करना बहुत आसान है। एसक्यूएल में कुछ भी नहीं किया जा सकता है लेकिन निम्न स्तर के आरडीडी संचालन के साथ काफी आसान है।

विस्फोट और collect_ पैटर्न महंगा है। यदि आपकी आवश्यकताएं सख्त हैं लेकिन आप प्रदर्शन में सुधार करना चाहते हैं तो आप पाइथन के स्थान पर स्कैला यूडीएफ का उपयोग कर सकते हैं।

संबंधित मुद्दे