2015-08-12 7 views
14

में नए कॉलम के रूप में कॉलम योग जोड़ें, मैं पीईएसपार्क का उपयोग कर रहा हूं और मेरे पास संख्यात्मक कॉलम के समूह के साथ स्पार्क डेटाफ्रेम है। मैं एक कॉलम जोड़ना चाहता हूं जो कि अन्य सभी स्तंभों का योग है।पीईएसपार्क डेटाफ्रेम

मान लीजिए कि मेरे डेटाफ्रेम में कॉलम "ए", "बी" और "सी" था। मैं जानता हूँ कि मैं यह कर सकता:

df.withColumn('total_col', df.a + df.b + df.c) 

समस्या यह है कि मैं व्यक्तिगत रूप से प्रत्येक कॉलम लिखते हैं और उन्हें जोड़ने के लिए नहीं करना चाहते हैं, खासकर अगर मैं कॉलम की एक बहुत कुछ है। मैं इसे स्वचालित रूप से या कॉलम नामों की एक सूची निर्दिष्ट करके सक्षम करना चाहता हूं जिसे मैं जोड़ना चाहता हूं। क्या इसे करने का और कोई तरीका है?

+0

यह डेटाफ्रेम की तुलना में आरडीडी के साथ बहुत आसान है। यदि डेटा एक पंक्ति का प्रतिनिधित्व करने वाला एक सरणी है, तो आप 'RDD.map (लैम्ब्डा डेटा: (डेटा, योग (डेटा)) कर सकते हैं) '। स्पार्क डेटाफ्रेम के साथ यह अधिक कठिन कारण है कि 'कॉलम' में कॉलम अभिव्यक्ति के रूप में क्या अनुमति है। ऐसा लगता है कि यह बहुत अच्छी तरह से प्रलेखित नहीं है। – Paul

+0

यह या तो काम नहीं करता है (PySpark 1.6.3): 'dftest.with कॉलम (" टाइम्स ", योग ((dftest [c]> 2)। Ccastest.columns में c के लिए .cast (" int ")। [1:])) 'और फिर, 'dftest.select (' a ',' b ',' c ',' d ')। Rdd.map (lambda x: (x, sum (x))) ।ले लें (2) ' काम नहीं लग रहा है –

उत्तर

24

यह स्पष्ट नहीं था। मुझे स्पार्क डेटाफ्रेम एपीआई में परिभाषित कॉलम की कोई पंक्ति-आधारित राशि दिखाई नहीं देती है।

संस्करण 2

यह एक काफी सरल तरीके से किया जा सकता है:

newdf = df.withColumn('total', sum(df[col] for col in df.columns)) 

df.columns स्पार्क Dataframe में स्तंभ नाम के सभी दे रही है स्ट्रिंग की एक सूची के रूप में pyspark द्वारा आपूर्ति की है। एक अलग राशि के लिए, आप इसके बजाय कॉलम नामों की किसी भी अन्य सूची की आपूर्ति कर सकते हैं।

मैंने इसे अपने पहले समाधान के रूप में नहीं देखा क्योंकि मुझे यकीन नहीं था कि यह कैसे व्यवहार करेगा। लेकिन यह काम करता है।

संस्करण 1

यह बेहद जटिल है, लेकिन साथ ही काम करता है।

आप ऐसा कर सकते हैं:

  1. उपयोग df.columns
  2. उपयोग कि नाम सूची है कि कुछ करने के लिए सूची
  3. पास कॉलम की एक सूची बनाने के लिए स्तंभों की नामों की सूची प्राप्त करने के लिए कि एक fold-type functional manner

में स्तंभ के अतिभारित ऐड समारोह लागू करेगा अजगर के reduce के साथ, कैसे ऑपरेटर ओवरलोडिंग कार्यों में से कुछ ज्ञान, और कॉलम here के लिए pyspark कोड है कि हो जाता है:

def column_add(a,b): 
    return a.__add__(b) 

newdf = df.withColumn('total_col', 
     reduce(column_add, (df[col] for col in df.columns))) 

नोट यह एक अजगर को कम करने, नहीं एक चिंगारी RDD को कम है, और क्योंकि यह एक सूची जनरेटर अभिव्यक्ति है दूसरा कम करने के लिए पैरामीटर में कोष्ठक अवधि कोष्ठक की आवश्यकता है।

परीक्षण, काम करता है!

$ pyspark 
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache() 
>>> df 
DataFrame[a: bigint, b: bigint, c: bigint] 
>>> df.columns 
['a', 'b', 'c'] 
>>> def column_add(a,b): 
...  return a.__add__(b) 
... 
>>> df.withColumn('total', reduce(column_add, (df[col] for col in df.columns))).collect() 
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)] 
+0

@ सैलमोनरड धन्यवाद। यह कभी-कभी स्पार्क डेटाफ्रेम क्लास को याद रखने में मदद करता है, और इसलिए डेटा में कोई भी बदलाव करने के लिए आपको कुछ नया कॉलफ्रेम देता है जो एक नया डेटाफ्रेम देता है। – Paul

+3

संस्करण 2 स्पार्क 1.5.0 और सीडीएच-5.5.2 के साथ काम नहीं कर रहा है। और पायथन संस्करण 3.4। यह एक त्रुटि फेंक रहा है: "विशेषता त्रुटि: 'जेनरेटर' ऑब्जेक्ट में कोई विशेषता नहीं है '_get_object_id" – Hemant

+0

आपके दोनों समाधान अच्छे और साफ हैं। मैं सोच रहा हूं कि आपने इसके लिए उपयोगकर्ता परिभाषित कार्यों का उपयोग क्यों नहीं किया? – ThePrincess