2015-02-09 7 views
7

कुंजी के द्वारा मैं क्रियान्वित किया है और प्रत्येक समूह (K, RDD[V]), partitionBy और Partitioner का उपयोग कर के अनुसार डेटा की गणना करने के लिए PartitionBy का उपयोग करना। फिर भी, मुझे यकीन नहीं है कि यह वास्तव में कुशल है और मैं आपका दृष्टिकोण देखना चाहता हूं।विभाजित और कुशलता से समूह <code>RDD[K, V]</code> के लिए एक समाधान RDD समूहों की गणना के अनुसार मुख्य

यहां नमूने मामला है: [K: Int, V: Int] की एक सूची के अनुसार, गणना V रों K के प्रत्येक समूह के लिए मतलब है, जानते हुए भी कि यह वितरित किया जाना चाहिए और V मूल्यों बहुत बड़ी हो सकती है। यही कारण है कि देना चाहिए:

List[K, V] => (K, mean(V)) 

सरल विभाजनर वर्ग:

class MyPartitioner(maxKey: Int) extends Partitioner { 

    def numPartitions = maxKey 

    def getPartition(key: Any): Int = key match { 
     case i: Int if i < maxKey => i 
    } 
    } 

विभाजन कोड:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)) 

     val rdd = sc.parallelize(l) 
     val p = rdd.partitionBy(new MyPartitioner(4)).cache() 

     p.foreachPartition(x => { 
     try { 
      val r = sc.parallelize(x.toList) 
      val id = r.first() //get the K partition id 
      val v = r.map(x => x._2) 
      println(id._1 + "->" + mean(v)) 
     } catch { 
      case e: UnsupportedOperationException => 0 
     } 
     }) 

उत्पादन होता है:

1->13, 2->4, 3->7

मेरे प्रश्न हैं:

  1. partitionBy पर कॉल करते समय वास्तव में क्या होता है? (क्षमा करें, मुझे उस पर पर्याप्त चश्मा नहीं मिला)
  2. क्या यह विभाजन द्वारा मैप करना वास्तव में कुशल है, यह जानकर कि मेरे उत्पादन मामले में यह बहुत अधिक मूल्यों (नमूना के लिए 50 के रूप में) बहुत अधिक मूल्यों से नहीं होगा (जैसा कि नमूना के लिए 1 मिलियन)
  3. paralellize(x.toList) की लागत क्या है? क्या यह करने के लिए लगातार है? (मुझे mean() के इनपुट में RDD की आवश्यकता है)
  4. आप इसे अपने आप कैसे करेंगे?

सादर

उत्तर

4

आपका कोड काम नहीं करना चाहिए। आप निष्पादकों को SparkContext ऑब्जेक्ट पास नहीं कर सकते हैं। (यह Serializable नहीं है।) इसके अलावा मुझे नहीं पता कि आपको क्यों आवश्यकता होगी।

माध्य की गणना करने के लिए, आपको योग और गणना की गणना करने और अपना अनुपात लेने की आवश्यकता है। डिफ़ॉल्ट विभाजनकर्ता ठीक करेगा।

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = { 
    case class SumCount(sum: Double, count: Double) 
    val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0), 
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count)) 
    sumCounts.map(sc => sc.sum/sc.count) 
} 

यह एक कुशल एकल-पास गणना है जो अच्छी तरह से सामान्यीकृत होती है।

+0

आपके उत्तर के लिए धन्यवाद, बेशक यह काम नहीं कर सकता है, मेरे पास स्पार्क कोडिंग चाल का सभी प्रतिबिंब नहीं है और मैं अपने स्थानीय जेवीएम द्वारा खराब हो गया हूं। फिर भी, वास्तव में मुझे माध्य की गणना करने की आवश्यकता नहीं है, लेकिन एक जटिल एमएल विधि है, और मुझे एक आरडीडी [वेक्टर] की आवश्यकता है। मैं एक अद्वितीय आरडीडी [Int, Int] से (कुंजी, आरडीडी [वेक्टर]) की सूची कैसे प्राप्त कर सकता हूं? मुझे कोई समाधान नहीं मिला। – Seb

+0

मुझे लगता है कि यह एक समान विषय है: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 मुझे यकीन नहीं है कि आप कैसे 'वेक्टर' बनाना चाहते हैं 'Int's। लेकिन यदि आप एक आरडीडी प्रति कुंजी प्राप्त करना चाहते हैं, तो आपको मूल आरडीडी को विभाजित करने की आवश्यकता है, और इस पर लिंक किए गए उत्तर में चर्चा की गई है। यदि यह आपको जवाब नहीं देता है, तो मैं सुझाव देता हूं कि आप एक और प्रश्न पूछें, शायद आप जो करना चाहते हैं उसके स्पष्ट, उच्च स्तरीय स्पष्टीकरण के साथ। –

संबंधित मुद्दे