2015-09-18 19 views
6

मैं दूसरे के विरुद्ध एक dataframe फिल्टर करने के लिए कोशिश कर रहा हूँ:कैसे एक और dataframe के खिलाफ एक चिंगारी dataframe फिल्टर करने के लिए

scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id") 
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id") 

अब मैं DF1 को फ़िल्टर और वापस एक dataframe कि जहां user_id DF1 में सभी पंक्तियों को शामिल करना चाहते हैं df2 में है ("valid_id")। दूसरे शब्दों में, मैं DF1 में सभी पंक्तियों को चाहते हैं, जहां user_id या तो 2,3,4,5 या 6

scala> df1.select("user_id").filter($"user_id" in df2("valid_id")) 
warning: there were 1 deprecation warning(s); re-run with -deprecation for details 
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20); 

दूसरी ओर जब मैं एक समारोह के खिलाफ एक फिल्टर करने की कोशिश पर है, सब कुछ बहुत अच्छा लगता है :

scala> df1.select("user_id").filter(($"user_id" % 2) === 0) 
res1: org.apache.spark.sql.DataFrame = [user_id: int] 

मुझे यह त्रुटि क्यों मिल रही है? क्या मेरे वाक्यविन्यास में कुछ गड़बड़ है?

निम्नलिखित टिप्पणी मैं एक छोड़ दिया बाहरी करने की कोशिश की है में शामिल होने:

scala> df1.show 
+-------+------------------+-------+ 
| name|    score|user_id| 
+-------+------------------+-------+ 
| user 1|    0.123|  1| 
| user 2|    0.246|  2| 
| user 3|    0.369|  3| 
| user 4|    0.492|  4| 
| user 5|    0.615|  5| 
| user 6|    0.738|  6| 
| user 7|    0.861|  7| 
| user 8|    0.984|  8| 
| user 9|    1.107|  9| 
|user 10|    1.23|  10| 
|user 11|    1.353|  11| 
|user 12|    1.476|  12| 
|user 13|    1.599|  13| 
|user 14|    1.722|  14| 
|user 15|    1.845|  15| 
|user 16|    1.968|  16| 
|user 17|    2.091|  17| 
|user 18|    2.214|  18| 
|user 19|2.3369999999999997|  19| 
|user 20|    2.46|  20| 
+-------+------------------+-------+ 
only showing top 20 rows 

scala> df2.show 
+--------+ 
|valid_id| 
+--------+ 
|  2| 
|  3| 
|  4| 
|  5| 
|  6| 
+--------+ 

scala> df1.join(df2, df1("user_id") === df2("valid_id")) 
res6: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int] 
scala> res6.collect 
res7: Array[org.apache.spark.sql.Row] = Array() 

scala> df1.join(df2, df1("user_id") === df2("valid_id"), "left_outer") 
res8: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int] 
scala> res8.count 
res9: Long = 0 

मैं चिंगारी 1.5.0 चल रहा हूँ स्केला साथ 2.10.5

+0

आप दो डेटाफ्रेम पर एक फ़िल्टर को फ़िल्टर या निष्पादित करना चाहते हैं? – eliasah

+0

@eliasah मैं df1 से पंक्तियों के उप-समूह के साथ डेटाफ्रेम प्राप्त करना चाहता हूं। डीएफ 1 में प्रत्येक पंक्ति आर के लिए, यदि आर ("user_id") का मान df2 ("valid_id") में है, तो पंक्ति आर परिणाम डेटाफ्रेम में शामिल किया जाएगा। – polo

+0

तब आपको उपयोगकर्ता आईडी == वैध आईडी – eliasah

उत्तर

11

आप एक (नियमित) भीतरी में शामिल होने, नहीं चाहता एक बाहरी शामिल :)

df1.join(df2, df1("user_id") === df2("valid_id")) 
+0

प्रश्न में एक उदाहरण जोड़ूंगा निश्चित रूप से! माफ करना मेरा बुरा! अब मुझे पता है कि अनिद्रा के साथ SO पर जाना अच्छा नहीं है :) – eliasah

+0

@ ग्लेनी-हेल्स-सिंडोल्ट: आपके उत्तर के लिए धन्यवाद। यह समझ में आता है, लेकिन एक खाली डेटा फ्रेम देता है। प्रश्न में कोड उदाहरण के साथ मेरे संपादन देखें। – polo

+0

@polo मुझे यह कहना है कि सबकुछ सही लगता है, जहां तक ​​मैं देख सकता हूं। मैंने अभी आपके आदेशों को अपने स्वयं के खोल में कॉपी किया है (स्पार्क 1.5.0 भी चला रहा है) और सब कुछ पूरी तरह से काम करता है। आप परिवर्तन से नहीं बदलते हैं कुछ स्पष्ट 'वैल एससी = नया स्पार्ककॉन्टेक्स्ट (conf)' कहीं भी आपके खोल में, क्या आप? मैंने हाल ही में किसी और के साथ ठोकर खाई जिसने अजीब व्यवहार देखा क्योंकि उसने अपना स्वयं का एससी-चर घोषित कर दिया था। अन्यथा, मुझे लगता है कि मैं विचारों से ताजा हूं क्योंकि मैं बस समस्या को पुन: उत्पन्न नहीं कर सकता। मुझे लगता है कि आपने अपना खोल फिर से लॉन्च करने का प्रयास किया है? –

संबंधित मुद्दे