स्पार्क> = 2.0
आप का उपयोग ca window
(होने के लिए नहीं खिड़की के कार्यों के साथ गलत)। एक प्रकार यह टाइमस्टैम्प प्रदान करती है पर निर्भर करता है, एक और संभावित रूप से अतिव्यापी बाल्टी:
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+
स्पार्क < 2,0
उदाहरण डेटा के साथ शुरू की सुविधा देता है:
import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0
val df = Seq(
("001", "event1", 10, "2016-05-01 10:50:51"),
("002", "event2", 100, "2016-05-01 10:50:53"),
("001", "event3", 20, "2016-05-01 10:50:55"),
("001", "event1", 15, "2016-05-01 10:51:50"),
("003", "event1", 13, "2016-05-01 10:55:30"),
("001", "event2", 12, "2016-05-01 10:57:00"),
("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")
मुझे लगता है कि घटना है KEY
द्वारा पहचाना गया। यदि ऐसा नहीं है तो आप अपनी आवश्यकताओं के अनुसार GROUP BY
/PARTITION BY
क्लॉज समायोजित कर सकते हैं।
आप डेटा के स्थिर खिड़की स्वतंत्र साथ एक एकत्रीकरण में रुचि रखते हैं numerics को timestamps बदलने और दौर
import org.apache.spark.sql.functions.{round, sum}
// cast string to timestamp
val ts = $"Time".cast("timestamp").cast("long")
// Round to 300 seconds interval
val interval = (round(ts/300L) * 300.0).cast("timestamp").alias("interval")
df.groupBy($"KEY", interval).sum("metric")
// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+
आप वर्तमान पंक्ति उपयोग खिड़की कार्यों के लिए एक खिड़की के सापेक्ष में रुचि रखते हैं:
import org.apache.spark.sql.expressions.Window
// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))
// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+
प्रदर्शन कारणों से यह दृष्टिकोण केवल तभी उपयोगी होता है जब डेटा एकाधिक अलग-अलग समूहों में विभाजित हो। स्पार्क < 2.0.0 में आपको इसे काम करने के लिए HiveContext
की भी आवश्यकता होगी।
नमस्ते मैं जावा का उपयोग कर रहा हूं जावा और स्पार्क में समान संचालन कैसे करें 2.1.0 – sathiyarajan
@ सथियाराजन मामूली वाक्यविन्यास मतभेदों को छोड़कर, काफी समान होना चाहिए। – zero323