2015-10-19 6 views
9

मेरे पास डेटा के साथ स्पार्क एसक्यूएल DataFrame है और जो मैं प्राप्त करने का प्रयास कर रहा हूं वह सभी पंक्तियों को किसी दिए गए दिनांक सीमा में वर्तमान पंक्ति से पहले है। तो उदाहरण के लिए मैं दी गई पंक्ति से 7 दिन पहले सभी पंक्तियां रखना चाहता हूं। मैं पता लगा मैं एक Window Function की तरह उपयोग करने की आवश्यकता:स्पार्क विंडो फ़ंक्शंस - रेंज के बीच

Window \ 
    .partitionBy('id') \ 
    .orderBy('start') 

और यहाँ समस्या आती है। मैं 7 दिनों के लिए चाहता हूं, लेकिन स्पार्क दस्तावेज़ों में कुछ भी नहीं है जो मुझे मिल सकता है। क्या स्पार्क भी ऐसा विकल्प प्रदान करता है? अभी के लिए मैं सिर्फ साथ सभी पूर्ववर्ती पंक्तियों हो रही है:

.rowsBetween(-sys.maxsize, 0) 

लेकिन जैसे कुछ प्राप्त करने के लिए चाहते हैं: अगर किसी को भी इस एक मैं बहुत आभारी होंगे पर मेरी मदद कर सकता है

.rangeBetween("7 days", 0) 

। अग्रिम में धन्यवाद!

उत्तर

21

जहां तक ​​मुझे पता है कि यह सीधे संभव नहीं है न तो स्पार्क और न ही हाइव में। दोनों को ORDER BY क्लॉज की आवश्यकता होती है जो RANGE के साथ संख्यात्मक होने के लिए उपयोग की जाती है। मुझे मिली सबसे नज़दीकी चीज़ टाइमस्टैम्प में रूपांतरण और सेकंड पर चल रही है। बहुत से

w = (Window() 
    .partitionBy(col("id")) 
    .orderBy(col("start").cast("timestamp").cast("long")) 
    .rangeBetween(-days(7), 0)) 

df.select(col("*"), mean("some_value").over(w).alias("mean")).show() 

## +---+----------+----------+------------------+ 
## | id|  start|some_value|    mean| 
## +---+----------+----------+------------------+ 
## | 1|2015-01-01|  20.0|    20.0| 
## | 1|2015-01-06|  10.0|    15.0| 
## | 1|2015-01-07|  25.0|18.333333333333332| 
## | 1|2015-01-12|  30.0|21.666666666666668| 
## | 2|2015-01-01|  5.0|    5.0| 
## | 2|2015-01-03|  30.0|    17.5| 
## | 2|2015-02-01|  20.0|    20.0| 
## +---+----------+----------+------------------+ 

सुदूर लेकिन काम करता है:

from pyspark.sql import Row 

row = Row("id", "start", "some_value") 
df = sc.parallelize([ 
    row(1, "2015-01-01", 20.0), 
    row(1, "2015-01-06", 10.0), 
    row(1, "2015-01-07", 25.0), 
    row(1, "2015-01-12", 30.0), 
    row(2, "2015-01-01", 5.0), 
    row(2, "2015-01-03", 30.0), 
    row(2, "2015-02-01", 20.0) 
]).toDF().withColumn("start", col("start").cast("date")) 

एक छोटा सा सहायक और खिड़की परिभाषा:

from pyspark.sql.window import Window 
from pyspark.sql.functions import mean, col 


# Hive timestamp is interpreted as UNIX timestamp in seconds* 
days = lambda i: i * 86400 

अंत में क्वेरी start स्तंभ मान लिया जाये कि date प्रकार शामिल है।


* Hive Language Manual, Types

+0

धन्यवाद, मैं कुछ इसी तरह, यह पुष्टि की है करने के लिए अच्छा के बारे में सोच रहा था! – Nhor

संबंधित मुद्दे