2016-02-05 15 views
5

में डेटाफ्रेम में शामिल होने के दौरान प्रसारण नहीं हो रहा है नीचे नमूना कोड है जो मैं चला रहा हूं। जब यह स्पार्क जॉब चलता है, तो डेटाफ्रेम जुड़ता है ब्रॉडकास्ट के बजाए sortmergejoin का उपयोग कर हो रहा है।स्पार्क 1.6

def joinedDf (sqlContext: SQLContext, 
       txnTable: DataFrame, 
       countriesDfBroadcast: Broadcast[DataFrame]): 
       DataFrame = { 
        txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"), 
        $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
       } 
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp") 

ब्रॉडकास्ट जॉइन तब भी नहीं हो रहा है जब मैं शामिल कथन में प्रसारण() संकेत निर्दिष्ट करता हूं।

ऑप्टिमाइज़र डेटाफ्रेम को हैशपाइटिशन कर रहा है और यह डेटा स्कू का कारण बन रहा है।

क्या किसी ने इस व्यवहार को देखा है?

मैं स्पार्क 1.6 और हाइव कॉन्टेक्स्ट का उपयोग कर यार्नकॉन्टेक्स्ट के रूप में यार्न पर इसे चला रहा हूं। स्पार्क नौकरी 200 निष्पादकों पर चलती है। और txnTable का डेटा आकार 240 जीबी है और देशों का डेटासाइज डीएफ 5 एमबी है।

उत्तर

7

आप जिस तरह से DataFrame प्रसारित करते हैं और आप इसे कैसे एक्सेस करते हैं, गलत हैं।

  • वितरित डेटा संरचनाओं को संभालने के लिए मानक प्रसारण का उपयोग नहीं किया जा सकता है। आप broadcast कार्य करता है जो प्रसारण के लिए DataFrame दिया निशान का उपयोग करना चाहिए आप प्रसारण एक DataFrame पर शामिल होने के लिए प्रदर्शन करना चाहते हैं:

    import org.apache.spark.sql.functions.broadcast 
    
    val countriesDf: DataFrame = ??? 
    val tmp: DataFrame = broadcast(
        countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries") 
    ) 
    
    txnTable.as("df1").join(
        broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
    

    आंतरिक यह collecttmp बाद में आंतरिक और प्रसारण से परिवर्तित किए बिना होगा।

  • तर्कों में शामिल होने का उत्सुकता से मूल्यांकन किया जाता है। यहां तक ​​कि SparkContext.broadcast का उपयोग करना संभव था, वितरित डेटा संरचना प्रसारण मूल्य का मूल्यांकन join से पहले स्थानीय रूप से किया जाता है। Thats 'क्यों आपका काम बिल्कुल काम करता है लेकिन प्रसारण में शामिल नहीं होता है।

+0

अब, मैं ब्रॉडकास्ट हैशजोइन को एक रन में और SortMergeJoin को किसी अन्य रन में देख रहा हूं। (एक ही कोड, अलग डेटा सेट)। –

+0

मेरा अनुमान है कि प्रसारण में शामिल होने के लिए यह आकार सीमा से अधिक है। – zero323

+0

मेरे पास बहुत अधिक स्पार्क.sql.autoBroadcastJoinThreshold है। लगभग। 1GB। और प्रसारण की गई फाइल लगभग 5 एमबी है। हालांकि, दूसरे भाग में, उपर्युक्त सिफारिश महान काम करती है। –