MRJob

2012-02-15 3 views
6

के साथ एकाधिक इनपुट मैपरेडस, एमआरजेब के लिए येल्प के पायथन एपीआई का उपयोग करना सीखने की कोशिश कर रहा हूं। उनका सरल शब्द काउंटर उदाहरण समझ में आता है, लेकिन मुझे उत्सुकता है कि एक व्यक्ति एकाधिक इनपुट से जुड़े एप्लिकेशन को कैसे संभालेगा। उदाहरण के लिए, किसी दस्तावेज़ में शब्दों को गिनने के बजाए, मैट्रिक्स द्वारा वेक्टर को गुणा करना। मैं इस समाधान है, जो काम करता है के साथ आया था, लेकिन मूर्खतापूर्ण लगता है:MRJob

class MatrixVectMultiplyTast(MRJob): 
    def multiply(self,key,line): 
      line = map(float,line.split(" ")) 
      v,col = line[-1],line[:-1] 

      for i in xrange(len(col)): 
        yield i,col[i]*v 

    def sum(self,i,occurrences): 
      yield i,sum(occurrences) 

    def steps(self): 
      return [self.mr (self.multiply,self.sum),] 

if __name__=="__main__": 
    MatrixVectMultiplyTast.run() 

इस कोड ./matrix.py < input.txt चलाया जाता है और कारण यह काम करता है यह है कि मैट्रिक्स, कॉलम द्वारा input.txt में संग्रहीत पर इसी वेक्टर मूल्य के साथ पंक्ति का अंत।

तो, निम्नलिखित मैट्रिक्स और वेक्टर:

enter image description here

रूप input.txt के रूप में प्रतिनिधित्व कर रहे हैं:

enter image description here

संक्षेप में, मैं कैसे मैट्रिक्स भंडारण के बारे में जाने के लिए और होगा वेक्टर अलग-अलग फाइलों में स्वाभाविक रूप से और उन्हें एमआरजेब में पास कर रहे हैं?

उत्तर

3

यदि आप किसी अन्य के खिलाफ अपने कच्चे डेटा प्रसंस्करण की जरूरत होती हैं (या एक ही row_i, row_j) डेटा सेट, या तो आप कर सकते हैं:

1) अपने डेटा की एक प्रतिलिपि स्टोर करने के लिए एक S3 बाल्टी बनाएँ। इस प्रतिलिपि का स्थान अपनी कार्य वर्ग में पास करें, उदा। नीचे दिए गए कोड में self.options.bucket और self.options.my_datafile_copy_location। चेतावनी: दुर्भाग्य से, ऐसा लगता है कि पूरी फाइल को संसाधित होने से पहले कार्य मशीनों को "डाउनलोड" होना चाहिए। यदि कनेक्शन लोड हो जाते हैं या लोड करने में बहुत अधिक समय लगता है, तो यह काम विफल हो सकता है। ऐसा करने के लिए यहां कुछ पायथन/एमआरजेब कोड है।

अपने नक्शाकार समारोह में रखें यह:

d1 = line1.split('\t', 1) 
v1, col1 = d1[0], d1[1] 
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING) 
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip() 
### CAVEAT: Needs to get the whole file before processing the rest. 
for line2 in data_copy.split('\n'): 
    d2 = line2.split('\t', 1) 
    v2, col2 = d2[0], d2[1] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
conn.close() 

2) एक SimpleDB डोमेन बनाएँ, और वहाँ में अपने सभी डेटा के दुकान। पर Boto और SimpleDB यहां पढ़ें: http://code.google.com/p/boto/wiki/SimpleDbIntro

आपका नक्शाकार कोड इस तरह दिखेगा:

dline = dline.strip() 
d0 = dline.split('\t', 1) 
v1, c1 = d0[0], d0[1] 
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME) 
for item in domain: 
    v2, c2 = item.name, item['column'] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
sdb.close() 

यह दूसरा विकल्प बेहतर है, तो आप डेटा के बहुत बड़ी मात्रा में प्रदर्शन कर सकते हैं, क्योंकि यह अनुरोध कर सकता है एक ही समय में पूरी राशि के बजाय डेटा की प्रत्येक पंक्ति के लिए। ध्यान रखें कि SimpleDB मान केवल अधिकतम 1024 वर्ण लंबे हो सकते हैं, इसलिए यदि आपके डेटा मान उस से अधिक लंबे हैं तो आपको कुछ विधि के माध्यम से संपीड़ित/डिकंप्रेस करने की आवश्यकता हो सकती है।

1

मेरी समझ में, आप श्रीजोब का उपयोग नहीं करेंगे जबतक कि आप अमेज़ॅन से हडोप क्लस्टर या हाडोप सेवाओं का लाभ उठाना नहीं चाहते थे, भले ही उदाहरण स्थानीय फाइलों पर चलने का उपयोग करता हो।

