2015-10-13 11 views
6

मैं एक कठिन समय कुछ है कि लगता है को लागू करने की तरह यह बहुत आसान होना चाहिए हो रही है:प्रदर्शन देखने/का उपयोग कर एक स्पार्क RDD या डेटा फ्रेम में अनुवाद एक और RDD/df

मेरा लक्ष्य एक RDD में अनुवाद करने के लिए है एक लुकअप टेबल या अनुवाद शब्दकोश के रूप में एक दूसरे आरडीडी/डेटा फ्रेम का उपयोग कर डेटाफ्रेम। मैं इन अनुवादों को कई कॉलम में बनाना चाहता हूं।

समस्या की व्याख्या करने का सबसे आसान तरीका उदाहरण के अनुसार है।

Route SourceCityID DestinationCityID 
A  1   2 
B  1   3 
C  2   1 

और

CityID CityName 
1  London 
2  Paris 
3  Tokyo 

मेरे वांछित आउटपुट RDD है:

Route SourceCity DestinationCity 
A  London  Paris 
B  London  Tokyo 
C  Paris  London 

मैं इसे यह उत्पादन के बारे में कैसे जाना चाहिए चलो कहते हैं कि मैं अपने इनपुट के रूप में निम्नलिखित दो RDDs करते हैं?

यह एसक्यूएल में एक आसान समस्या है, लेकिन मुझे स्पार्क में आरडीडी के साथ स्पष्ट समाधानों की जानकारी नहीं है। , cogroup में शामिल हों, आदि विधियां मल्टी-कॉलम आरडीडी के लिए उपयुक्त नहीं हैं और यह निर्दिष्ट करने की अनुमति नहीं देते कि कौन से कॉलम में शामिल होना है।

कोई विचार? क्या SQLContext उत्तर है?

+0

डेटाफ्रेम और स्पार्कएसक्यूएल का उपयोग करने से आप जो भी ढूंढ रहे हैं उसकी मदद करेंगे। यह मूल रूप से एक अलग वाक्यविन्यास के साथ एसक्यूएल है। – eliasah

+0

टेबल/आरडीडी के आकार क्या हैं? क्या सिटीआईडी ​​/ सिटीनाम आरडीडी रूट आरडीडी से कई गुना छोटा है? उस स्थिति में मैं एक मानचित्र के रूप में आरडीडी के परिणाम एकत्रित करता हूं और इसे प्रसारित करता हूं, ताकि यह प्रत्येक कर्मचारी पर स्थानीय लुकअप हो। –

उत्तर

3

RDD रास्ता:

routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ]) 
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")]) 


print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) \ 
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). \ 
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect() 

कौन सा प्रिंट:

[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')] 

और SQLContext रास्ता:

from pyspark.sql import HiveContext 
from pyspark.sql import SQLContext 

df_routes = sqlContext.createDataFrame(\ 
routes, ["Route", "SourceCityID", "DestinationCityID"]) 
df_cities = sqlContext.createDataFrame(\ 
cities, ["CityID", "CityName"]) 

temp = df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \ 
.select("Route", "DestinationCityID", "CityName") 
.withColumnRenamed("CityName", "SourceCity") 

print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) \ 
.select("Route", "SourceCity", "CityName") 
.withColumnRenamed("CityName", "DestinationCity").collect() 

कौन सा प्रिंट:

[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'), 
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'), 
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')] 
3

मान लिया जाये कि हम मार्गों और शहरों के साथ दो RDDs है:

val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1))) 
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo"))) 

वहाँ शहरों देखने को लागू करने के कई तरीके हैं। यह मानते हुए कि शहरों के लुकअप में मार्गों की तुलना में कुछ आइटम शामिल हैं जिनमें कई वस्तुएं होंगी। उस स्थिति में शहरों को इकट्ठा करने के साथ शुरू करें जो ड्राइवर द्वारा प्रत्येक कार्य में भेजा जाता है।

val citiesByID = citiesByIDRDD.collectAsMap 

routes.map{r => (r._1, citiesByID(r._2), citiesByID(r._3))}.collect 
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London)) 

हर काम करने के लिए लुकअप तालिका भेजने से बचने के लिए, लेकिन केवल एक बार श्रमिकों के लिए आप मौजूदा कोड प्रसारण देखने नक्शा विस्तार कर सकते हैं।

val bCitiesByID = sc.broadcast(citiesByID) 

routes.map{r => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3))}.collect 
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London)) 

मैं यहाँ डेटा फ्रेम के लिए की जरूरत नहीं दिख रहा है, लेकिन अगर आप चाहते हैं, तो आप:

import sqlContext.implicits._ 

case class Route(id: String, from: Int, to: Int) 
case class City(id: Int, name: String) 

val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo")) 
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1)) 

val citiesDf = cities.df 
citiesDf.registerTempTable("cities") 
val routesDf = routes.df 
citiesDf.registerTempTable("routes") 

routesDf.show 
+---+----+---+ 
| id|from| to| 
+---+----+---+ 
| A| 1| 2| 
| B| 1| 3| 
| C| 2| 1| 
+---+----+---+ 

citiesDf.show 
+---+------+ 
| id| name| 
+---+------+ 
| 1|London| 
| 2| Paris| 
| 3| Tokyo| 
+---+------+ 

आपने कहा है कि यह SQL में एक आसान समस्या है मैं इतना मान लीजिए कि आप इसे यहां से ले जा सकते हैं।निष्पादित एसक्यूएल इस प्रकार है:

sqlContext.sql ("SELECT COUNT(*) FROM routes") 
+0

संग्रहों का उपयोग करने के लिए आपको यह सुनिश्चित करना होगा कि सभी डेटा मास्टर नोड –

+1

में फिट होंगे हां, यह सही है। सवाल का नमूना एकत्रित और प्रसारण का उपयोग करके इस समाधान के लिए संकेत देता है, लेकिन यह केवल तभी समझ में आता है जब शहरों की लुकअप टेबल अपेक्षाकृत छोटी होती है और निष्पादकों/ड्राइवर स्मृति में फिट होने के लिए काफी छोटी होती है। –

संबंधित मुद्दे