2015-09-04 5 views
9

विवरण

एक dataframe dfस्पार्क और स्पार्कएसक्यूएल: विंडो फ़ंक्शन का अनुकरण कैसे करें?

id |  date 
--------------- 
1 | 2015-09-01 
2 | 2015-09-01 
1 | 2015-09-03 
1 | 2015-09-04 
2 | 2015-09-04 

मैं एक चल काउंटर या सूचकांक बनाना चाहते हैं को देखते हुए,

  • एक ही आईडी के आधार पर वर्गीकृत और
  • उस समूह में तिथि के अनुसार छाँटे,

इस प्रकार

id |  date | counter 
-------------------------- 
1 | 2015-09-01 |  1 
1 | 2015-09-03 |  2 
1 | 2015-09-04 |  3 
2 | 2015-09-01 |  1 
2 | 2015-09-04 |  2 

यह कुछ है जो मैं विंडो फ़ंक्शन के साथ प्राप्त कर सकता हूं, उदा।

val w = Window.partitionBy("id").orderBy("date") 
val resultDF = df.select(df("id"), rowNumber().over(w)) 

दुर्भाग्य से, स्पार्क 1.4.1 नियमित dataframes के लिए खिड़की कार्यों का समर्थन नहीं करता:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext; 

प्रश्न

  • मैं वर्तमान स्पार्क 1.4.1 पर ऊपर गणना कैसे प्राप्त कर सकते बिना खिड़की के कार्यों का उपयोग कर?
  • स्पार्क में नियमित डेटाफ्रेम के लिए विंडो फ़ंक्शन कब समर्थित होंगे?

धन्यवाद!

+0

क्या आपको डेटाफ्रेम और एसक्यूएल का उपयोग करने की आवश्यकता है, या आप आरडीडी का उपयोग कर सकते हैं? यह समूह द्वारा विधि के साथ काफी सरल है। –

+0

@ किर्कबॉडहर्स्ट: आरडीडी भी ठीक रहेगा। क्या आप अपने विचार को थोड़ा कोड अंश के साथ स्केच कर सकते हैं? स्पार्कएसक्यूएल के रूप में मैं वर्तमान में ऐसा करने का तरीका नहीं देखता हूं: क्या आपके पास कोई विचार है? –

उत्तर

6

आप आरडीडी के साथ ऐसा कर सकते हैं। व्यक्तिगत रूप से मुझे लगता है कि आरडीडी के लिए एपीआई बहुत अधिक समझ में आता है - मैं हमेशा नहीं चाहता कि मेरा डेटा डेटाफ्रेम की तरह 'फ्लैट' हो।

Array[List[org.apache.spark.sql.Row]] = 
Array(
    List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
    List([2,2015-09-01], [2,2015-09-04])) 

आप 'समूह' के भीतर की स्थिति के रूप में अच्छी तरह से चाहते हैं, आप zipWithIndex उपयोग कर सकते हैं:

val df = sqlContext.sql("select 1, '2015-09-01'" 
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'") 
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'") 
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'") 
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'")) 

// dataframe as an RDD (of Row objects) 
df.rdd 
    // grouping by the first column of the row 
    .groupBy(r => r(0)) 
    // map each group - an Iterable[Row] - to a list and sort by the second column 
    .map(g => g._2.toList.sortBy(row => row(1).toString))  
    .collect() 

ऊपर निम्नलिखित की तरह एक परिणाम देता है।

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect() 

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
    List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)), 
    List(([2,2015-09-01],0), ([2,2015-09-04],1))) 

आप FlatMap का उपयोग कर वस्तुओं की एक सरल सूची/सरणी को यह वापस समतल सकता है, लेकिन अगर आपको लगता है कि एक बहुत अच्छा विचार नहीं किया जाएगा 'समूह' पर कुछ भी प्रदर्शन करने की जरूरत है।

इस तरह आरडीडी का उपयोग करने का नकारात्मक पक्ष यह है कि डेटाफ्रेम से आरडीडी में परिवर्तित करने और फिर से वापस करने के लिए यह कठिन है।

+0

बहुत धन्यवाद !!! वह समाधान था, मैं देख रहा था। हम्म, मैं 'समूह' टाइप करने के बाद, नियमित स्कैला 'सूची' संचालन करने के लिए पर्याप्त "बहादुर" नहीं था .... –

+0

क्या होगा जब मेरी "g._2.toList.sortBy" सूची में लाखों तत्वों के, मैं उन्हें इकट्ठा नहीं कर सकता – halil

7

आप स्थानीय DataFrames के लिए HiveContext का भी उपयोग कर सकते हैं और, जब तक कि आपके पास बहुत अच्छा कारण न हो, वैसे भी यह शायद एक अच्छा विचार है। यह एक SQLContextspark-shell और pyspark शैल में उपलब्ध है (अब के लिए sparkR सादा SQLContext का उपयोग करने लगता है) और इसके पार्सर को Spark SQL and DataFrame Guide द्वारा अनुशंसित किया जाता है।

import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.rowNumber 

object HiveContextTest { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Hive Context") 
    val sc = new SparkContext(conf) 
    val sqlContext = new HiveContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(
     ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil 
    ).toDF("k", "v") 

    val w = Window.partitionBy($"k").orderBy($"v") 
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show 
    } 
} 
3

मैं पूरी तरह से सहमत हूं कि डेटाफ्रेम के लिए विंडो फ़ंक्शन आपके पास स्पार्क संस्करण (> =) 1.5 है। लेकिन अगर आप वास्तव में पुराने संस्करण से फंस गए हैं (उदा। 1.4।1), यहाँ इस

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil) 
      .toDF("id", "date") 

val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup") 
val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup") 
         .where($"date"<=$"dateDup") 
         .groupBy($"id", $"date") 
         .agg($"id", $"date", count($"idDup").as("counter")) 
         .select($"id",$"date",$"counter") 

हल करने के लिए एक hacky तरह से अब अगर आप कर dfWithCounter.show

आप मिल जाएगा है:

+---+----------+-------+               
| id|  date|counter| 
+---+----------+-------+ 
| 1|2015-09-01|  1| 
| 1|2015-09-04|  3| 
| 1|2015-09-03|  2| 
| 2|2015-09-01|  1| 
| 2|2015-09-04|  2| 
+---+----------+-------+ 

ध्यान दें कि date पृथक नहीं किया जा रहा है, लेकिन counter सही है । where कथन में <= से >= को बदलकर आप counter के ऑर्डरिंग को बदल सकते हैं।

संबंधित मुद्दे