2015-07-13 4 views
8

मेरे पास दो आरडीडी हैं। एक आरडीडी 5-10 मिलियन प्रविष्टियों के बीच है और अन्य आरडीडी 500 मिलियन से 750 मिलियन प्रविष्टियों के बीच है। किसी बिंदु पर, मुझे एक आम कुंजी का उपयोग करके इन दो rdds में शामिल होना है।स्पार्क में एक बहुत बड़ी रड में मैं कुशलता से बड़े पैमाने पर कैसे शामिल हो सकता हूं?

val rddA = someData.rdd.map { x => (x.key, x); } // 10-million 
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million 
var joinRDD = rddA.join(rddB); 

जब स्पार्क इस में शामिल होने का निर्णय लेता है, तो यह एक शफल होशजोइन करने का निर्णय लेता है। इससे नेटवर्क पर rddB में कई आइटमों को घुमाया जा सकता है। इसी तरह, कुछ rddA भी नेटवर्क पर shuffled हैं। इस मामले में, आरडीएए प्रसारण चर के रूप में उपयोग करने के लिए बहुत "बड़ा" है, लेकिन ऐसा लगता है कि ब्रॉडकास्टशॉजइन अधिक कुशल होगा। क्या BroadcastHashJoin का उपयोग करने के लिए स्पार्क करने के लिए संकेत है? (अपाचे फ्लिंक संकेतों में शामिल होने के माध्यम से इसका समर्थन करता है)।

यदि नहीं, तो ऑटोबॉडकास्ट जॉइन थ्रेसहोल्ड बढ़ाने का एकमात्र विकल्प है?

अद्यतन 7/14

मेरे प्रदर्शन मुद्दा वर्गाकार repartitioning में रूट किया गया है। आम तौर पर, एचडीएफएस से पढ़ा गया एक आरडीडी ब्लॉक द्वारा विभाजित किया जाएगा, लेकिन इस मामले में, स्रोत एक लकड़ी का डेटासेट था [जो मैंने बनाया]। जब स्पार्क (डाटाबेस) लकड़ी की छत फ़ाइल लिखता है, तो यह प्रति विभाजन एक फ़ाइल लिखता है, और समान रूप से, यह प्रति फ़ाइल एक विभाजन को पढ़ता है। इसलिए, मैंने पाया है कि सबसे अच्छा जवाब यह है कि डेटासोर्स के उत्पादन के दौरान, इसे कुंजी द्वारा विभाजित करने के लिए, लकड़ी की छिद्र सिंक (जिसे स्वाभाविक रूप से सह-विभाजित किया जाता है) लिखें और इसे rddB के रूप में उपयोग करें।

दिया गया उत्तर सही है, लेकिन मुझे लगता है कि लकड़ी के डेटा स्रोत के बारे में विवरण किसी और के लिए उपयोगी हो सकता है।

उत्तर

16

आप आरडीडी को एक ही विभाजनकर्ता के साथ विभाजित कर सकते हैं, इस मामले में एक ही कुंजी वाले विभाजन को उसी निष्पादक पर कॉल किया जाएगा।

इस मामले में आप संचालन में शामिल होने के लिए शफल से बचेंगे। जब आप parititoner अपडेट कर देंगे

घसीटना, केवल एक बार होता है, और अगर आप कैश करेंगे RDD सभी मिलती है कि उसके बाद निष्पादकों

import org.apache.spark.SparkContext._ 

class A 
class B 

val rddA: RDD[(String, A)] = ??? 
val rddB: RDD[(String, B)] = ??? 

val partitioner = new HashPartitioner(1000) 

rddA.partitionBy(partitioner).cache() 
rddB.partitionBy(partitioner).cache() 

इसके अलावा, आप प्रसारण सीमा आकार अद्यतन करने के लिए कोशिश कर सकते हैं करने के लिए स्थानीय होना चाहिए, शायद rddA प्रसारण किया जा सकता है:

--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb 

हम प्रसारण में शामिल होने के लिए 400 एमबी का उपयोग करते हैं, और यह अच्छी तरह से काम करता है।

+0

मुझे डर था कि आप जो कहने जा रहे थे। मैंने पहले ही विभाजन का उपयोग करने की कोशिश की है - और आप मूल रूप से जुर्माना लगाते हैं। दुर्भाग्यवश, मैं आरडीडी से एक फ़ाइल अपस्ट्रीम को "पढ़ता हूं" और वास्तव में विभाजित संरचना में पढ़ने के लिए एक अच्छा तरीका नहीं है, इसलिए मुझे पढ़ने के बाद विभाजन करना होगा। मैंने ऑटोबॉडकास्ट जॉइन थ्रेसहोल्ड के साथ खेला है - इसलिए मुझे पता है कि यह काम करता है, मैं बस इतना पसंद नहीं करता। जैसा कि मैंने ओपी में कहा था, यह एक ऐसा क्षेत्र है जहां फ्लिंक नियंत्रण प्रदान करता है जिसे मैं स्पार्क करता हूं। प्रतिक्रिया के लिए धन्यवाद। – Ajaxx

+2

मुझे समझ थी कि --conf spark.sql.autoBroadcastJoinThreshold केवल डेटाफ्रेम या डेटासेट (स्पार्क एसक्यूएल) के बीच जुड़ने पर लागू होता है। क्या आरडीडी में शामिल होने के लिए भी इसका इस्तेमाल किया जाता है? धन्यवाद। – leo9r

संबंधित मुद्दे