19

मैं स्पार्क डेटाफ़्रेम कॉलम में सबसे बड़ा मूल्य प्राप्त करने का सबसे अच्छा तरीका जानने का प्रयास कर रहा हूं।स्पार्क डेटाफ्रेम कॉलम में अधिकतम मान प्राप्त करने का सबसे अच्छा तरीका

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
df.show() 

कौन बनाता है::

+---+---+ 
| A| B| 
+---+---+ 
|1.0|4.0| 
|2.0|5.0| 
|3.0|6.0| 
+---+---+ 

मेरा लक्ष्य स्तंभ एक में सबसे बड़ा मान को मिल रहा है (निरीक्षण से, इस 3.0 है)

निम्न उदाहरण पर विचार करें।

# Method 1: Use describe() 
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 

# Method 2: Use SQL 
df.registerTempTable("df_table") 
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 

# Method 3: Use groupby() 
df.groupby().max('A').collect()[0].asDict()['max(A)'] 

# Method 4: Convert to RDD 
df.select("A").rdd.max()[0] 

ऊपर से प्रत्येक सही जवाब देता है, लेकिन एक स्पार्क रूपरेखा उपकरण के अभाव में मैं नहीं बता सकता है जो सबसे अच्छा है: PySpark की सहायता से देखें चार दृष्टिकोण मैं के बारे में सोच सकते हैं।

स्पार्क रनटाइम या संसाधन उपयोग के संदर्भ में उपर्युक्त तरीकों पर अंतर्ज्ञान या अनुभववाद से कोई भी विचार सबसे ऊपर है या क्या ऊपर दिए गए लोगों की तुलना में अधिक प्रत्यक्ष विधि है या नहीं?

+5

तरीके 2 और 3 बराबर हैं और समान भौतिक और अनुकूलित लॉजिकल योजनाओं का उपयोग करते हैं। विधि 4 Rdd पर अधिकतम के साथ कम लागू होता है। यह डेटाफ्रेम पर सीधे संचालन करने से धीमा हो सकता है। विधि 1 2 और 3 के बराबर कम है। – zero323

+1

@ zero323 'df.select (अधिकतम (" ए ")) के बारे में क्या है।() [0] .asDict() ['max (ए)']'? विधि 2 के बराबर दिखता है जबकि अधिक कॉम्पैक्ट, और विधि 3 के बारे में भी अधिक सहज ज्ञान युक्त लगता है। – desertnaut

+0

- सबसे धीमा तरीका विधि 4 है, क्योंकि आप पूरे कॉलम के आरडीडी रूपांतरण में डीएफ करते हैं और फिर अधिकतम मूल्य निकालते हैं; –

उत्तर

15
>df1.show() 
+-----+--------------------+--------+----------+-----------+ 
|floor|   timestamp|  uid|   x|   y| 
+-----+--------------------+--------+----------+-----------+ 
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418| 
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393| 
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585| 
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073| 

>row1 = df1.agg({"x": "max"}).collect()[0] 
>print row1 
Row(max(x)=110.33613) 
>print row1["max(x)"] 
110.33613 

उत्तर लगभग विधि 3 जैसा ही है। लेकिन "asDict()" method3 में

+0

कोई बता सकता है कि क्यों एकत्र() [0] की आवश्यकता है? – jibiel

+2

@jibiel 'collect() 'एक सूची देता है (इस मामले में एक ही आइटम के साथ), तो आपको सूची में पहले (केवल) आइटम तक पहुंचने की आवश्यकता है –

2

हटाया जा सकता है लगता है मामले में कुछ चमत्कार है कि यह कैसे स्काला का उपयोग कर ऐसा करने के लिए, ये रहा (स्पार्क 2.0 का उपयोग कर +।): एक के लिए

scala> df.createOrReplaceTempView("TEMP_DF") 
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF"). 
    collect()(0).getInt(0) 
scala> print(myMax) 
117 
6

अधिकतम मूल्य एक dataframe की विशेष कॉलम का उपयोग करके प्राप्त किया जा सकता है -

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

0

टिप्पणी: स्पार्क बिग डाटा पर काम करने के लिए है - कंप्यूटिंग वितरित किए। उदाहरण डेटाफ्रेम का आकार बहुत छोटा है, इसलिए वास्तविक जीवन उदाहरणों के क्रम को छोटे ~ उदाहरण के संबंध में बदला जा सकता है।

सबसे धीमा: Method_1, क्योंकि .describe ('ए') मिनट की गणना करता है, अधिकतम, मतलब है, stddev, और गिनती (पूरे स्तंभ पर 5 गणना)

मध्यम: Method_4, क्योंकि, .rdd (डीएफ के लिए आरडीडी परिवर्तन) प्रक्रिया को धीमा कर देता है।

तेज़: विधि_3 ~ विधि_2 ~ method_5, क्योंकि तर्क बहुत समान है, इसलिए स्पार्क का उत्प्रेरक अनुकूलक कम से कम संचालन के साथ बहुत समान तर्क का पालन करता है (किसी विशेष कॉलम का अधिकतम प्राप्त करें, एक-मूल्य डेटाफ्रेम एकत्र करें);

छोटे DF (ms): मिलीसेकेंड में एक क्लस्टर के किनारे से नोड पर

import pandas as pd 
import time 

time_dict = {} 

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
#-- For bigger/realistic dataframe just uncomment the following 3 lines 
#lst = list(np.random.normal(0.0, 100.0, 100000)) 
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst}) 
#dfff = self.sqlContext.createDataFrame(pdf) 

tic1 = int(round(time.time() * 1000)) 
# Method 1: Use describe() 
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 
tac1 = int(round(time.time() * 1000)) 
time_dict['m1']= tac1 - tic1 
print (max_val) 

tic2 = int(round(time.time() * 1000)) 
# Method 2: Use SQL 
dfff.registerTempTable("df_table") 
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 
tac2 = int(round(time.time() * 1000)) 
time_dict['m2']= tac2 - tic2 
print (max_val) 

tic3 = int(round(time.time() * 1000)) 
# Method 3: Use groupby() 
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)'] 
tac3 = int(round(time.time() * 1000)) 
time_dict['m3']= tac3 - tic3 
print (max_val) 

tic4 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.select("A").rdd.max()[0] 
tac4 = int(round(time.time() * 1000)) 
time_dict['m4']= tac4 - tic4 
print (max_val) 

tic5 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.agg({"A": "max"}).collect()[0][0] 
tac5 = int(round(time.time() * 1000)) 
time_dict['m5']= tac5 - tic5 
print (max_val) 

print time_dict 

परिणाम (एमएस) (.asDict() एक छोटे से अतिरिक्त समय 3,2 5 की तुलना में कहते हैं): {'एम 1': 70 9 6, 'एम 2': 205, 'एम 3': 165, 'एम 4': 211, 'एम 5': 180}

बड़ा डीएफ (एमएस): {'एम 1': 10260, 'एम 2 ': 452,' एम 3 ': 465,' एम 4 ': 916,' एम 5 ': 373}

संबंधित मुद्दे