2015-06-29 23 views
9

के माध्यम से आगे बढ़ना और विश्लेषिकी प्रसंस्करण के लिए वेक्टर मैट्रिक्स बनाने के लिए स्पार्क डेटाफ्रेम से शुरू करना।स्पार्क आरडीडी

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache() 
feature_matrix_vectors.first() 

आउटपुट वैक्टरों की एक सरणी है। उन वेक्टर से कुछ नहीं तो एक 1.

साथ, उन में एक अशक्त

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]) 
... 
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null]) 

इस मैं वेक्टर मैट्रिक्स के माध्यम से पुनरावृति और 0 (शून्य) वेक्टर एक अशक्त है या नहीं के साथ एक LabeledPoint सरणी बनाना चाहते हैं से है

def f(row): 
    if row.contain(None): 
     LabeledPoint(1.0,row) 
    else: 
     LabeledPoint(0.0,row) 

मैं

feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) # create a generator of row sums 
next(feature_matrix_labeledPoint) # Run the iteration protocol 

प्रयोग करके वेक्टर मैट्रिक्स के माध्यम से पुनरावृति करने की कोशिश की है, लेकिन यह काम नहीं करता।

TypeError: 'PipelinedRDD' object is not iterable 

किसी भी मदद की बहुत अच्छा होगा

+0

यह तो जवाब यह विवरण होता है http://stackoverflow.com/a/25296061/429476 –

उत्तर

7

RDDs एक अजगर सूचियों के लिए प्रतिस्थापन में एक बूंद नहीं हैं। आपको किसी भी क्रिया या परिवर्तन का उपयोग करना होगा जो किसी दिए गए RDD पर उपलब्ध है। यहाँ आप बस map उपयोग कर सकते हैं:

from pyspark.mllib.linalg import DenseVector 
from pyspark.mllib.regression import LabeledPoint 


feature_matrix_vectors = sc.parallelize([ 
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]), 
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None]) 
]) 

(feature_matrix_vectors 
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v)) 
    .collect()) 
संबंधित मुद्दे