2016-08-07 15 views
6

स्पार्क डेटाफ्रेम df को देखते हुए, मैं एक निश्चित संख्यात्मक कॉलम 'values' में अधिकतम मान प्राप्त करना चाहता हूं, और उस मान को प्राप्त करें जहां वह मान प्राप्त हुआ था। मैं निश्चित रूप से यह कर सकते हैं:स्पार्क डेटाफ्रेम में Argmax: अधिकतम मूल्य

# it doesn't matter if I use scala or python, 
# since I hope I get this done with DataFrame API 
import pyspark.sql.functions as F 
max_value = df.select(F.max('values')).collect()[0][0] 
df.filter(df.values == max_value).show() 

लेकिन यह अक्षम है, क्योंकि यह df के माध्यम से दो गुजरता की आवश्यकता है।

pandas.Series/DataFrame और numpy.arrayargmax/idxmax तरीकों कि इस कुशलता से करने (एक पास में) है। तो मानक पायथन (अंतर्निहित फ़ंक्शन max एक प्रमुख पैरामीटर स्वीकार करता है, इसलिए इसका उपयोग उच्चतम मान की अनुक्रमणिका को खोजने के लिए किया जा सकता है)।

स्पार्क में सही दृष्टिकोण क्या है? ध्यान दें कि मुझे कोई फर्क नहीं पड़ता कि मुझे उन सभी पंक्तियां मिलती हैं जहां अधिकतम मूल्य प्राप्त होता है, या उन पंक्तियों के कुछ मनमाने ढंग से (खाली नहीं!) सबसेट।

+0

सामान्य में कोई बेहतर समाधान है जो पार भाषा है और मनमाने ढंग से डेटा पर काम कर सकते हैं। – zero323

+0

@ zero323 डेटाफ्रेम एपीआई में नीचे दिए गए उत्तर में आरडीडी कोड को रैला करना असंभव क्यों है, इसे स्कैला में परिवर्तित करके और उत्प्रेरक के लिए उचित मेटाडेटा जोड़ने के लिए? – max

+0

यह संभव है लेकिन यह स्पष्ट रूप से धारणा को तोड़ देता है कि यदि आप स्कैला या पायथन_ का उपयोग करते हैं तो इससे कोई फर्क नहीं पड़ता कि आप इसे अकेले एसक्यूएल के साथ ऑर्डर करने योग्य डेटा प्रकारों के साथ भी कर सकते हैं लेकिन यह एक विशेष मामला सामान्य समाधान नहीं है। – zero323

उत्तर

10

स्कीमा is Orderable (स्कीमा केवल एटोमिक्स/एटोमिक्स के एरे/रिकर्सिवली orderable structs होता है) यदि आप सरल एकत्रित उपयोग कर सकते हैं:

अजगर:

df.select(F.max(
    F.struct("values", *(x for x in df.columns if x != "values")) 
)).first() 

स्काला:

df.select(max(struct(
    $"values" +: df.columns.collect {case x if x!= "values" => col(x)}: _* 
))).first 

नहीं तो आप कम कर सकते हैं Dataset से अधिक (स्काला केवल), लेकिन यह अतिरिक्त अक्रमांकन की आवश्यकता है:

type T = ??? 

df.reduce((a, b) => if (a.getAs[T]("values") > b.getAs[T]("values")) a else b) 
+0

थोड़ा मुश्किल, मुझे इस 'स्ट्रक्चर' विधि –

+0

के बारे में पढ़ने की ज़रूरत है क्या आपको 'ऑर्डर करने योग्य' स्कीमा की स्पष्टीकरण/परिभाषा को जोड़ने का मन है? Google खोज को केवल यह जवाब मिला :) – max

+0

https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering। स्केला # L89-L96 – zero323

2

शायद यह एक अधूरा जवाब है, लेकिन आप का उपयोग करें 'DataFrame आंतरिक RDD कर सकते हैं, max विधि लागू करते हैं और एक निर्धारित कुंजी का उपयोग कर अधिक से अधिक रिकॉर्ड मिलता है।

a = sc.parallelize([ 
    ("a", 1, 100), 
    ("b", 2, 120), 
    ("c", 10, 1000), 
    ("d", 14, 1000) 
    ]).toDF(["name", "id", "salary"]) 

a.rdd.max(key=lambda x: x["salary"]) # Row(name=u'c', id=10, salary=1000) 
+1

क्या मैं आरडीडी एपीआई (पाइथन ओवरहेड से बचने के लिए स्कैला) के साथ 1 पास मान सकता हूं डेटाफ्रेम एपीआई के साथ 2 पास से तेज की गारंटी है? या कुछ अनुकूलन उत्प्रेरक यहां कर सकता है? – max

संबंधित मुद्दे