2014-10-06 13 views
12

में एक और RDD के आधार पर मैं केवल कर्मचारियों जो एक डिपामेंट आईडी दूसरी तालिका में संदर्भित है रखने के लिए करना चाहते हैं।फ़िल्टर स्पार्क

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
employee.filter(lambda e: e[1] in department).collect() 

Py4JError: An error occurred while calling o344.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 

कोई भी विचार:

Employee table 
LastName DepartmentID 
Rafferty 31 
Jones 33 
Heisenberg 33 
Robinson 34 
Smith 34 

Department table 
DepartmentID 
31 
33 

मैं निम्नलिखित कोड है जो काम नहीं करता है की कोशिश की? मैं पाइथन के साथ स्पार्क 1.1.0 का उपयोग कर रहा हूं। हालांकि, मैं एक स्कैला या पायथन जवाब स्वीकार करता हूं।

+0

आप एक होने के लिए अपने विभाग सूची की आवश्यकता है RDD? – maasg

+0

वास्तव में नहीं। विभाग सूची एचडीएफएस से लोड की गई है लेकिन यह बहुत बड़ी नहीं है। – poiuytrez

उत्तर

19

इस मामले में, आप क्या प्राप्त करने के लिए चाहते हैं विभाग तालिका में निहित डेटा के साथ प्रत्येक विभाजन में फिल्टर करने के लिए है:

val dept = deptRdd.collect.toSet 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)} 

अपने विभाग डेटा बड़ा है, तो एक प्रसारण चर प्रदर्शन सभी नोड्स के लिए एक बार डेटा पहुंचाने के बजाय प्रत्येक कार्य

val deptBC = sc.broadcast(deptRdd.collect.toSet) 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)} 
0 के साथ यह क्रमानुसार करने होने से सुधार होगा: यह बुनियादी समाधान होगा

हालांकि में शामिल होने के लिए काम करेंगे का उपयोग कर, यह एक बहुत महंगा समाधान है के रूप में यह डेटा की एक वितरित फेरबदल की आवश्यकता होगी (byKey) में शामिल होने को प्राप्त करने के। यह देखते हुए कि आवश्यकता एक साधारण फ़िल्टर है, प्रत्येक विभाजन को डेटा भेजना (ऊपर दिखाया गया है) बेहतर प्रदर्शन प्रदान करेगा।

+0

अगर मैं यहां गलत हूं, तो मुझे माफ़ कर दो, लेकिन विभाजन नहीं करेगा() कुंजी द्वारा वितरित शफल को हल करें? यह नहीं कह रहा कि यह अधिक महंगा होने में शामिल होने की समस्या को हल करेगा, क्योंकि मुझे नहीं लगता कि यह होगा, मैं बस यह कह रहा था कि इसमें शामिल होने के लिए 100% शफल की आवश्यकता नहीं है। – TurnipEntropy

10

मैं अंत में एक समाधान एक में शामिल होने का उपयोग कर कार्यान्वित किया। मैं विभाग को 0 मान जोड़ने के लिए स्पार्क से एक अपवाद से बचने के लिए किया था:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
# invert id and name to get id as the key 
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0])) 
# add a 0 value to avoid an exception 
department = sc.parallelize(department).map(lambda d: (d,0)) 

employee.join(department).map(lambda e: (e[1][0], e[0])).collect() 

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)] 
0

छनन एकाधिक स्तंभों में एकाधिक मान:

मामले में जहाँ आप एक डेटाबेस से डेटा खींच रहे (हाइव या इस उदाहरण के लिए एसक्यूएल प्रकार डाटाबेस) और, यह सिर्फ आसान हो सकता है एकाधिक स्तंभों पर फिल्टर करने के लिए की जरूरत है पहले फिल्टर के साथ तालिका लोड करने के लिए है, तो RDD के माध्यम से अपने फिल्टर पुनरावृति (कई छोटे पुनरावृत्तियों स्पार्क प्रोग्रामिंग के प्रोत्साहित तरीका है):

{ 
    import org.apache.spark.sql.hive.HiveContext 
    val hc = new HiveContext(sc) 

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)") 
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20") 
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500") 

} 
बेशक आप अपने डेटा पर फिल्टर करने के लिए एक छोटा सा जानने की आवश्यकता के

सही मूल्य, लेकिन यह विश्लेषण प्रक्रिया का हिस्सा है।

0

उपरोक्त एक ही एक्सएम के लिए, मैं केवल उन कर्मचारियों को रखना चाहता हूं जो दूसरी तालिका में संदर्भित एक विभाग आईडी में हैं या नहीं। लेकिन यह कोई आपरेशन में शामिल होने के किया जाना है, मैं में "निहित" या "में" यह देखने के लिए होगा, मेरा मतलब 33 है "में" 334 और 335

employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
संबंधित मुद्दे