2016-12-09 13 views
5

में आरडीडी के लिए समान विभाजनकर्ता आवंटित करना हम किसी भी नेटवर्क यातायात से बचने के लिए आरडीडी के लिए एक ही निष्पादक और उसी विभाजनकर्ता को आवंटित करने की कोशिश कर रहे हैं और साथ ही शोगल ऑपरेशन जैसे कॉग्रुप और जॉइन में कोई चरण सीमा नहीं है और सभी परिवर्तन एक चरण के तहत पूरा हो जाते हैं ।स्पार्क

तो इस लक्ष्य को हासिल करने के लिए हम जावा में हमारे कस्टम RDD वर्ग (ExtendRDD.class) RDD.class से एक ओवरराइड getPreferredLocation समारोह है कि (स्केला में) के रूप में साथ RDD लपेट:

public Seq<String> getPreferredLocations(Partition split){ 
     listString.add("11.113.57.142"); 
     listString.add("11.113.57.163"); 
     listString.add("11.113.57.150"); 
     List<String> finalList = new ArrayList<String>(); 
     finalList.add(listString.get(split.index() % listString.size()));    

     Seq<String> toReturnListString = scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq(); 

     return toReturnListString; 
    } 
इस हम साथ

स्पार्क के व्यवहार को नियंत्रित करने में सक्षम हैं क्योंकि यह नोड जिसने आरडीडी को क्लस्टर में रखा है। लेकिन अब समस्या यह है कि, चूंकि विभाजनकर्ता इन आरडीडी अलग होने के लिए, स्पार्क उन्हें निर्भर करता है कि वे शफल हो जाएं और फिर इन शफल संचालन के लिए कई चरणों को तैयार करें।

public Option<Partitioner> partitioner() { 
     Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner()); 
     return optionPartitioner; 
    } 

चिंगारी के लिए उन्हें एक ही मंच के नीचे डाल करने के लिए यह इन RDDs ही विभाजक से आ रही पर विचार करना चाहिए: हम के रूप में ही कस्टम RDD में एक ही RDD.class की विभाजक विधि ओवरराइड करने के लिए कोशिश की। हमारी विभाजन विधि काम नहीं कर रही है क्योंकि स्पार्क 2 आरडीडी के लिए अलग विभाजनकर्ता लेता है और शफल संचालन के लिए कई चरणों को बनाता है।

हम के रूप में हमारे कस्टम RDD साथ स्केला RDD लिपटे:

ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class); 
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt",1); 
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag); 

हम समान तरीके से एक और कस्टम RDD बना सकते हैं और है कि RDD के बाहर एक PairRDD (pairRDD2) मिलता है।

RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl()); 
PairRDDFunctions<String, String> pair = new PairRDDFunctions<String, String>(pairRDD, tag, tag, null); 
pair.partitionBy(extendRDD2.getPartitioner()); 
pair.cogroup(pairRDD2); 

सभी इस काम करने के लिए के रूप में चिंगारी कई चरणों से उत्पन्न होने वाला यह cogroup परिवर्तन का सामना करना पड़ता प्रतीत नहीं होता: तो फिर हम PairRDDFunction वस्तु को extendRDD वस्तु के रूप में partitionBy फ़ंक्शन का उपयोग करें और फिर उस के लिए cogroup लागू ही विभाजक लागू करने का प्रयास ।

कोई सुझाव है कि हम आरडीडी में एक ही विभाजनकर्ता को कैसे लागू कर सकते हैं?

+0

आप हैश विभाजन या रेंज विभाजन का उपयोग कर रहे हैं –

+0

हैशपार्टिशनिंग –

उत्तर

0

मैं अपने सभी परिचालनों के लिए एक एकल चरण सफलतापूर्वक करने में सक्षम था। enter image description here