2016-04-01 16 views
10

उदाकैसे pyspark

sqlContext = SQLContext(sc) 

sample=sqlContext.sql("select Name ,age ,city from user") 
sample.show() 

में dataFrame की प्रत्येक पंक्ति के माध्यम से लूप करने के लिए ऊपर बयान टर्मिनल पर संपूर्ण तालिका मुद्रित, लेकिन मैं के लिए का उपयोग कर कि तालिका में प्रत्येक पंक्ति तक पहुँचने के लिए या जब आगे की गणना प्रदर्शन करने के लिए चाहते हैं।

+0

मुझे विश्वास है कि मैंने एक सही उत्तर प्रदान किया है। क्या आप चुन सकते हैं, या सुधार करने के लिए प्रतिक्रिया प्रदान कर सकते हैं? – aaronsteers

उत्तर

13

आप बस नहीं कर सकते हैं। DataFrames, अन्य वितरित डेटा संरचनाओं के समान, iterable नहीं हैं और केवल समर्पित उच्च ऑर्डर फ़ंक्शन और/या SQL विधियों का उपयोग करके एक्सेस किया जा सकता है।

आप निश्चित रूप collect की toLocalIterator परिवर्तित कर सकते हैं या और स्थानीय स्तर पर पुनरावृति

for row in df.rdd.collect(): 
    do_something(row) 

लेकिन यह चिंगारी का उपयोग कर के सभी उद्देश्य धड़कता है।

2

यदि आप डेटाफ्रेम ऑब्जेक्ट में प्रत्येक पंक्ति में कुछ करना चाहते हैं, तो map का उपयोग करें। यह आपको प्रत्येक पंक्ति पर और गणना करने की अनुमति देगा। यह 0 से len(dataset)-1 तक पूरे डेटासेट में लूपिंग के बराबर है।

ध्यान दें कि यह एक PipelinedRDD वापस करेगा, डेटाफ्रेम नहीं।

21

आप एक कस्टम फ़ंक्शन को परिभाषित करेंगे और मानचित्र का उपयोग करेंगे।

def customFunction(row): 

    return (row.name, row.age, row.city) 

sample2 = sample.rdd.map(customFunction) 

या

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city)) 

कस्टम समारोह तो dataframe के हर पंक्ति पर लागू किया जाएगा। ध्यान दें कि नमूना 2 एक RDD होगा, डेटाफ्रेम नहीं।

यदि आप अधिक जटिल गणना करने जा रहे हैं तो मानचित्र की आवश्यकता है। यदि आपको केवल व्युत्पन्न कॉलम जोड़ने की आवश्यकता है, तो आप डेटाफ्रेम लौटने के साथ withColumn का उपयोग कर सकते हैं।

sample3 = sample.withColumn('age2', sample.age + 2) 
2

अजगर में सूची comprehensions का उपयोग करना, तुम सिर्फ दो पंक्तियों का उपयोग करके सूची में मूल्यों की एक पूरी स्तंभ एकत्र कर सकते हैं:

df = sqlContext.sql("show tables in default") 
tableList = [x["tableName"] for x in df.rdd.collect()] 

उपरोक्त उदाहरण में, हम डेटाबेस में तालिकाओं की एक सूची प्रदान ' डिफ़ॉल्ट ', लेकिन एसक्यूएल() में उपयोग की गई क्वेरी को प्रतिस्थापित करके इसे अनुकूलित किया जा सकता है।

या अधिक संक्षिप्त:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()] 

और तीन कॉलम के अपने उदाहरण के लिए, हम शब्दकोशों की एक सूची बना सकते हैं और फिर पाश के लिए एक में उन के माध्यम से पुनरावृति।

sql_text = "select name, age, city from user" 
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
      for x in sqlContext.sql(sql_text).rdd.collect()] 
for row in tupleList: 
    print("{} is a {} year old from {}".format(
     row["name"], 
     row["age"], 
     row["city"])) 
0

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

ऊपर

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 
name के लिए

, age होना चाहिए, और city चर लेकिन बस शब्दकोश की चाबी नहीं हैं।