2016-06-16 9 views
12

को देखते हुए दो स्पार्क डेटासेट, ए और बी मैं इस प्रकार किसी एकल स्तंभ पर शामिल होने के कर सकते हैं:एकाधिक कॉलम पर डेटासेट में कैसे शामिल हों?

a.joinWith(b, $"a.col" === $"b.col", "left") 

मेरा प्रश्न है कि क्या आप एक एकाधिक स्तंभों का उपयोग करके शामिल कर सकते हैं। वास्तव में Dataframe साथ के रूप में एक ही तरह से

a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left") 

उत्तर

12

तुम कर सकते हो: अनिवार्य रूप से निम्नलिखित DataFrames एपीआई कोड के बराबर

val xs = Seq(("a", "foo", 2.0), ("x", "bar", -1.0)).toDS 
val ys = Seq(("a", "foo", 2.0), ("y", "bar", 1.0)).toDS 

xs.joinWith(ys, xs("_1") === ys("_1") && xs("_2") === ys("_2"), "left").show 
// +------------+-----------+ 
// |   _1|   _2| 
// +------------+-----------+ 
// | [a,foo,2.0]|[a,foo,2.0]| 
// |[x,bar,-1.0]|  null| 
// +------------+-----------+ 

स्पार्क < 2.0.0 में आप कुछ इस तरह उपयोग कर सकते हैं :

xs.as("xs").joinWith(
    ys.as("ys"), ($"xs._1" === $"ys._1") && ($"xs._2" === $"ys._2"), "left") 
7

where चेनिंग करके एक दूसरे के बाद जुड़ने का एक और तरीका है। आप पहली बार, एक (उसके प्रकार और वैकल्पिक रूप से) में शामिल होने के निर्दिष्ट where ऑपरेटर (ऑपरेटरों) द्वारा पीछा यानी

scala> case class A(id: Long, name: String) 
defined class A 

scala> case class B(id: Long, name: String) 
defined class B 

scala> val as = Seq(A(0, "zero"), A(1, "one")).toDS 
as: org.apache.spark.sql.Dataset[A] = [id: bigint, name: string] 

scala> val bs = Seq(B(0, "zero"), B(1, "jeden")).toDS 
bs: org.apache.spark.sql.Dataset[B] = [id: bigint, name: string] 

scala> as.join(bs).where(as("id") === bs("id")).show 
+---+----+---+-----+ 
| id|name| id| name| 
+---+----+---+-----+ 
| 0|zero| 0| zero| 
| 1| one| 1|jeden| 
+---+----+---+-----+ 


scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).show 
+---+----+---+----+ 
| id|name| id|name| 
+---+----+---+----+ 
| 0|zero| 0|zero| 
+---+----+---+----+ 

इस तरह के एक goodie के लिए कारण यह है कि स्पार्क अनुकूलक एक में (कोई यमक इरादा) लगातार where रों में शामिल हो जाएगा है join के साथ। अंतर्निहित तार्किक और भौतिक योजनाओं को देखने के लिए explain ऑपरेटर का उपयोग करें।

scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).explain(extended = true) 
== Parsed Logical Plan == 
Filter (name#31 = name#36) 
+- Filter (id#30L = id#35L) 
    +- Join Inner 
     :- LocalRelation [id#30L, name#31] 
     +- LocalRelation [id#35L, name#36] 

== Analyzed Logical Plan == 
id: bigint, name: string, id: bigint, name: string 
Filter (name#31 = name#36) 
+- Filter (id#30L = id#35L) 
    +- Join Inner 
     :- LocalRelation [id#30L, name#31] 
     +- LocalRelation [id#35L, name#36] 

== Optimized Logical Plan == 
Join Inner, ((name#31 = name#36) && (id#30L = id#35L)) 
:- Filter isnotnull(name#31) 
: +- LocalRelation [id#30L, name#31] 
+- Filter isnotnull(name#36) 
    +- LocalRelation [id#35L, name#36] 

== Physical Plan == 
*BroadcastHashJoin [name#31, id#30L], [name#36, id#35L], Inner, BuildRight 
:- *Filter isnotnull(name#31) 
: +- LocalTableScan [id#30L, name#31] 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false], input[0, bigint, false])) 
    +- *Filter isnotnull(name#36) 
     +- LocalTableScan [id#35L, name#36] 
संबंधित मुद्दे