2015-11-24 8 views
12

मेरे पास Int और Vec प्रकार Vector (org.apache.spark.mllib.linalg.Vector) के दो कॉलम का डेटाफ़्रेम है, ID और Vecवेक्टरों के कॉलम को योग करने के लिए कस्टम एकत्रीकरण फ़ंक्शन को कैसे परिभाषित किया जाए?

DataFrame पालन की तरह दिखता है:

ID,Vec 
1,[0,0,5] 
1,[4,0,1] 
1,[1,2,1] 
2,[7,5,0] 
2,[3,3,4] 
3,[0,8,1] 
3,[0,0,1] 
3,[7,7,7] 
.... 

मैं करने के लिए एक groupBy($"ID") तो वैक्टर संक्षेप द्वारा प्रत्येक समूह के अंदर पंक्तियों पर एक एकत्रीकरण लागू करना चाहते हैं।

ऊपर के उदाहरण के वांछित उत्पादन होगा:

ID,SumOfVectors 
1,[5,2,7] 
2,[10,8,4] 
3,[7,15,9] 
... 

उपलब्ध एकत्रीकरण कार्यों से काम नहीं चलेगा, उदा df.groupBy($"ID").agg(sum($"Vec") क्लासकास्ट अपवाद का नेतृत्व करेगा।

कस्टम एकत्रीकरण फ़ंक्शन को कैसे कार्यान्वित करें जो मुझे वैक्टर या सरणी या किसी अन्य कस्टम ऑपरेशन की राशि करने की अनुमति देता है?

+3

संभावित डुप्लिकेट [मैं स्पार्क एसक्यूएल में उपयोगकर्ता-परिभाषित कुल फ़ंक्शन को कैसे परिभाषित और उपयोग कर सकता हूं?] (Http://stackoverflow.com/questions/32100973/how-can-i-define-and-use-a-user -डिफाइन-एग्रीगेट-फ़ंक्शन-इन-स्पार्क-एसक्यूएल) –

उत्तर

19

व्यक्तिगत रूप से मैं यूडीएफ़ से परेशान नहीं होगा। वर्बोज़ से अधिक हैं और बिल्कुल तेज़ नहीं हैं। इसके बजाय मैं बस reduceByKey/foldByKey का प्रयोग करेंगे:

import org.apache.spark.sql.Row 
import breeze.linalg.{DenseVector => BDV} 
import org.apache.spark.ml.linalg.{Vector, Vectors} 

val rdd = sc.parallelize(Seq(
    (1, "[0,0,5]"), (1, "[4,0,1]"), (1, "[1,2,1]"), 
    (2, "[7,5,0]"), (2, "[3,3,4]"), (3, "[0,8,1]"), 
    (3, "[0,0,1]"), (3, "[7,7,7]"))) 

val df = rdd.map{case (k, v) => (k, Vectors.parse(v))}.toDF("id", "vec") 

val aggregated = df 
    .rdd 
    .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) } 
    .foldByKey(BDV(Array.fill(3)(0.0)))(_ += _) 
    .mapValues(v => Vectors.dense(v.toArray)) 
    .toDF("id", "vec") 

aggregated.show 

// +---+--------------+ 
// | id|   vec| 
// +---+--------------+ 
// | 1| [5.0,2.0,7.0]| 
// | 2|[10.0,8.0,4.0]| 
// | 3|[7.0,15.0,9.0]| 
// +---+--------------+ 

और बस तुलना के लिए एक "सरल" UDAF। आवश्यक आयात:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
    UserDefinedAggregateFunction} 
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes} 
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType} 
import org.apache.spark.sql.Row 
import scala.collection.mutable.WrappedArray 

कक्षा परिभाषा:

class VectorSum (n: Int) extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("v", SQLDataTypes.VectorType) 
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType)) 
    def dataType = SQLDataTypes.VectorType 
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
     buffer.update(0, Array.fill(n)(0.0)) 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) = { 
     if (!input.isNullAt(0)) { 
     val buff = buffer.getAs[WrappedArray[Double]](0) 
     val v = input.getAs[Vector](0).toSparse 
     for (i <- v.indices) { 
      buff(i) += v(i) 
     } 
     buffer.update(0, buff) 
     } 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
     val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
     val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
     for ((x, i) <- buff2.zipWithIndex) { 
     buff1(i) += x 
     } 
     buffer1.update(0, buff1) 
    } 

    def evaluate(buffer: Row) = Vectors.dense(
     buffer.getAs[Seq[Double]](0).toArray) 
} 

और एक उदाहरण के उपयोग:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show 

// +---+--------------+ 
// | id|   vec| 
// +---+--------------+ 
// | 1| [5.0,2.0,7.0]| 
// | 2|[10.0,8.0,4.0]| 
// | 3|[7.0,15.0,9.0]| 
// +---+--------------+ 

यह भी देखें: How to find mean of grouped Vector columns in Spark SQL?

+0

मुझे लगता है कि चाल हवा का उपयोग कर रही है। linalg.DensVector, यह क्यों काम कर रहा है और mllib.linalg के घने वेक्टर नहीं? – Rami

+1

समस्या है 'mllib.linalg.Vector' के स्कैला संस्करण के लिए कोई '+' विधि नहीं है। – zero323

+0

यह डीएफ या एसक्यूएल के साथ नहीं किया जा सकता है? U12F के अलावा – oluies

0

मेरा सुझाव है निम्नलिखित (स्पार्क 2.0.2 आगे पर काम करता है), यह अनुकूलित किया जा सकता है, लेकिन यह बहुत अच्छा है, एक बात आप पहले से पता करने के लिए है वेक्टर आकार होता है जब आप UDAF उदाहरण बनाने

import org.apache.spark.ml.linalg._ 
import org.apache.spark.mllib.linalg.WeightedSparseVector 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} 
import org.apache.spark.sql.types._ 

class VectorAggregate(val numFeatures: Int) 
    extends UserDefinedAggregateFunction { 

private type B = Map[Int, Double] 

def inputSchema: StructType = StructType(StructField("vec", new VectorUDT()) :: Nil) 

def bufferSchema: StructType = 
StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil) 

def initialize(buffer: MutableAggregationBuffer): Unit = 
buffer.update(0, Map.empty[Int, Double]) 

def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    val zero = buffer.getAs[B](0) 
    input match { 
     case Row(DenseVector(values)) => buffer.update(0, values.zipWithIndex.foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) 
     case Row(SparseVector(_, indices, values)) => buffer.update(0, values.zip(indices).foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) }} 
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
val zero = buffer1.getAs[B](0) 
buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero){case (acc,(i,v)) => acc.updated(i, v + acc.getOrElse(i,0d))})} 

def deterministic: Boolean = true 

def evaluate(buffer: Row): Any = { 
    val Row(agg: B) = buffer 
    val indices = agg.keys.toArray.sorted 
    Vectors.sparse(numFeatures,indices,indices.map(agg)).compressed 
} 

def dataType: DataType = new VectorUDT() 
} 
संबंधित मुद्दे

 संबंधित मुद्दे