2015-09-07 5 views
8

मेरे पास 2 कॉलम के साथ डेटा फ्रेम है: टाइमस्टैम्प, मान टाइमस्टैम्प एक समय है जब युग और मान एक फ्लोट मान है। मैं न्यूनतम मूल्यों पर पंक्तियों को न्यूनतम मानों में विलय करना चाहता हूं। इसका मतलब है कि मैं उन सभी पंक्तियों को लेना चाहता हूं जहां टाइमस्टैम्प एक ही दौर के मिनट (युग के बाद 60 सेकंड अंतराल) से है और उन्हें एक पंक्ति में विलय करें, जहां मूल्य कॉलम सभी मानों का अर्थ होगा।स्पार्क डेटाफ्रेम में एक पंक्ति में एकाधिक पंक्तियों को विलय करना

एक उदाहरण देने के लिए, की सुविधा देता है मान लेते हैं कि मेरी dataframe इस तरह दिखता है:

timestamp  value 
---------  ----- 
1441637160  10.0 
1441637170  20.0 
1441637180  30.0 
1441637210  40.0 
1441637220  10.0 
1441637230  0.0 

पहले 4 पंक्तियाँ एक ही मिनट का हिस्सा हैं (, १४४१६३७१६० 60% == 0 +१४४१६३७१६० + 60 == +१४४१६३७२२०) अंतिम 2 पंक्तियां एक और मिनट का हिस्सा हैं। मैं उसी मिनट की सभी पंक्तियों को मर्ज करना चाहता हूं। परिणाम देखने के लिए:

timestamp  value 
---------  ----- 
1441637160  25.0 (since (10+20+30+40)/4 = 25) 
1441637220  5.0 (since (10+0)/2 = 5) 

ऐसा करने का सबसे अच्छा तरीका क्या है?

उत्तर

5

आप बस समूह और समेकित कर सकते हैं।

val df = sc.parallelize(Seq(
    (1441637160, 10.0), 
    (1441637170, 20.0), 
    (1441637180, 30.0), 
    (1441637210, 40.0), 
    (1441637220, 10.0), 
    (1441637230, 0.0))).toDF("timestamp", "value") 

आयात आवश्यक कार्यों और वर्गों:

import org.apache.spark.sql.functions.{lit, floor} 
import org.apache.spark.sql.types.IntegerType 

अंतराल स्तंभ बनाने:

val tsGroup = (floor($"timestamp"/lit(60)) * lit(60)) 
    .cast(IntegerType) 
    .alias("timestamp") 

और इसका इस्तेमाल एकत्रीकरण प्रदर्शन करने के लिए:

df.groupBy(tsGroup).agg(mean($"value").alias("value")).show 

// +----------+-----+ 
// | timestamp|value| 
// +----------+-----+ 
// |1441637160| 25.0| 
// |1441637220| 5.0| 
// +----------+-----+ 
के रूप में डेटा के साथ
1

पहले मिनट की बाल्टी के लिए टाइमस्टैम्प मानचित्र करें, फिर औसत गणना करने के लिए समूहबीकी का उपयोग करें। उदाहरण के लिए:

rdd.map(x=>{val round = x._1%60; (x._1-round, x._2);}) 
.groupByKey 
.map(x=>(x._1, (x._2.sum.toDouble/x._2.size))) 
.collect() 
संबंधित मुद्दे