5

पर आकलनकर्ता मैं एक DataFrame कि अनुवर्ती की तरह लग रहा है:स्पार्क, DataFrame: लागू ट्रांसफार्मर/समूहों

+-----------+-----+------------+ 
|  userID|group| features| 
+-----------+-----+------------+ 
|12462563356| 1| [5.0,43.0]| 
|12462563701| 2| [1.0,8.0]| 
|12462563701| 1| [2.0,12.0]| 
|12462564356| 1| [1.0,1.0]| 
|12462565487| 3| [2.0,3.0]| 
|12462565698| 2| [1.0,1.0]| 
|12462565698| 1| [1.0,1.0]| 
|12462566081| 2| [1.0,2.0]| 
|12462566081| 1| [1.0,15.0]| 
|12462566225| 2| [1.0,1.0]| 
|12462566225| 1| [9.0,85.0]| 
|12462566526| 2| [1.0,1.0]| 
|12462566526| 1| [3.0,79.0]| 
|12462567006| 2| [11.0,15.0]| 
|12462567006| 1| [10.0,15.0]| 
|12462567006| 3| [10.0,15.0]| 
|12462586595| 2| [2.0,42.0]| 
|12462586595| 3| [2.0,16.0]| 
|12462589343| 3| [1.0,1.0]| 
+-----------+-----+------------+ 

कहाँ कॉलम प्रकार हैं: userID: लंबे, समूह: इंट, और विशेषताएं: वेक्टर।

यह पहले से ही समूहीकृत डेटाफ्रेम है, यानी उपयोगकर्ता आईडी अधिकतम समय पर किसी विशेष समूह में दिखाई देगी।

मेरा लक्ष्य प्रति समूह features कॉलम को स्केल करना है।

वहाँ लागू करने के लिए एक रास्ता है एक feature transformer पूर्ण DataFrame करने के लिए इसे लागू करने के बजाय प्रति समूह(मेरे मामले में मैं एक StandardScaler आवेदन करना चाहते हैं)।

पीएस एमएल का उपयोग अनिवार्य नहीं है, इसलिए समाधान कोई समस्या नहीं है यदि समाधान एमएलआईबीबी पर आधारित है।

+0

आप मानक स्केलर को फिट करने की योजना कैसे बना रहे हैं? प्रत्येक समूह पर? – eliasah

+0

मैं प्रति समूह फीचर वेक्टर के प्रत्येक आयाम को स्केल करना चाहता हूं। – Rami

+1

AFAIK यह नहीं करता है लेकिन आप हमेशा सभी परिचालनों को सीधे लागू कर सकते हैं। स्केलर आरडीडी पर वैसे भी काम करता है, इसलिए यह आंकड़ों की गणना करने और प्रति समूह को बदलने की बात है। – zero323

उत्तर

5

आप डिफ़ॉल्ट Scaler के रूप में लगभग एक ही कोड का उपयोग समूह के आधार पर आंकड़े की गणना कर सकते हैं:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.sql.Row 

// Compute Multivariate Statistics 
val summary = data.select($"group", $"features") 
    .rdd 
    .map { 
     case Row(group: Int, features: Vector) => (group, features) 
    } 
    .aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */ 
     (agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */ 
     (agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */ 
    .mapValues { 
     s => (
     s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */ 
     s.mean.toArray /* fetch the mean for each key */ 
    ) 
    }.collectAsMap 

समूहों की अपेक्षित संख्या अपेक्षाकृत कम आप इन प्रसारित कर सकते हैं यह है:

val summaryBd = sc.broadcast(summary) 

और अपने डेटा को बदलने :

val scaledRows = df.map{ case Row(userID, group: Int, features: Vector) => 
    val (stdev, mean) = summaryBd.value(group) 
    val vs = features.toArray.clone() 
    for (i <- 0 until vs.size) { 
    vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1/stdev(i)) 
    } 
    Row(userID, group, Vectors.dense(vs)) 
} 
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema) 

अन्यथा आप बस शामिल हो सकते हैं। समूह स्तंभ के साथ एक एमएम ट्रांसफॉर्मर के रूप में इसे एक पैरा के रूप में लपेटना मुश्किल नहीं होना चाहिए।

+1

यह एक उत्कृष्ट उत्तर है! – eliasah

संबंधित मुद्दे