2015-09-10 3 views
8

मैं स्पार्क में एक पाइथन वर्ग के रूप में एक मॉडल लागू कर रहा हूं, और जब भी मैं एक आरडीडी में कक्षा विधि को मैप करने का प्रयास करता हूं तो यह विफल हो जाता है। मेरे वास्तविक कोड और अधिक जटिल है, लेकिन इस सरलीकृत संस्करण समस्या के दिल में हो जाता है: अगर मैं (उदाहरण के लिए) तो जैसे मॉडल चलाने के लिए,एक पाइथन कक्षा का उपयोग कर आरडीडी को कैसे संसाधित करें?

class model(object): 
    def __init__(self): 
     self.data = sc.textFile('path/to/data.csv') 
     # other misc setup 
    def run_model(self): 
     self.data = self.data.map(self.transformation_function) 
    def transformation_function(self,row): 
     row = row.split(',') 
     return row[0]+row[1] 

अब:

test = model() 
test.run_model() 
test.data.take(10) 

मैं निम्नलिखित त्रुटि:

अपवाद: ऐसा प्रतीत होता है कि आप प्रसारण चर, क्रिया, या ट्रांसफोर्मेशन से स्पार्ककॉन्टेक्स्ट को संदर्भित करने का प्रयास कर रहे हैं। स्पार्ककॉन्टेक्स्ट का उपयोग केवल ड्राइवर पर किया जा सकता है, कोड में नहीं, जो कि यह श्रमिकों पर चलता है। अधिक जानकारी के लिए, स्पार्क -5063 देखें।

मैंने इसे थोड़ा सा खेला है, और जब भी मैं क्लास के भीतर आरडीडी में क्लास विधि को मैप करने का प्रयास करता हूं तो यह विश्वसनीय रूप से होता है। मैंने पुष्टि की है कि मैप किए गए फ़ंक्शन ठीक काम करता है अगर मैं कक्षा संरचना के बाहर कार्यान्वित करता हूं, इसलिए समस्या को निश्चित रूप से कक्षा के साथ करना है। क्या इसका समाधान करने का कोई तरीका है?

उत्तर

10

समस्या यहां नेस्टेड आरडीडी का उपयोग करने या ट्रांसफॉर्मेशन के अंदर स्पार्क क्रियाओं का उपयोग करने से थोड़ा अधिक सूक्ष्म है। स्पार्क कार्रवाई या परिवर्तन के अंदर SparkContext तक पहुंच की अनुमति नहीं देता है।

यहां तक ​​कि आप इसे स्पष्ट रूप से एक्सेस नहीं करते हैं, इसे बंद करने के अंदर संदर्भित किया जाता है और उसे क्रमबद्ध किया जाना चाहिए और आसपास ले जाना चाहिए। इसका मतलब है कि आपकी transformation विधि, जो self का संदर्भ देती है, SparkContext भी रखती है, इसलिए त्रुटि।

:

आप उदाहरण चर का उपयोग करने में सक्षम होना चाहते हैं तो आप कुछ इस तरह की कोशिश कर सकते हैं:

class model(object): 
    @staticmethod 
    def transformation_function(row): 
     row = row.split(',') 
     return row[0]+row[1] 

    def __init__(self): 
     self.data = sc.textFile('some.csv') 

    def run_model(self): 
     self.data = self.data.map(model.transformation_function) 

संपादित:

एक तरीका यह संभाल करने के लिए स्थिर विधि का उपयोग करने के लिए है

class model(object): 
    @staticmethod 
    def transformation_function(a_model): 
     delim = a_model.delim 
     def _transformation_function(row): 
      return row.split(delim) 
     return _transformation_function 

    def __init__(self): 
     self.delim = ',' 
     self.data = sc.textFile('some.csv') 

    def run_model(self): 
     self.data = self.data.map(model.transformation_function(self)) 
+0

बिल्कुल सही - मैंने एक स्थिर विधि का उपयोग करने के बारे में नहीं सोचा था। केवल समस्या पूर्ण कोड में है, मेरे ट्रांसफॉर्म फ़ंक्शन को 'मॉडल' श्रेणी (आरडीडी नहीं) में अन्य चरों तक पहुंचने की आवश्यकता है। मैं यह समझने का एकमात्र तरीका मान रहा हूं कि उन्हें स्थिर विधि के लिए तर्क के रूप में पास किया जाए? जैसे 'def transformation_function (पंक्ति, somevar): वापसी पंक्ति + somevar' – moustachio

+0

दूसरे शब्दों में - क्या एक स्थिर विधि के भीतर से कक्षा चर ('self.whatever') तक पहुंचने का कोई तरीका है? – moustachio

+0

(ध्यान दें कि ये स्थैतिक चर नहीं हो सकते हैं - मैं निश्चित रूप से स्थिर विधि के भीतर से आवृत्ति चर का उपयोग करना चाहता हूं) – moustachio

संबंधित मुद्दे