2016-12-07 10 views
11

मेरे पास एक डेटाफ्रेम है जिसमें एक पंक्ति है, और कई कॉलम हैं। कुछ कॉलम एकल मान हैं, और अन्य सूचियां हैं। सभी सूची कॉलम एक ही लंबाई हैं। मैं प्रत्येक सूची कॉलम को एक अलग पंक्ति में विभाजित करना चाहता हूं, जबकि किसी भी गैर-सूची कॉलम को रखना है।Pyspark: कई एरे कॉलम पंक्तियों में विभाजित करें

नमूना DF:

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')]) 
# +---+---------+---------+---+ 
# | a|  b|  c| d| 
# +---+---------+---------+---+ 
# | 1|[1, 2, 3]|[7, 8, 9]|foo| 
# +---+---------+---------+---+ 

मुझे क्या करना चाहते हैं:

+---+---+----+------+ 
| a| b| c | d | 
+---+---+----+------+ 
| 1| 1| 7 | foo | 
| 1| 2| 8 | foo | 
| 1| 3| 9 | foo | 
+---+---+----+------+ 

अगर मैं केवल एक सूची स्तंभ, यह सिर्फ एक explode करके आसान होगा था:

df_exploded = df.withColumn('b', explode('b')) 
# >>> df_exploded.show() 
# +---+---+---------+---+ 
# | a| b|  c| d| 
# +---+---+---------+---+ 
# | 1| 1|[7, 8, 9]|foo| 
# | 1| 2|[7, 8, 9]|foo| 
# | 1| 3|[7, 8, 9]|foo| 
# +---+---+---------+---+ 

हालांकि, अगर मैं explodec कॉलम भी करने का प्रयास करता हूं, तो मैं डेटाफ के साथ समाप्त होता हूं लंबाई के साथ टहनी जो मैं चाहता के वर्ग:

df_exploded_again = df_exploded.withColumn('c', explode('c')) 
# >>> df_exploded_again.show() 
# +---+---+---+---+ 
# | a| b| c| d| 
# +---+---+---+---+ 
# | 1| 1| 7|foo| 
# | 1| 1| 8|foo| 
# | 1| 1| 9|foo| 
# | 1| 2| 7|foo| 
# | 1| 2| 8|foo| 
# | 1| 2| 9|foo| 
# | 1| 3| 7|foo| 
# | 1| 3| 8|foo| 
# | 1| 3| 9|foo| 
# +---+---+---+---+ 

मैं क्या चाहते है - प्रत्येक स्तंभ के लिए, उस कॉलम में सरणी के n वें तत्व लेने के लिए और एक नई पंक्ति है कि जोड़ें। मैं एक dataframe में सभी स्तंभों करवाते विस्फोट मानचित्रण की कोशिश की है, लेकिन यह काम करने के लिए या तो प्रतीत नहीं होता:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF() 

उत्तर

13
DataFrames साथ

और यूडीएफ:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType 
from pyspark.sql.functions import col, udf 

zip_ = udf(
    lambda x, y: list(zip(x, y)), 
    ArrayType(StructType([ 
     # Adjust types to reflect data types 
     StructField("first", IntegerType()), 
     StructField("second", IntegerType()) 
    ])) 
) 

(df 
    .withColumn("tmp", zip_("b", "c")) 
    # UDF output cannot be directly passed to explode 
    .withColumn("tmp", explode("tmp")) 
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d")) 

RDDs के साथ:

(df 
    .rdd 
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]) 
    .toDF(["a", "b", "c", "d"])) 

पाइथन संचार ओवरहेड के कारण दोनों समाधान अक्षम हैं। डेटा का आकार तय हो गई है, तो आप कुछ इस तरह कर सकते हैं:

from functools import reduce 
from pyspark.sql import DataFrame 

# Length of array 
n = 3 

# For legacy Python you'll need a separate function 
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d") 
     for i in range(n)) 
).toDF("a", "b", "c", "d") 

या यहाँ तक कि:

from pyspark.sql.functions import array, struct 

# SQL level zip of arrays of known size 
# followed by explode 
tmp = explode(array(*[ 
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c")) 
    for i in range(n) 
])) 

(df 
    .withColumn("tmp", tmp) 
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d")) 

यह काफी तेजी यूडीएफ या RDD की तुलना में किया जाना चाहिए।

# This uses keyword only arguments 
# If you use legacy Python you'll have to change signature 
# Body of the function can stay the same 
def zip_and_explode(*colnames, n): 
    return explode(array(*[ 
     struct(*[col(c).getItem(i).alias(c) for c in colnames]) 
     for i in range(n) 
    ])) 

df.withColumn("tmp", zip_and_explode("b", "c", n=3)) 
4

flatMap, नहीं map उपयोग करने के लिए के रूप में आप प्रत्येक इनपुट पंक्ति से बाहर कई उत्पादन पंक्तियों बनाना चाहते आप आवश्यकता होगी: स्तंभों की एक मनमाना संख्या का समर्थन करने के सामान्यीकृत।

from pyspark.sql import Row 
def dualExplode(r): 
    rowDict = r.asDict() 
    bList = rowDict.pop('b') 
    cList = rowDict.pop('c') 
    for b,c in zip(bList, cList): 
     newDict = dict(rowDict) 
     newDict['b'] = b 
     newDict['c'] = c 
     yield Row(**newDict) 

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode)) 
संबंधित मुद्दे