2015-09-16 11 views
10

में अगर वहाँ pyspark में एक DataFrame पर एमएल (उदाहरण, KMeans) चलाने के लिए एक संक्षिप्त तरीके से है अगर मैं कई सांख्यिक स्तंभ में विशेषताएं हैं मैं सोच रहा हूँ।फीचर वेक्टर प्रोग्राम के रूप में बनाने के लिए स्पार्क एमएल/pyspark

आईई। Iris डेटासेट में के रूप में:

(a1=5.1, a2=3.5, a3=1.4, a4=0.2, id=u'id_1', label=u'Iris-setosa', binomial_label=1) 

मैं डेटासेट पुनः बनाने के साथ फीचर वेक्टर एक नया स्तंभ और मूल स्तंभों कोड में बार-बार hardcoded के रूप में मैन्युअल रूप से जोड़ा बिना KMeans उपयोग करना चाहते हैं।

समाधान मैं बेहतर करना चाहते हैं:

from pyspark.mllib.linalg import Vectors 
from pyspark.sql.types import Row 
from pyspark.ml.clustering import KMeans, KMeansModel 

iris = sqlContext.read.parquet("/opt/data/iris.parquet") 
iris.first() 
# Row(a1=5.1, a2=3.5, a3=1.4, a4=0.2, id=u'id_1', label=u'Iris-setosa', binomial_label=1) 

df = iris.map(lambda r: Row(
        id = r.id, 
        a1 = r.a1, 
        a2 = r.a2, 
        a3 = r.a3, 
        a4 = r.a4, 
        label = r.label, 
        binomial_label=r.binomial_label, 
        features = Vectors.dense(r.a1, r.a2, r.a3, r.a4)) 
        ).toDF() 


kmeans_estimator = KMeans()\ 
    .setFeaturesCol("features")\ 
    .setPredictionCol("prediction")\ 
kmeans_transformer = kmeans_estimator.fit(df) 

predicted_df = kmeans_transformer.transform(df).drop("features") 
predicted_df.first() 
# Row(a1=5.1, a2=3.5, a3=1.4, a4=0.2, binomial_label=1, id=u'id_1', label=u'Iris-setosa', prediction=1) 

मैं एक समाधान है, जो कुछ तरह की तलाश में हूँ:

feature_cols = ["a1", "a2", "a3", "a4"] 
prediction_col_name = "prediction" 
<dataframe independent code for KMeans> 
<New dataframe is created, extended with the `prediction` column.> 

उत्तर

20

आप VectorAssembler उपयोग कर सकते हैं:

from pyspark.ml.feature import VectorAssembler 

ignore = ['id', 'label', 'binomial_label'] 
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x not in ignore], 
    outputCol='features') 

assembler.transform(df) 

यह एमएल पाइपलाइन का उपयोग कर k-साधनों के साथ जोड़ा जा सकता है:

from pyspark.ml import Pipeline 

pipeline = Pipeline(stages=[assembler, kmeans_estimator]) 
model = pipeline.fit(df) 
संबंधित मुद्दे