2016-09-12 7 views
6

मेरे पास है RDD [पंक्ति]:स्कैला: स्ट्रिंग मानों के लिए GroupBy योग कैसे करें?

|---itemId----|----Country-------|---Type----------| 
    |  11  |  US   |  Movie  | 
    |  11  |  US   |  TV   | 
    |  101  |  France  |  Movie  |  

GroupBy Itemid कैसे करना है, जहां प्रत्येक पंक्ति अलग json वस्तु (RDD में प्रत्येक पंक्ति) है, ताकि मैं json की सूची के रूप में परिणाम बचा सकते हैं:

{"itemId" : 11, 
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} }, 
{"itemId" : 101, 
"Country": {"France" :1 },"Type": {"Movie" :1} } 

RDD:

मैंने कोशिश की:

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../"  
val input = sc.textFile(mappingPath) 

इनपुट की सूची है jsons जहाँ प्रत्येक पंक्ति json है जो मैं MappingUtils जो JSON पार्स करने और रूपांतरण का ख्याल रखता है का उपयोग कर CountryInfo POJO वर्ग के लिए मानचित्रण हूँ:

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match { 
     case Some(s) => s 
    } 


val events = sqlContext.sql("select itemId EventList") 

val itemList = events.map(row => { 
    val itemId = row.getAs[String](1); 
    val çountryInfo = showTitleInfo(MappingsList.get(itemId)); 
    val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry() 
    val type = countryInfo.getType() 

    Row(itemId, country, type) 
     }) 

कुछ भी मुझे पता है कि मैं यह कैसे प्राप्त कर सकते हैं कर सकते हैं?

धन्यवाद!

+0

क्या आरडीडी [पंक्ति] डेटाफ्रेम/डेटासेट से आया था? आरडीडी [पंक्ति] के साथ काम करना आम तौर पर आदर्श नहीं है हालांकि अभी भी करने योग्य है। –

+0

मैंने डेटासेट से आरडीडी बनाया है। –

+0

@ASpotySpot मेरे आरडीडी के साथ अपडेट किया गया –

उत्तर

3

मैं इसे पूरा करने के लिए अतिरिक्त समय बर्दाश्त नहीं कर सकता, लेकिन आपको शुरुआत कर सकता हूं।

विचार यह है कि आप RDD[Row] को एक ही मानचित्र में विभाजित करते हैं जो आपके JSON संरचना का प्रतिनिधित्व करता है।

  1. seqOp कैसे लक्ष्य प्रकार
  2. combOp कैसे लक्ष्य प्रकार के दो विलय करने में तत्वों का एक संग्रह गुना करने के लिए: एकत्रीकरण एक गुना है कि दो फ़ंक्शन पैरामीटर की आवश्यकता है।

मुश्किल हिस्सा है, जबकि विलय के रूप में आप seqOp में देखा मूल्यों की गिनती जमा करने के लिए की जरूरत है combOp में आता है। मैंने इसे एक अभ्यास के रूप में छोड़ दिया है, क्योंकि मेरे पास पकड़ने का विमान है! उम्मीद है कि अगर आपको परेशानी हो तो कोई और अंतराल भर सकता है।

case class Row(id: Int, country: String, tpe: String) 

    def foo: Unit = { 

    val rows: RDD[Row] = ??? 

    def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = { 
     acc.get(r.id) match { 
     case None => acc.updated(r.id, (Map(r.country, 1), Map(r.tpe, 1))) 
     case Some((countries, types)) => 
      val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1) 
      val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1) 
      acc.updated(r.id, (countries_, types_)) 
     } 
    } 

    val z = Map.empty[Int, (Map[String, Int], Map[String, Int])] 

    def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])], r: Map[Int, (Map[String, Int], Map[String, Int])]) = { 
     l.foldLeft(z) { case (acc, (id, (countries, types))) => 
      r.get(id) match { 
      case None => acc.updated(id, (countries, types)) 
      case Some(otherCountries, otherTypes) => 
       // todo - continue by merging countries with otherCountries 
       // and types with otherTypes, then update acc 
      } 
     } 
    } 

    val summaryMap = rows.aggregate(z) { seqOp, combOp } 
संबंधित मुद्दे