में उपयोगकर्ता परिभाषित एग्रेगेट फ़ंक्शन (यूडीएफ़) में म्यूटेबल नक्शा स्वचालित रूप से अपरिवर्तनीय क्यों हो जाता है, मैं स्पार्क में उपयोगकर्ता परिभाषित एग्रेगेट फ़ंक्शन (यूडीएफ़) को परिभाषित करने की कोशिश कर रहा हूं, जो समूह के कॉलम में प्रत्येक अद्वितीय मानों के लिए घटनाओं की संख्या की गणना करता है।स्पार्क
यह एक उदाहरण है: मान लीजिए मैं एक dataframe df
इस तरह है,
+----+----+
|col1|col2|
+----+----+
| a| a1|
| a| a1|
| a| a2|
| b| b1|
| b| b2|
| b| b3|
| b| b1|
| b| b1|
+----+----+
मैं एक UDAF DistinctValues होगा
val func = new DistinctValues
तब मैं
df dataframe पर लागूval agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV"))
मुझे कुछ पसंद होने की उम्मीद है इस ई:
+----+--------------------------+
|col1|DV |
+----+--------------------------+
| a| Map(a1->2, a2->1) |
| b| Map(b1->3, b2->1, b3->1)|
+----+--------------------------+
तो मैं इस तरह एक UDAF के साथ बाहर आया,
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import org.apache.spark.sql.types.LongType
import Array._
class DistinctValues extends UserDefinedAggregateFunction {
def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)
def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil)
def dataType: DataType = MapType(StringType, LongType)
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = scala.collection.mutable.Map()
}
def update(buffer: MutableAggregationBuffer, input: Row) : Unit = {
val str = input.getAs[String](0)
var mp = buffer.getAs[scala.collection.mutable.Map[String, Long]](0)
var c:Long = mp.getOrElse(str, 0)
c = c + 1
mp.put(str, c)
buffer(0) = mp
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = {
var mp1 = buffer1.getAs[scala.collection.mutable.Map[String, Long]](0)
var mp2 = buffer2.getAs[scala.collection.mutable.Map[String, Long]](0)
mp2 foreach {
case (k ,v) => {
var c:Long = mp1.getOrElse(k, 0)
c = c + v
mp1.put(k ,c)
}
}
buffer1(0) = mp1
}
def evaluate(buffer: Row): Any = {
buffer.getAs[scala.collection.mutable.Map[String, LongType]](0)
}
}
तब मैं अपने dataframe पर इस समारोह है,
val func = new DistinctValues
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV"))
यह त्रुटि दे दी है,
func: DistinctValues = [email protected]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 884, ip-172-31-22-166.ec2.internal): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map
at $iwC$$iwC$DistinctValues.update(<console>:39)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:431)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:187)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:180)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:116)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
ऐसा लगता है कि update(buffer: MutableAggregationBuffer, input: Row)
विधि, चर buffer
एक immutable.Map
है, कार्यक्रम थक mutable.Map
करने के लिए इसे कास्ट करने के लिए,
लेकिन मैं mutable.Map
इस्तेमाल किया initialize(buffer: MutableAggregationBuffer, input:Row)
विधि में buffer
चर प्रारंभ करने में। क्या यह वही चर update
विधि पर पारित किया गया है? और buffer
mutableAggregationBuffer
है, तो यह उत्परिवर्तनीय होना चाहिए, है ना?
मेरा mutable.Map क्यों अपरिवर्तनीय हो गया? क्या कोई जानता है कि क्या हुआ?
मुझे कार्य को पूरा करने के लिए वास्तव में इस फ़ंक्शन में एक परिवर्तनीय मानचित्र की आवश्यकता है। मुझे पता है कि अपरिवर्तनीय मानचित्र से एक परिवर्तनीय मानचित्र बनाने के लिए एक कामकाज है, फिर इसे अपडेट करें। लेकिन मैं वास्तव में जानना चाहता हूं कि उत्परिवर्तनीय व्यक्ति प्रोग्राम में स्वचालित रूप से अपरिवर्तनीय क्यों बदलता है, यह मुझे समझ में नहीं आता है।
अच्छा पकड़! हम्म, 'स्ट्रैटे टाइप' में 'मैपीटाइप' मामला है। लेकिन 'spark.sql.types' में कोई अन्य परिवर्तनीय नक्शा प्रकार नहीं है, जब तक कि मैं अपना खुद का परिभाषित नहीं करता। –
जैसा मैंने कहा, नहीं - बस एक अपरिवर्तनीय 'मानचित्र' का उपयोग करें। 'mp = mp + (k -> c) 'एक अपरिवर्तनीय' मानचित्र 'पर आपको' mp के समान कार्यक्षमता देता है।एक म्यूटेबल 'मानचित्र' पर रखें (के, सी) ' –
' एमपी = एमपी + (के -> सी) 'काम करता है! मैं स्कैला के लिए नया हूं, मुझे नहीं पता था कि आप इस तरह एक अपरिवर्तनीय डेटाटाइप का उपयोग कर सकते हैं। आपका बहुत बहुत धन्यवाद! –