2015-10-22 8 views
11

में कोई भी/शून्य मानों के साथ रिक्त तारों को बदलें, मेरे पास null और उसी कॉलम में खाली तारों के मिश्रण के साथ Spark 1.5.0 DataFrame है। मैं सभी कॉलम में सभी खाली तारों को null (None, पायथन में) रूपांतरित करना चाहता हूं। डेटाफ्रेम में सैकड़ों कॉलम हो सकते हैं, इसलिए मैं प्रत्येक कॉलम के हार्ड-कोडेड मैनिप्लेशंस से बचने की कोशिश कर रहा हूं।डेटाफ्रेम

नीचे अपना प्रयास देखें, जिसके परिणामस्वरूप त्रुटि हुई है।

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

## Create a test DataFrame 
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')]) 
testDF.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## | | 2| 
## |null|null| 
## +----+----+ 

## Try to replace an empty string with None/null 
testDF.replace('', None).show() 
## ValueError: value should be a float, int, long, string, list, or tuple 

## A string value of null (obviously) doesn't work... 
testDF.replace('', 'null').na.drop(subset='col1').show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## +----+----+ 
+0

@palsch, नहीं, यह एक सूची नहीं लौटाता है। यह एक डेटाफ्रेम देता है। मैंने स्पार्क दस्तावेज के लिंक के साथ सवाल अपडेट किया। – dnlbrky

+2

@palsch यह एक सामान्य पायथन सवाल नहीं है! स्पार्क डेटाफ्रेम आमतौर पर बड़े डेटा पर भारी डेटा विश्लेषण की अनुमति देने के लिए उपयोग की जाने वाली डेटा संरचना वितरित की जाती है। तो आप समाधान फिट नहीं हैं। – eliasah

+1

@eliasah सत्य को पाइथोनिक 'लैम्ब्डा एक्स कहा जाना चाहिए: यदि कोई नहीं है तो' udf' के साथ लपेटा गया एक्स और एक्स 'ठीक नहीं होगा :) – zero323

उत्तर

15

यह इस रूप में सरल है:

from pyspark.sql.functions import col, when 

def blank_as_null(x): 
    return when(col(x) != "", col(x)).otherwise(None) 

dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1")) 

dfWithEmptyReplaced.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## |null|null| 
## +----+----+ 

dfWithEmptyReplaced.na.drop().show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## +----+----+ 

एकाधिक कॉलम को भरने के लिए चाहते हैं तो आप उदाहरण के लिए कम कर सकते हैं:

to_convert = set([...]) # Some set of columns 

reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF) 

या समझ का उपयोग करें:

exprs = [ 
    blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns] 

testDF.select(*exprs) 

यदि आप विशेष रूप से ओपे करना चाहते हैं स्ट्रिंग फ़ील्ड पर दर the answerrobin-loxley द्वारा जांचें।

+0

धन्यवाद @ zero323। क्या आपका उत्तर स्वचालित रूप से और कुशलतापूर्वक कई स्तंभों को संभालने के लिए बढ़ाया जा सकता है? शायद सभी कॉलम नामों को सूचीबद्ध करें, प्रत्येक कॉलम के लिए अपने उत्तर के समान कोड उत्पन्न करें, और उसके बाद कोड का मूल्यांकन करें? – dnlbrky

+0

मुझे कोई कारण नहीं दिख रहा है कि आप क्यों नहीं कर सके। डेटाफ्रेम का आलसी मूल्यांकन किया जाता है और शेष केवल एक मानक पायथन है। आपको संपादन में कुछ विकल्प मिलेंगे। – zero323

+0

मैं इस उत्तर को स्वीकार करूंगा, लेकिन क्या आप कृपया @RobinLoxley से थोड़ा सा जोड़ सकते हैं? या, यदि आपको कोई फर्क नहीं पड़ता है तो मैं आपका जवाब संपादित कर सकता हूं। – dnlbrky

8

मेरे समाधान सब समाधान है, जो के रूप में कई क्षेत्रों के साथ सौदा कर सकते हैं के रूप में आप चाहते हैं, निम्नलिखित के रूप में छोटे से समारोह को देखने के I'v अब तक देखा की तुलना में बेहतर है:

// Replace empty Strings with null values 
    private def setEmptyToNull(df: DataFrame): DataFrame = { 
    val exprs = df.schema.map { f => 
     f.dataType match { 
     case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name) 
     case _ => col(f.name) 
     } 
    } 

    df.select(exprs: _*) 
    } 

आप आसानी से कर सकते हैं पुनर्लेखन पाइथन में उपरोक्त कार्य।

मैं सीधे शब्दों में zero323 की और soulmachine के जवाब के शीर्ष पर जोड़ने @liancheng

6

से इस चाल सीखा है। सभी स्ट्रिंगटाइप फ़ील्ड के लिए कनवर्ट करने के लिए।

from pyspark.sql.types import StringType 
string_fields = [] 
for i, f in enumerate(test_df.schema.fields): 
    if isinstance(f.dataType, StringType): 
     string_fields.append(f.name)