2016-04-14 13 views
5

में उपयोगकर्ता परिभाषित एग्रेगेट फ़ंक्शन (यूडीएफ़) में म्यूटेबल नक्शा स्वचालित रूप से अपरिवर्तनीय क्यों हो जाता है, मैं स्पार्क में उपयोगकर्ता परिभाषित एग्रेगेट फ़ंक्शन (यूडीएफ़) को परिभाषित करने की कोशिश कर रहा हूं, जो समूह के कॉलम में प्रत्येक अद्वितीय मानों के लिए घटनाओं की संख्या की गणना करता है।स्पार्क

यह एक उदाहरण है: मान लीजिए मैं एक dataframe df इस तरह है,

+----+----+ 
|col1|col2| 
+----+----+ 
| a| a1| 
| a| a1| 
| a| a2| 
| b| b1| 
| b| b2| 
| b| b3| 
| b| b1| 
| b| b1| 
+----+----+ 

मैं एक UDAF DistinctValues ​​होगा

val func = new DistinctValues 

तब मैं

df dataframe पर लागू
val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV")) 

मुझे कुछ पसंद होने की उम्मीद है इस ई:

+----+--------------------------+ 
|col1|DV      | 
+----+--------------------------+ 
| a| Map(a1->2, a2->1)  | 
| b| Map(b1->3, b2->1, b3->1)| 
+----+--------------------------+ 

तो मैं इस तरह एक UDAF के साथ बाहर आया,

import org.apache.spark.sql.expressions.MutableAggregationBuffer 
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types.StructField 
import org.apache.spark.sql.types.DataType 
import org.apache.spark.sql.types.ArrayType 
import org.apache.spark.sql.types.StringType 
import org.apache.spark.sql.types.MapType 
import org.apache.spark.sql.types.LongType 
import Array._ 

class DistinctValues extends UserDefinedAggregateFunction { 
    def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil) 

    def dataType: DataType = MapType(StringType, LongType) 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = scala.collection.mutable.Map() 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) : Unit = { 
    val str = input.getAs[String](0) 
    var mp = buffer.getAs[scala.collection.mutable.Map[String, Long]](0) 
    var c:Long = mp.getOrElse(str, 0) 
    c = c + 1 
    mp.put(str, c) 
    buffer(0) = mp 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = { 
    var mp1 = buffer1.getAs[scala.collection.mutable.Map[String, Long]](0) 
    var mp2 = buffer2.getAs[scala.collection.mutable.Map[String, Long]](0) 
    mp2 foreach { 
     case (k ,v) => { 
      var c:Long = mp1.getOrElse(k, 0) 
      c = c + v 
      mp1.put(k ,c) 
     } 
    } 
    buffer1(0) = mp1 
    } 

    def evaluate(buffer: Row): Any = { 
     buffer.getAs[scala.collection.mutable.Map[String, LongType]](0) 
    } 
} 

तब मैं अपने dataframe पर इस समारोह है,

val func = new DistinctValues 
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV")) 

यह त्रुटि दे दी है,

func: DistinctValues = [email protected] 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 884, ip-172-31-22-166.ec2.internal): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map 
at $iwC$$iwC$DistinctValues.update(<console>:39) 
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:431) 
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:187) 
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:180) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:116) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

ऐसा लगता है कि update(buffer: MutableAggregationBuffer, input: Row) विधि, चर buffer एक immutable.Map है, कार्यक्रम थक mutable.Map करने के लिए इसे कास्ट करने के लिए,

लेकिन मैं mutable.Map इस्तेमाल किया initialize(buffer: MutableAggregationBuffer, input:Row) विधि में buffer चर प्रारंभ करने में। क्या यह वही चर update विधि पर पारित किया गया है? और buffermutableAggregationBuffer है, तो यह उत्परिवर्तनीय होना चाहिए, है ना?

मेरा mutable.Map क्यों अपरिवर्तनीय हो गया? क्या कोई जानता है कि क्या हुआ?

मुझे कार्य को पूरा करने के लिए वास्तव में इस फ़ंक्शन में एक परिवर्तनीय मानचित्र की आवश्यकता है। मुझे पता है कि अपरिवर्तनीय मानचित्र से एक परिवर्तनीय मानचित्र बनाने के लिए एक कामकाज है, फिर इसे अपडेट करें। लेकिन मैं वास्तव में जानना चाहता हूं कि उत्परिवर्तनीय व्यक्ति प्रोग्राम में स्वचालित रूप से अपरिवर्तनीय क्यों बदलता है, यह मुझे समझ में नहीं आता है।

उत्तर

4

मान लें कि यह MapType है StructType में। buffer इसलिए Map है, जो अपरिवर्तनीय होगा।

आप इसे परिवर्तित कर सकते हैं, लेकिन क्यों तुम सिर्फ यह अपरिवर्तनीय छोड़ने के लिए और ऐसा नहीं करते हैं इस:

mp = mp + (k -> c) 

अपरिवर्तनीय Map के लिए एक प्रविष्टि जोड़ने के लिए? नीचे

कार्य उदाहरण:

class DistinctValues extends UserDefinedAggregateFunction { 
    def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("_2", IntegerType) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil) 

    def dataType: DataType = MapType(StringType, LongType) 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = Map() 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) : Unit = { 
    val str = input.getAs[String](0) 
    var mp = buffer.getAs[Map[String, Long]](0) 
    var c:Long = mp.getOrElse(str, 0) 
    c = c + 1 
    mp = mp + (str -> c) 
    buffer(0) = mp 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = { 
    var mp1 = buffer1.getAs[Map[String, Long]](0) 
    var mp2 = buffer2.getAs[Map[String, Long]](0) 
    mp2 foreach { 
     case (k ,v) => { 
      var c:Long = mp1.getOrElse(k, 0) 
      c = c + v 
      mp1 = mp1 + (k -> c) 
     } 
    } 
    buffer1(0) = mp1 
    } 

    def evaluate(buffer: Row): Any = { 
     buffer.getAs[Map[String, LongType]](0) 
    } 
} 
+0

अच्छा पकड़! हम्म, 'स्ट्रैटे टाइप' में 'मैपीटाइप' मामला है। लेकिन 'spark.sql.types' में कोई अन्य परिवर्तनीय नक्शा प्रकार नहीं है, जब तक कि मैं अपना खुद का परिभाषित नहीं करता। –

+0

जैसा मैंने कहा, नहीं - बस एक अपरिवर्तनीय 'मानचित्र' का उपयोग करें। 'mp = mp + (k -> c) 'एक अपरिवर्तनीय' मानचित्र 'पर आपको' mp के समान कार्यक्षमता देता है।एक म्यूटेबल 'मानचित्र' पर रखें (के, सी) ' –

+0

' एमपी = एमपी + (के -> सी) 'काम करता है! मैं स्कैला के लिए नया हूं, मुझे नहीं पता था कि आप इस तरह एक अपरिवर्तनीय डेटाटाइप का उपयोग कर सकते हैं। आपका बहुत बहुत धन्यवाद! –