2015-02-09 10 views
5

के बाद स्पार्क के साथ हाइव टेबल से पढ़ने और लिखना हमारे पास एक हाइव वेयरहाउस है, और विभिन्न कार्यों (मुख्य रूप से वर्गीकरण) के लिए स्पार्क का उपयोग करना चाहता था। कभी-कभी परिणामों को एक हाइव टेबल के रूप में लिखें। उदाहरण के लिए, हमने original_table कॉलम दो द्वारा समूहीकृत मूल_टेबल कॉलम दो की कुल योग को खोजने के लिए निम्न पायथन फ़ंक्शन लिखा है। फ़ंक्शन काम करता है, लेकिन हम चिंतित हैं कि यह अक्षम है, विशेष रूप से मानचित्र कुंजी-मूल्य जोड़े, और शब्दकोश संस्करणों में कनवर्ट करने के लिए। कार्य combiner, mergeValue, mergeCombiner कहीं और परिभाषित किया जाता है, लेकिन ठीक काम करते हैं।एग्रीगेशन

from pyspark import HiveContext 

rdd = HiveContext(sc).sql('from original_table select *') 

#convert to key-value pairs 
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1]))) 

#create rdd where rows are (key, (sum, count) 
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner) 

# creates rdd with dictionary values in order to create schemardd 
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]}) 

# infer the schema 
schema_rdd = HiveContext(sc).inferSchema(dict_rdd) 

# save 
schema_rdd.saveAsTable('new_table_name') 

क्या वही काम करने के और अधिक प्रभावी तरीके हैं?

+1

यह सुनिश्चित नहीं है कि आपको एक rdd में क्यों परिवर्तित करना है, लेकिन यदि आप जोर देते हैं कि आप 'comb_BueKey' के बजाय 'key_value_rdd.reduceByKey (lambda x, y: sum (x, y)) कर सकते हैं। – mtoto

उत्तर

0

... शायद यह प्रश्न संभव नहीं था जब प्रश्न लिखा गया था, लेकिन क्या यह createDataFrame() कॉल का उपयोग करने के लिए अब (1.3 पोस्ट) समझ में नहीं आता है?

अपना पहला आरडीडी प्राप्त करने के बाद, ऐसा लगता है कि आप कॉल कर सकते हैं, फिर एक पास में पूरी नौकरी पाने के लिए संरचना के खिलाफ एक सरल एसक्यूएल कथन चलाएं। (योग और समूह) प्लस, डेटाफ्रेम संरचना सीधे सृजन पर स्कीमा का अनुमान लगा सकती है अगर मैं एपीआई दस्तावेज़ सही ढंग से पढ़ रहा हूं।

(http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext)

0

यह त्रुटि जहां उपयोगकर्ता पहुँच

+1

यह मुझे लगता है टिप्पणी होना चाहिए। – ketan

+0

आप किस त्रुटि के बारे में बात कर रहे हैं? – mtoto

0

चिंगारी किस संस्करण प्रयोग कर रहे है फ़ोल्डर में hive.exec.scratchdir सेट करके किया जा सकता है?

यह उत्तर डेटा फ्रेम का उपयोग कर 1.6 & पर आधारित है।

val sc = new SparkContext(conf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

import sqlContext.implicits._ 
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt") 

    import org.apache.spark.sql.functions._ 
    client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show() 


+-----+---+-----+ 
|Categ|Sum|count| 
+-----+---+-----+ 
| A| 15| 2| 
| B| 56| 1| 
+-----+---+-----+ 

आशा है कि इससे मदद मिलती है !!

संबंधित मुद्दे