2015-12-08 11 views
5

मैं DataFrame निम्नलिखित है:स्पार्क: स्केला RDD में समूह concat बराबर

|-----id-------|----value------|-----desc------| 
    |  1  |  v1  |  d1  | 
    |  1  |  v2  |  d2  | 
    |  2  |  v21  |  d21  | 
    |  2  |  v22  |  d22  | 
    |--------------|---------------|---------------| 

मैं इसे में बदलना चाहते हैं:

|-----id-------|----value------|-----desc------| 
    |  1  |  v1;v2  |  d1;d2 | 
    |  2  |  v21;v22 |  d21;d22 | 
    |--------------|---------------|---------------| 
  • यह डेटा फ्रेम आपरेशन के माध्यम से संभव है?
  • इस मामले में rdd परिवर्तन कैसे दिखता है?

मुझे लगता है कि rdd.reduce कुंजी है, लेकिन मुझे नहीं पता कि इस परिदृश्य में इसे कैसे अनुकूलित किया जाए।

+0

आप 'स्ट्रिंग टाइप' या 'ऐरे टाइप टाइप' कॉलम के परिणामस्वरूप 'मान' कॉलम चाहते हैं? – Odomontois

+0

स्पार्क <1.6 में आप एक यूडीएफ़ का उपयोग कर सकते हैं: [MySQL GROUP_CONCAT कुल फ़ंक्शन के लिए SPARK SQL प्रतिस्थापन] (http://stackoverflow.com/a/32750733/1560062)। – zero323

उत्तर

7

आप चिंगारी एसक्यूएल का उपयोग कर

case class Test(id: Int, value: String, desc: String) 
val data = sc.parallelize(Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22"))) 
    .map(line => Test(line._1, line._2, line._3)) 
    .df 

data.registerTempTable("data") 
val result = sqlContext.sql("select id,concat_ws(';', collect_list(value)),concat_ws(';', collect_list(value)) from data group by id") 
result.show 
+1

दिलचस्प। मैं ['collect_list' को' @since 1.6.0'] के रूप में चिह्नित करता हूं (https://github.com/nburoojy/spark/blob/07de8a2f65b205b0d157301e097beb4950448cf0/sql/core/src/main/scala/org/apache/spark/sql /functions.scala#L185) – Odomontois

+2

अजीब, मैं स्पार्क 1.6.1 का उपयोग कर रहा हूँ! जब मैं वही कर रहा हूं तो यह कह रहा है: अपरिभाषित फ़ंक्शन collect_list; मैंने कार्यों को भी जोड़ा ._ आयात –

+0

क्या आप ** collect_list ** एसक्यूएल क्वेरी के अंदर या डेटाफ्रेम के साथ फ़ंक्शन का उपयोग कर रहे हैं? – Kaushal

1

मान लीजिए आप की तरह

import scala.util.Random 

val sqlc: SQLContext = ??? 

case class Record(id: Long, value: String, desc: String) 

val testData = for { 
    (i, j) <- List.fill(30)(Random.nextInt(5), Random.nextInt(5)) 
    } yield Record(i, s"v$i$j", s"d$i$j") 

val df = sqlc.createDataFrame(testData) 

कुछ आप आसानी से के रूप में डेटा शामिल हो सकते हैं हैं:

import sqlc.implicits._ 

def aggConcat(col: String) = df 
     .map(row => (row.getAs[Long]("id"), row.getAs[String](col))) 
     .aggregateByKey(Vector[String]())(_ :+ _, _ ++ _) 

val result = aggConcat("value").zip(aggConcat("desc")).map{ 
     case ((id, value), (_, desc)) => (id, value, desc) 
    }.toDF("id", "values", "descs") 

आप सरणियों के बजाय concatenated तार करना चाहते हैं, तो आप बाद

import org.apache.spark.sql.functions._ 

val resultConcat = result 
     .withColumn("values", concat_ws(";", $"values")) 
     .withColumn("descs" , concat_ws(";", $"descs")) 
चला सकते हैं
0

कुछ शोध के बाद मैं इस तरह sth के साथ आया है:

val data = sc.parallelize(
    List(
     ("1", "v1", "d1"), 
     ("1", "v2", "d2"), 
     ("2", "v21", "d21"), 
     ("2", "v22", "d22"))) 
     .map{ case(id, value, desc)=>((id), (value, desc))} 
     .reduceByKey((x,y)=>(x._1+";"+y._1, x._2+";"+x._2)) 
     .map{ case(id,(value, desc))=>(id, value, desc)}.toDF("id", "value","desc") 
     .show() 

है कि मेरे साथ छोड़ देता है:

+---+-------+-------+ 
    | id| value| desc| 
    +---+-------+-------+ 
    | 1| v1;v2| d1;d1| 
    | 2|v21;v22|d21;d21| 
    +---+-------+-------+ 
1

तो DataFrames साथ काम करना, UDAF

का उपयोग
import org.apache.spark.sql.Row 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} 
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} 

class ConcatStringsUDAF(InputColumnName: String, sep:String = ",") extends UserDefinedAggregateFunction { 
    def inputSchema: StructType = StructType(StructField(InputColumnName, StringType) :: Nil) 
    def bufferSchema: StructType = StructType(StructField("concatString", StringType) :: Nil) 
    def dataType: DataType = StringType 
    def deterministic: Boolean = true 
    def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = "" 

    private def concatStrings(str1: String, str2: String): String = { 
    (str1, str2) match { 
     case (s1: String, s2: String) => Seq(s1, s2).filter(_ != "").mkString(sep) 
     case (null, s: String) => s 
     case (s: String, null) => s 
     case _ => "" 
    } 
    } 
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    val acc1 = buffer.getAs[String](0) 
    val acc2 = input.getAs[String](0) 
    buffer(0) = concatStrings(acc1, acc2) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    val acc1 = buffer1.getAs[String](0) 
    val acc2 = buffer2.getAs[String](0) 
    buffer1(0) = concatStrings(acc1, acc2) 
    } 

    def evaluate(buffer: Row): Any = buffer.getAs[String](0) 
} 

और फिर का उपयोग अपने डेटा बदल सकता है इस तरह

val stringConcatener = new ConcatStringsUDAF("Category_ID", ",") 
data.groupBy("aaid", "os_country").agg(stringConcatener(data("X")).as("Xs")) 

स्पार्क 1.6 के रूप में, डेटासेट और एग्रीगेटर पर एक नज़र डालें।

संबंधित मुद्दे