7

मैं एक स्पार्क निम्नलिखित संरचना के साथ 2.0 dataframe example है:स्पार्क डेटाफ्रेम: क्या समूह क्रम के बाद वह आदेश बनाए रखता है?

id, hour, count 
id1, 0, 12 
id1, 1, 55 
.. 
id1, 23, 44 
id2, 0, 12 
id2, 1, 89 
.. 
id2, 23, 34 
etc. 

यह प्रत्येक आईडी (दिन के प्रत्येक घंटे के लिए एक) के लिए 24 प्रविष्टियों शामिल और orderBy समारोह का उपयोग कर, आईडी द्वारा आदेश दिया है घंटे।

def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable { 
    override def zero: String = "" 

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat) 

    override def merge(b1: String, b2: String) = b1 + b2 

    override def finish(b: String) = b.substring(1) 

    override def bufferEncoder: Encoder[String] = Encoders.STRING 

    override def outputEncoder: Encoder[String] = Encoders.STRING 
    }.toColumn 

यह मेरे तार में कॉलम जोड़ इस अंतिम dataframe प्राप्त करने के लिए मदद करता है::

id, hourly_count 
id1, 12:55:..:44 
id2, 12:89:..:34 
etc. 

मेरे सवाल है, अगर मैं example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count") करते हैं, कि गारंटी करता है

मैं एक एग्रीगेटर groupConcat बनाया है कि प्रति घंटा की गणना उनके संबंधित बाल्टी में सही ढंग से आदेश दिया जाएगा?

मैंने पढ़ा है कि यह आरडीडी के लिए आवश्यक नहीं है (Spark sort by key and then group by to get ordered iterable? देखें), लेकिन शायद यह डेटाफ्रेम के लिए अलग है?

यदि नहीं, तो मैं इसके आसपास कैसे काम कर सकता हूं?

उत्तर

3

संक्षिप्त उत्तर हाँ है, प्रति घंटा गणना उसी क्रम को बनाए रखेगी।

सामान्यीकृत करने के लिए, यह महत्वपूर्ण है कि आप समूह से पहले सॉर्ट करें। इसके अलावा यह समूह समूह + कॉलम जैसा ही होना चाहिए जिसके लिए आप वास्तव में सॉर्टिंग चाहते हैं।

एक उदाहरण की तरह:

employees 
    .sort("company_id", "department_id", "employee_role") 
    .groupBy("company_id", "department_id") 
    .agg(Aggregators.groupConcat(":", 2) as "count_per_role") 
+1

आप किसी भी संदर्भ बताते हुए है समूह द्वारा आदेश जारी रखता है? मुझे आधिकारिक दस्तावेज़ –

+0

में कुछ भी नहीं मिला, मेरे पास आधिकारिक दस्तावेज़ नहीं हैं, लेकिन मेरे पास यह आलेख है जो तंत्र को थोड़ा बेहतर बताता है https://bzhangusc.wordpress.com/2015/05/28/groupby-on -डेटाफ्रेम-है-द-ग्रुपबी-ऑन-आरडीडी/टिप्पणियां भी दिलचस्प हैं। – Interfector

+1

दिलचस्प है, यहां तक ​​कि शॉन ओवेन खुद कहा गया है कि आदेश को संरक्षित नहीं किया जा सकता है (https://issues.apache.org/jira/browse/SPARK-16207?focusedCommentId=15356725&page=com.atlassian.jira.plugin.system.issuetabpanels% 3Acomment-tabpanel # टिप्पणी-15,356,725) –

1

मैं एक मामले में जहां आदेश हमेशा नहीं रखा जाता है: कभी कभी हाँ, ज्यादातर नहीं।

मेरे dataframe स्पार्क पर चल रहे 200 विभाजन है 1,6

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                F.sort_array(F.collect_list(times)), 
                F.collect_list(times) 
                  ) 

आदेश जाँच करने के लिए मैं जैसे दे

F.sort_array(F.collect_list(times)) 

और

F.collect_list(times) 

की वापसी मूल्यों की तुलना (बाएं: sort_array (collect_list()); सही: collect_list())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000 
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000 
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000 
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000 
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000 
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000 
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000 
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000 
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000 
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000 
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000 
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000 
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000 
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000 

बाएँ स्तंभ हमेशा की तरह, सॉर्ट हो जाता है, जबकि सही स्तंभ केवल क्रमबद्ध ब्लॉकों के होते हैं। लेने() के विभिन्न निष्पादन के लिए, दाएं कॉलम में ब्लॉक का क्रम अलग है।

+0

स्वीकार किए जाते हैं जवाब में कहा गया है कि आप दोनों स्तंभ आप हल कर के रूप में भी दिखाई देने वाले कॉलम के साथ समूह चाहते हैं, यानी द्वारा सॉर्ट करने के लिए की जरूरत है 'orderBy (बार, group_key) .groupBy (group_key)'। क्या आपने कोशिश की? – Shaido

0

विभाजन की संख्या और डेटा के वितरण के आधार पर ऑर्डर हो सकता है या नहीं भी हो सकता है। हम खुद को आरडीडी का उपयोग कर हल कर सकते हैं।

उदाहरण ::

के लिए मैं एक फ़ाइल में नीचे नमूना डेटा को बचाया और HDFS में यह भरी हुई।

1,type1,300 
2,type1,100 
3,type2,400 
4,type2,500 
5,type1,400 
6,type3,560 
7,type2,200 
8,type3,800 

और नीचे कमांड निष्पादित:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect() 

उत्पादन:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4)) 

है यही कारण है कि, हम प्रकार के आधार पर डेटा वर्गीकृत किया, उसके बाद मूल्य के अनुसार क्रमबद्ध, और साथ आईडी concatenated विभाजक के रूप में "~"। उपरोक्त आदेश तोड़ा जा सकता है नीचे के रूप में:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3) 

val groupedData=validData.groupBy(_(1)) //group data rdds 

val sortedJoinedData=groupedData.mapValues(x=>{ 
    val list=x.toList 
    val sortedList=list.sortBy(_(2)) 
    val idOnlyList=sortedList.map(_(0)) 
    idOnlyList.mkString("~") 
} 
) 
sortedJoinedData.collect() 

हम तो कमांड का उपयोग करके एक विशेष समूह के ले जा सकते हैं

sortedJoinedData.filter(_._1=="type1").collect() 

उत्पादन:

Array[(String, String)] = Array((type1,2~1~5)) 
संबंधित मुद्दे