2015-01-27 7 views
5

के रूप में स्किला ऑब्जेक्ट का उपयोग करके कम करें। मैं स्कैला के साथ स्पार्क का उपयोग कर रहा हूं और मेरे पास एक जटिल वस्तु है जिसमें एक जटिल वस्तु है जिसमें कुंजी और एक डबल है। यदि उद्देश्य समान है तो लक्ष्य डबल (आवृत्ति) जोड़ना है।स्कैला ऑब्जेक्ट को कुंजी

case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{ 
     def compare(that: SimpleCoocurrence) = { 
     if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos) 
      &&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos)) 
      0 
     else 
      this.toString.compareTo(that.toString) 
     } 
    } 

अब मुझे लगता है कि जैसे reduceBykey उपयोग करने के लिए कोशिश कर रहा हूँ:

है कि मैं इस प्रकार मेरी वस्तु द्वारा निर्दिष्ट किए गए

लेकिन

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(coocRDD.count) 

, परिणाम से पता चलता है कि RDD से पहले और एक lowbykey प्रसंस्करण के बाद तत्वों की एक ही संख्या में शामिल हैं।

मैं tuple2 [SimpleCoocurrence, Double] का उपयोग करके कम से कम कैसे कर सकता हूं? स्पार्ड को मेरी ऑब्जेक्ट्स की तुलना करने के तरीके को बताने का अच्छा तरीका ऑर्डर किया गया है? क्या मुझे केवल tuple2 [स्ट्रिंग, डबल] का उपयोग करना चाहिए?

THX,

उत्तर

5

reduceByKey निर्धारित करने के लिए कुंजी क्या एक ही हैं आदेश लेकिन hashCode और equals उपयोग नहीं करता। विशेष रूप से, hashPartitioner हैश द्वारा समूह कुंजी, उसी हैश कोड के साथ सोथैट कुंजी उसी विभाजन पर सोथैट को प्रति विभाजन पर और कमी हो सकती है।

केस क्लास में equals और hashCode का डिफ़ॉल्ट कार्यान्वयन है। संभवतः उपयोग किए गए परीक्षण डेटा में distance:Double फ़ील्ड के विभिन्न मान होते हैं जो प्रत्येक उदाहरण को एक अद्वितीय वस्तु बनाते हैं। इसे कुंजी के रूप में उपयोग करने के परिणामस्वरूप केवल समान वस्तुओं को एक के रूप में कम किया जा रहा है।

एक तरीका यह पता करने के लिए अपने case class और वस्तु के लिए एक अतिरिक्त विधि, कुछ इस तरह के लिए एक महत्वपूर्ण परिभाषित करने के लिए किया जाएगा:

case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Serializable { 
    val key = word + word_pos + cooc + cooc_pos 
} 
object SimpleCoocurrence { 
    val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ??? 
} 

val coocList:List[SimpleCoocurrence] = ??? 
val coocRDD = sc.parallelize(coocList) 
val coocByKey = coocRDD.keyBy(_.key) 
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add) 

(*) उदाहरण मार्गदर्शक के रूप में प्रदान कोड - संकलित या परीक्षण नहीं ।

+0

https://issues.apache.org/jira/browse/SPARK-10493 – yanghaogn

0

पहले, मैं गूंगा हूँ ...

इसके बाद, मामले में किसी को भी एक ही समस्या है और स्पार्क पर एक reduceByKey के लिए कुंजी के रूप में जटिल स्केला वस्तुओं का उपयोग करना चाहते:

स्पार्क जानता है कि कैसे तुलना करने के लिए दो वस्तुएं भले ही वे आदेश लागू नहीं करते हैं। तो उपरोक्त कोड वास्तव में fonctionnal है।

एकमात्र समस्या यह थी कि मैं पहले और बाद में एक ही आरडीडी प्रिंट कर रहा था। जब मैं इसे लिखता हूं, तो यह वास्तव में अच्छी तरह से काम करता है।

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
val newRDD = coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(newRDD.count) 
0

आप reduceByKey के परिणामों भंडारण नहीं कर रहे हैं। इसके बजाय इसे आजमाएं:

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
val result = coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(result.count) 
संबंधित मुद्दे