2016-06-04 8 views
10

मेरे डाटासेट इस तरह दिखता है: एक विशिष्ट घटना के लिएकैसे स्पार्क एसक्यूएल में समय अंतराल द्वारा समूह के लिए

"मीट्रिक का योग:

KEY |Event_Type | metric | Time 
001 |event1  | 10  | 2016-05-01 10:50:51 
002 |event2  | 100 | 2016-05-01 10:50:53 
001 |event3  | 20  | 2016-05-01 10:50:55 
001 |event1  | 15  | 2016-05-01 10:51:50 
003 |event1  | 13  | 2016-05-01 10:55:30 
001 |event2  | 12  | 2016-05-01 10:57:00 
001 |event3  | 11  | 2016-05-01 11:00:01 

मैं जब कुंजी है कि इस की पुष्टि सभी प्राप्त करना चाहते हैं ">थ्रेसहोल्डके दौरान 5 मिनट

यह मुझे स्लाइडिंग विंडोज फ़ंक्शंस का उपयोग करने के लिए एक आदर्श उम्मीदवार दिखाई देता है।

स्पार्क एसक्यूएल के साथ मैं यह कैसे कर सकता हूं?

धन्यवाद।

1) रूपांतरण (नक्शा, mapPartitions आदि) समय मूल्य YYYY-MM-DD-hh-मिमी जहां मिमी 5 मिनट स्तर पर लुढ़का हुआ है के रूप में:

उत्तर

13

स्पार्क> = 2.0

आप का उपयोग ca window (होने के लिए नहीं खिड़की के कार्यों के साथ गलत)। एक प्रकार यह टाइमस्टैम्प प्रदान करती है पर निर्भर करता है, एक और संभावित रूप से अतिव्यापी बाल्टी:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric") 

// +---+---------------------------------------------+-----------+ 
// |KEY|window          |sum(metric)| 
// +---+---------------------------------------------+-----------+ 
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45   | 
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12   | 
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13   | 
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11   | 
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100  | 
// +---+---------------------------------------------+-----------+ 

स्पार्क < 2,0

उदाहरण डेटा के साथ शुरू की सुविधा देता है:

import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0 

val df = Seq(
    ("001", "event1", 10, "2016-05-01 10:50:51"), 
    ("002", "event2", 100, "2016-05-01 10:50:53"), 
    ("001", "event3", 20, "2016-05-01 10:50:55"), 
    ("001", "event1", 15, "2016-05-01 10:51:50"), 
    ("003", "event1", 13, "2016-05-01 10:55:30"), 
    ("001", "event2", 12, "2016-05-01 10:57:00"), 
    ("001", "event3", 11, "2016-05-01 11:00:01") 
).toDF("KEY", "Event_Type", "metric", "Time") 

मुझे लगता है कि घटना है KEY द्वारा पहचाना गया। यदि ऐसा नहीं है तो आप अपनी आवश्यकताओं के अनुसार GROUP BY/PARTITION BY क्लॉज समायोजित कर सकते हैं।

आप डेटा के स्थिर खिड़की स्वतंत्र साथ एक एकत्रीकरण में रुचि रखते हैं numerics को timestamps बदलने और दौर

import org.apache.spark.sql.functions.{round, sum} 

// cast string to timestamp 
val ts = $"Time".cast("timestamp").cast("long") 

// Round to 300 seconds interval 
val interval = (round(ts/300L) * 300.0).cast("timestamp").alias("interval") 

df.groupBy($"KEY", interval).sum("metric") 

// +---+---------------------+-----------+ 
// |KEY|interval    |sum(metric)| 
// +---+---------------------+-----------+ 
// |001|2016-05-01 11:00:00.0|11   | 
// |001|2016-05-01 10:55:00.0|12   | 
// |001|2016-05-01 10:50:00.0|45   | 
// |003|2016-05-01 10:55:00.0|13   | 
// |002|2016-05-01 10:50:00.0|100  | 
// +---+---------------------+-----------+ 

आप वर्तमान पंक्ति उपयोग खिड़की कार्यों के लिए एक खिड़की के सापेक्ष में रुचि रखते हैं:

import org.apache.spark.sql.expressions.Window 

// Partition by KEY 
// Order by timestamp 
// Consider window of -150 seconds to + 150 seconds relative to the current row 
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150) 
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w)) 

// +---+----------+------+-------------------+----------+----------+ 
// |KEY|Event_Type|metric|Time    |ts  |window_sum| 
// +---+----------+------+-------------------+----------+----------+ 
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13  | 
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45  | 
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45  | 
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45  | 
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12  | 
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11  | 
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100  | 
// +---+----------+------+-------------------+----------+----------+ 

प्रदर्शन कारणों से यह दृष्टिकोण केवल तभी उपयोगी होता है जब डेटा एकाधिक अलग-अलग समूहों में विभाजित हो। स्पार्क < 2.0.0 में आपको इसे काम करने के लिए HiveContext की भी आवश्यकता होगी।

+0

नमस्ते मैं जावा का उपयोग कर रहा हूं जावा और स्पार्क में समान संचालन कैसे करें 2.1.0 – sathiyarajan

+0

@ सथियाराजन मामूली वाक्यविन्यास मतभेदों को छोड़कर, काफी समान होना चाहिए। – zero323

0
स्थिर सीमा आप निम्न कर सकते के लिए

। जैसे 01, 02, 03, 05 05 हो जाता है; 16,17,18,19,20 हो जाता है 20

2) EVENT_TYPE और समय के साथ GroupBy या reduceBy प्रदर्शन करना और मीट्रिक पर अपने एकत्रीकरण (योग) प्रदर्शन

3) मैट्रिक्स> 5

फिल्टर करने के लिए फिल्टर परिवर्तन प्रदर्शन

आप स्पार्क आरडीडी या डेटाफ्रेम (एसक्यूएल) में लगभग उसी तरह लिख सकते हैं।

अन्य प्रकार की सीमा के लिए 00-05, 01-06, 02-07 आपको स्लाइडिंग विंडो की अवधारणा को देखने का प्रयास करना चाहिए। अपने डेटा को घूस उपयोग के मामले स्ट्रीमिंग का पैटर्न फिट बैठता है तो एपीआई स्ट्रीमिंग स्पार्क सही नहीं होंगे अन्यथा आप इस तरह कस्टम समाधान पा सकते हैं: Apache Spark - Dealing with Sliding Windows on Temporal RDDs

संबंधित मुद्दे