2015-09-23 12 views
12

मैं मान (स्ट्रिंग, स्पारसेवेक्टर) के एक टपल साथ एक RDD है वेक्टर के रूप में एक स्तंभ के साथ एक DataFrame करने के लिए एक स्पारसेवेक्टर कॉलम के साथ एक RDD परिवर्तित और मैं RDD का उपयोग कर एक DataFrame बनाना चाहते हैं। एक (लेबल: स्ट्रिंग, विशेषताएं: वेक्टर) प्राप्त करने के लिए डेटाफ्रेम जो अधिकांश एमएल एल्गोरिदम पुस्तकालयों द्वारा आवश्यक स्कीमा है। मुझे पता है कि यह किया जा सकता है क्योंकि HashingTF मिलीलीटर लाइब्रेरी डेटाफ्रेम के फीचर कॉलम दिए जाने पर वेक्टर आउटपुट करता है।मैं कैसे

temp_df = sqlContext.createDataFrame(temp_rdd, StructType([ 
     StructField("label", DoubleType(), False), 
     StructField("tokens", ArrayType(StringType()), False) 
    ])) 

#assumming there is an RDD (double,array(strings)) 

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features") 

ndf = hashingTF.transform(temp_df) 
ndf.printSchema() 

#outputs 
#root 
#|-- label: double (nullable = false) 
#|-- tokens: array (nullable = false) 
#| |-- element: string (containsNull = true) 
#|-- features: vector (nullable = true) 

तो मेरे सवाल का मैं किसी भी तरह का एक RDD (स्ट्रिंग, स्पारसेवेक्टर) यह (स्ट्रिंग, वेक्टर) के एक DataFrame में कन्वर्ट होने कर सकते हैं, है। मैंने सामान्य sqlContext.createDataFrame के साथ प्रयास किया लेकिन मेरे पास DataType कोई आवश्यकता नहीं है जो मेरे पास आवश्यक है।

df = sqlContext.createDataFrame(rdd,StructType([ 
     StructField("label" , StringType(),True), 
     StructField("features" , ?Type(),True) 
    ])) 

उत्तर

17

आप यहाँ VectorUDT उपयोग करने के लिए:

# In Spark 1.x 
# from pyspark.mllib.linalg import SparseVector, VectorUDT 
from pyspark.ml.linalg import SparseVector, VectorUDT 

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 

## root 
## |-- label: double (nullable = true) 
## |-- features: vector (nullable = true) 
बस पूर्णता स्काला समकक्ष के लिए

:

import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.types.{DoubleType, StructType} 
// In Spark 1x. 
// import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} 
import org.apache.spark.ml.linalg.Vectors 
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType 

val schema = new StructType() 
    .add("label", DoubleType) 
    // In Spark 1.x 
    //.add("features", new VectorUDT()) 
    .add("features",VectorType) 

val temp_rdd: RDD[Row] = sc.parallelize(Seq(
    Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))), 
    Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5)))) 
)) 

spark.createDataFrame(temp_rdd, schema).printSchema 

// root 
// |-- label: double (nullable = true) 
// |-- features: vector (nullable = true) 
+2

वाह, मैं उम्र के दौरान इसकी तलाश कर रहा था! खुशी की लगभग रोना:,) +1 –

+1

यह काम किया! आपका बहुत बहुत धन्यवाद! क्या आप मुझे बता सकते हैं कि दस्तावेज में कहां है? linalg apache स्पार्क पर किसी भी वेक्टरोरटी को नहीं ढूंढ सकता डॉक्स –

+0

@OangelangelMarquez शायद एक पुल अनुरोध आवश्यक है –

4

जबकि @ zero323 का जवाब https://stackoverflow.com/a/32745924/1333621 समझ में आता है, और मैं यह मेरे लिए काम किया इच्छा - डेटाफ्रेम अंतर्निहित rdd, sqlContext.createDataFrame (temp_rdd, स्कीमा), अभी भी निहित स्पैर्सवेक्टर प्रकार मैं निम्नलिखित DenseVector प्रकार कन्वर्ट करने के लिए करना था - मैं जानना चाहता

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 
df_w_ftr = temp_rdd.toDF(schema) 

print 'original convertion method: ',df_w_ftr.take(5) 
print('\n') 
temp_rdd_dense = temp_rdd.map(lambda x: Row(label=x[0],features=DenseVector(x[1].toArray()))) 
print type(temp_rdd_dense), type(temp_rdd) 
print 'using map and toArray:', temp_rdd_dense.take(5) 

temp_rdd_dense.toDF().show() 

root 
|-- label: double (nullable = true) 
|-- features: vector (nullable = true) 

original convertion method: [Row(label=0.0, features=SparseVector(4, {1: 1.0, 3: 5.5})), Row(label=1.0, features=SparseVector(4, {0: -1.0, 2: 0.5}))] 


<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.RDD'> 
using map and toArray: [Row(features=DenseVector([0.0, 1.0, 0.0, 5.5]), label=0.0), Row(features=DenseVector([-1.0, 0.0, 0.5, 0.0]), label=1.0)] 

+------------------+-----+ 
|   features|label| 
+------------------+-----+ 
| [0.0,1.0,0.0,5.5]| 0.0| 
|[-1.0,0.0,0.5,0.0]| 1.0| 
+------------------+-----+ 
1

इस चिंगारी 2,1

import org.apache.spark.ml.linalg.Vector 

def featuresRDD2DataFrame(features: RDD[Vector]): DataFrame = { 
    import sparkSession.implicits._ 
    val rdd: RDD[(Double, Vector)] = features.map(x => (0.0, x)) 
    val df = rdd.toDF("label","features").select("features") 
    df 
    } 

toDF() के लिए स्केला में एक उदाहरण है चाहते हैं, तो किसी को एक छोटी/बेहतर तरीका है सुविधाओं पर संकलक द्वारा पहचाना नहीं गया था