2017-04-18 11 views
6

में गति में शामिल हों मेरे पास दो डेटाफ्रेम हैं ए और बी ए बड़ा है (100 ग्राम) और बी अपेक्षाकृत छोटा (100 एम) है। ए की विभाजन संख्या 8 है और बी की विभाजन संख्या 1.प्रसारण में सुधार कैसे करें स्पार्क

A.join(broadcast(B), $"cur" >= $"low" && $"cur" <= $"high", "left_outer") 

गति बहुत धीमी है (> 10 घंटे)।

लेकिन अगर मैं करने के लिए शर्त में शामिल होने के बदलने के लिए:

A.join(broadcast(B), $"cur" === $"low" , "left_outer") 

यह बहुत तेज (< 30 मिनट) हो जाता है। लेकिन स्थिति को बदला नहीं जा सकता है।

तो क्या मेरी मूल स्थिति में शामिल होने की गति में और सुधार करने के कोई तरीके हैं?

+0

वे "लंबे" प्रकार हैं – derek

उत्तर

7

चाल join स्थिति को फिर से लिखना है, इसलिए इसमें = घटक शामिल है जिसका उपयोग क्वेरी को अनुकूलित करने और संभावित मैचों को कम करने के लिए किया जा सकता है। संख्यात्मक मूल्यों के लिए आप अपने डेटा को बाल्टीकृत करते हैं और स्थिति में शामिल होने के लिए बाल्टी का उपयोग करते हैं।

मान लें कि आपका डेटा इस तरह दिखता है:

val a = spark.range(100000) 
    .withColumn("cur", (rand(1) * 1000).cast("bigint")) 

val b = spark.range(100) 
    .withColumn("low", (rand(42) * 1000).cast("bigint")) 
    .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint")) 

पहले एक बाल्टी आकार अपने डेटा के लिए उपयुक्त हो। इस मामले में हम 50 का उपयोग कर सकते हैं:

val bucketSize = 50L 
a से प्रत्येक पंक्ति के लिए

असाइन बाल्टी:

val aBucketed = a.withColumn(
    "bucket", ($"cur"/bucketSize).cast("bigint") * bucketSize 
) 

बनाएं यूडीएफ जो श्रेणी के लिए बकेट फेंकना होगा:

def get_buckets(bucketSize: Long) = 
    udf((low: Long, high: Long) => { 
    val min = (low/bucketSize) * bucketSize 
    val max = (high/bucketSize) * bucketSize 
    (min to max by bucketSize).toSeq 
    }) 

और बाल्टी b :

val bBucketed = b.withColumn(
    "bucket", explode(get_buckets(bucketSize)($"low", $"high")) 
) 

join हालत में उपयोग बाल्टी:

aBucketed.join(
    broadcast(bBucketed), 
    aBucketed("bucket") === bBucketed("bucket") && 
    $"cur" >= $"low" && 
    $"cur" <= $"high", 
    "leftouter" 
) 

इस तरह स्पार्क BroadcastHashJoin का उपयोग करेगा:

*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L)) 
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double)/50.0) as bigint) * 50) AS bucket#184L] 
: +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L] 
:  +- *Range (0, 100000, step=1, splits=Some(8)) 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false])) 
    +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L] 
     +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L] 
     +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L] 
      +- *Range (0, 100, step=1, splits=Some(8)) 
बजाय

BroadcastNestedLoopJoin:

== Physical Plan == 
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L)) 
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L] 
: +- *Range (0, 100000, step=1, splits=Some(8)) 
+- BroadcastExchange IdentityBroadcastMode 
    +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L] 
     +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L] 
     +- *Range (0, 100, step=1, splits=Some(8)) 

आप धुन बाल्टी आकार परिशुद्धता और के बीच संतुलन के लिए कर सकते हैं डेटा आकार

आप एक निचले स्तर समाधान तो broadcast निरंतर आइटम का उपयोग के साथ एक हल कर अनुक्रम (जैसे Array या Vector) कोई आपत्ति नहीं है और शामिल होने के लिए द्विआधारी खोज के साथ udf का उपयोग करें।

आपको विभाजनों की संख्या भी देखना चाहिए। 100 जीबी के लिए 8 विभाजन बहुत कम लगता है।

संबंधित मुद्दे