जॉब सबमिट करने के लिए प्रमुख उपयोग में श्रीजोब "Hadoop streaming"।

इसका मतलब है कि हडोप से फ़ाइलों या फ़ोल्डर्स के रूप में निर्दिष्ट सभी इनपुट मैपर और बाद के परिणामों को reducer पर स्ट्रीम किया जाता है। सभी मैपर इनपुट का एक टुकड़ा प्राप्त करता है और सभी इनपुट को स्कैमैटिक रूप से समान मानता है ताकि यह समान रूप से पार्स और प्रक्रियाओं को कुंजी कर सके, प्रत्येक डेटा स्लाइस के लिए मूल्य।

इस समझ से प्राप्त करने से, इनपुट स्कीमेटिक रूप से मैपर के समान होते हैं।दो अलग-अलग योजनाबद्ध डेटा को शामिल करने का एकमात्र तरीका यह है कि उन्हें उसी फ़ाइल में इस तरह से अलग करना है कि मैपर समझ सकता है कि वेक्टर डेटा कौन सा है और मैट्रिक्स डेटा कौन सा है।

You are actually doing it already. 

आप बस कुछ विनिर्देशक अगर एक लाइन मैट्रिक्स डेटा या एक वेक्टर डेटा है कि होने से सुधार कर सकते हैं। एक बार जब आप वेक्टर डेटा देखते हैं तो इसके पहले के मैट्रिक्स डेटा को लागू किया जाता है।

matrix, 1, 2, ... 
matrix, 2, 4, ... 
vector, 3, 4, ... 
matrix, 1, 2, ... 
..... 

लेकिन आपने जिस प्रक्रिया का उल्लेख किया है वह अच्छी तरह से काम करता है। आपके पास एक ही फाइल में सभी योजनाबद्ध डेटा होना चाहिए।

हालांकि इसमें अभी भी समस्याएं हैं। के, वी मानचित्र एक काम में पूर्ण स्कीमा मौजूद होने पर बेहतर काम करता है और इसमें एक पूर्ण एकल प्रसंस्करण इकाई होती है।

मेरी समझ से, आप पहले से ही इसे सही तरीके से कर रहे हैं लेकिन मुझे लगता है कि मानचित्र-कमी इस प्रकार के डेटा के लिए उपयुक्त तंत्र नहीं है। मुझे उम्मीद है कि कोई इसे जितना भी कर सकता है उससे आगे भी स्पष्ट करता है।

2

आपके प्रश्न का वास्तविक उत्तर यह है कि mrjob अभी तक हैडूप स्ट्रीमिंग पैटर्न पैटर्न का समर्थन नहीं करता है, जो map_input_file पर्यावरण चर (जो map.input.file प्रॉपर्टी का खुलासा करता है) को पढ़ने के लिए है, यह निर्धारित करने के लिए कि आप किस प्रकार की फ़ाइल अपने पथ और/या नाम के आधार पर काम कर रहे हैं।

आप अभी भी, इसे हटा करने के लिए करता है, तो आप आसानी से बस डेटा अपने आप जो टाइप यह के अंतर्गत आता है पढ़ने से पता लगा सकते हैं सक्षम के रूप में इस लेख में प्रदर्शित किया जाता है हो सकता है:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

हालांकि वह नहीं है हमेशा संभव ...

अन्यथा मेरा जॉब शानदार लग रहा है और मेरी इच्छा है कि वे भविष्य में इसके लिए समर्थन जोड़ सकें। तब तक यह मेरे लिए एक सौदा ब्रेकर है।

1

इस प्रकार मैं एकाधिक इनपुट का उपयोग करता हूं और फ़ाइल नाम के आधार पर मैपर चरण में उपयुक्त परिवर्तन करता हूं।

धावक कार्यक्रम:

from mrjob.hadoop import * 


#Define all arguments 

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S') 
hadoop_bin = '/usr/bin/hadoop' 
mode = 'hadoop' 
hs = HadoopFilesystem([hadoop_bin]) 

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"] 

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin] 
aargs.extend(input_file_names) 
aargs.extend(['-o',output_dir]) 
print aargs 
status_file = True 

mr_job = MRJob(args=aargs) 
with mr_job.make_runner() as runner: 
    runner.run() 
os.environ['HADOOP_HOME'] = '' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 

MRJob कक्षा:

class MR_Job(MRJob): 
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value' 
    def mapper(self, _, line): 
    """ 
    This function reads lines from file. 
    """ 
    try: 
     #Need to clean email. 
     input_file_name = get_jobconf_value('map.input.file').split('/')[-2] 
       """ 
       Mapper code 
       """ 
    except Exception, e: 
     print e 

    def reducer(self, email_id,visitor_id__date_time): 
    try: 
     """ 
       Reducer Code 
       """ 
    except: 
     pass 


if __name__ == '__main__': 
    MRV_Email.run()