2016-10-18 15 views
5

मैं इस तरह एक dataframe है:array- (Dataframe) की सरणी विस्फोट pySpark

+-----+--------------------+ 
|index|    merged| 
+-----+--------------------+ 
| 0|[[2.5, 2.4], [3.5...| 
| 1|[[-1.0, -1.0], [-...| 
| 2|[[-1.0, -1.0], [-...| 
| 3|[[0.0, 0.0], [0.5...| 
| 4|[[0.5, 0.5], [1.0...| 
| 5|[[0.5, 0.5], [1.0...| 
| 6|[[-1.0, -1.0], [0...| 
| 7|[[0.0, 0.0], [0.5...| 
| 8|[[0.5, 0.5], [1.0...| 
+-----+--------------------+ 

और मैं

+-----+-------+-------+ 
|index|Column1|Column2| 
+-----+-------+-------+ 
| 0| 2.5| 2.4 | 
| 1| 3.5| 0.5| 
| 2| -1.0| -1.0| 
| 3| -1.0| -1.0| 
| 4| 0.0 | 0.0 | 
| 5| 0.5| 0.74| 
+-----+-------+-------+ 

प्रत्येक टपल में मर्ज किए गए स्तंभ में विस्फोट करना चाहते हैं [[2.5, 2.4] , [3.5,0,5]] दो स्तंभों को दोबारा दोहराएं, जानते हुए कि 2,5 और 3,5 कॉलम 1 में संग्रहीत किए जाएंगे और (2.4,0,5) दूसरे कॉलम

में संग्रहीत किए जाएंगे, इसलिए मैंने यह कोशिश की

df= df.withColumn("merged", df["merged"].cast("array<array<float>>")) 
df= df.withColumn("merged",explode('merged')) 

तो मैं एक और DF

बनाने के लिए एक यूडीएफ लागू होगी, लेकिन मैं डेटा डाली नहीं कर सकते हैं या लागू विस्फोट, और मैं त्रुटि

pyspark.sql.utils.AnalysisException: u"cannot resolve 'cast(merged as array<array<float>)' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true) 

मैं भी कोशिश की

df= df.withColumn("merged", df["merged"].cast("array<string>")) 
प्राप्त

लेकिन कुछ भी काम नहीं करता है और यदि मैं बिना कलाकार के विस्फोट लागू करता हूं, तो मुझे

प्राप्त होता है
pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(merged)' due to data type mismatch: input to function explode should be array or map type, not StringType; 
+0

आप df का स्कीमा दे सकते हैं? ऐसा लगता है कि विलय वास्तव में एक स्ट्रिंग है, न कि आपके तर्क में क्या है। आप एक विभाजक द्वारा स्ट्रिंग को विभाजित करने के लिए 'विभाजन' का उपयोग कर सकते हैं। साथ ही, ऐसा लगता है कि आपके प्रश्न में टाइपो हैं: अपेक्षित परिणाम के उदाहरण में विस्फोटित मूल्यों के लिए इंडेक्स समान नहीं है? या क्या आपने जो वास्तव में दिया है वह दिया है? – Wilmerton

+0

Thx, मैंने अपना कोड पढ़ा है, और मैंने पाया कि मैं अपने लैम्ब्डा फ़ंक्शन (जो मेरे कॉलम मर्ज करते हैं) में वापसी प्रकार ArrayType (ArrayType (FloatType())) को जोड़ना भूल गया है, तो – MrGildarts

+0

... समस्या हल हो गई है? – Wilmerton

उत्तर

0

आप नीचे दिए गए कोड की कोशिश कर सकते:

from pyspark import SparkConf, SparkContext       
from pyspark.sql import SparkSession        

from pyspark.sql.types import FloatType, StringType, IntegerType 
from pyspark.sql.functions import udf, col       


def col1_calc(merged):            
    return merged[0][0]            

def col2_calc(merged):            
    return merged[0][1]            

if __name__ == '__main__':           
    spark = SparkSession \           
     .builder \             
     .appName("Python Spark SQL Hive integration example") \  
     .getOrCreate()            

    df = spark.createDataFrame([         
     (0, [[2.5,2.4],[3.5]]),          
     (1, [[-1.0,-1.0],[3.5]]),         
     (2, [[-1.0,-1.0],[3.5]]),         
    ], ["index", "merged"])           

    df.show()              

    column1_calc = udf(col1_calc, FloatType())      
    df = df.withColumn('Column1', column1_calc(df['merged']))  
    column2_calc = udf(col2_calc, FloatType())      
    df = df.withColumn('Column2', column2_calc(df['merged']))  

    df = df.select(['Column1', 'Column2', 'index'])     
    df.show()   

आउटपुट:

+-------+-------+-----+ 
|Column1|Column2|index| 
+-------+-------+-----+ 
| 2.5| 2.4| 0| 
| -1.0| -1.0| 1| 
| -1.0| -1.0| 2| 
+-------+-------+-----+ 
संबंधित मुद्दे