2012-01-03 7 views
15

मैं मल्टीप्रोसेसिंग का उपयोग कर एक अनुप्रयोग को समानांतर करने की कोशिश कर रहा हूं जो में एक बहुत बड़ी सीएसवी फ़ाइल (64 एमबी से 500 एमबी) लेता है, कुछ लाइन लाइन लाइन से करता है, और उसके बाद एक छोटा, निश्चित आकार फ़ाइल आउटपुट करता है।मल्टीप्रोसेसिंग के लिए बड़ी फ़ाइल से डेटा को चंकना?

वर्तमान में मैं एक list(file_obj), जो दुर्भाग्य से स्मृति में पूरी तरह से भरी हुई है करना (मुझे लगता है कि) और मैं तो मैं n भागों में ऊपर उस सूची को तोड़ने, एन प्रक्रियाओं मैं चलाना चाहते हैं संख्या है। मैं तो pool.map() टूटा हुआ सूचियों पर करता हूं।

ऐसा लगता है कि एक थ्रेड, बस-खुली-फ़ाइल-और-इटेटेट-ओवर-इट पद्धति की तुलना में वास्तव में, वास्तव में खराब रनटाइम लगता है। क्या कोई बेहतर समाधान सुझा सकता है?

इसके अतिरिक्त, मुझे समूह में पंक्तियों की पंक्तियों को संसाधित करने की आवश्यकता है जो को किसी निश्चित कॉलम के मान को संरक्षित करते हैं। पंक्तियों के ये समूह स्वयं को विभाजित कर सकते हैं, लेकिन इस समूह के लिए किसी समूह में एक से अधिक मान नहीं होने चाहिए।

उत्तर

14

list(file_obj)fileobj बड़ा होने पर बहुत मेमोरी की आवश्यकता हो सकती है। हम लाइनों के हिस्सों को खींचने के लिए itertools का उपयोग कर उस स्मृति आवश्यकता को कम कर सकते हैं क्योंकि हमें उनकी आवश्यकता है।

विशेष रूप से, हम

reader = csv.reader(f) 
chunks = itertools.groupby(reader, keyfunc) 

का उपयोग processable टुकड़ों में विभाजित करने के लिए फ़ाइल कर सकते हैं और

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] 
result = pool.map(worker, groups) 

एक समय में num_chunks मात्रा पर बहु ​​पूल काम है करने के लिए।

ऐसा करके, हमें पूरी फ़ाइल के बजाय स्मृति में कुछ (num_chunks) भाग रखने के लिए लगभग पर्याप्त स्मृति की आवश्यकता है।


import multiprocessing as mp 
import itertools 
import time 
import csv 

def worker(chunk): 
    # `chunk` will be a list of CSV rows all with the same name column 
    # replace this with your real computation 
    # print(chunk) 
    return len(chunk) 

def keyfunc(row): 
    # `row` is one row of the CSV file. 
    # replace this with the name column. 
    return row[0] 

def main(): 
    pool = mp.Pool() 
    largefile = 'test.dat' 
    num_chunks = 10 
    results = [] 
    with open(largefile) as f: 
     reader = csv.reader(f) 
     chunks = itertools.groupby(reader, keyfunc) 
     while True: 
      # make a list of num_chunks chunks 
      groups = [list(chunk) for key, chunk in 
         itertools.islice(chunks, num_chunks)] 
      if groups: 
       result = pool.map(worker, groups) 
       results.extend(result) 
      else: 
       break 
    pool.close() 
    pool.join() 
    print(results) 

if __name__ == '__main__': 
    main() 
+0

मैं झूठ बोला था जब मैंने कहा कि लाइनों अंतर्संबंध नहीं कर रहे हैं - csv में, वहाँ एक स्तंभ जो (एक नाम स्तंभ के आधार पर विभाजित किए जाने की जरूरत है, और उस नाम के साथ सभी पंक्तियों नहीं कर सकते विभाजित हो)। हालांकि, मुझे लगता है कि मैं इसे इस मानदंड पर समूह में अनुकूलित कर सकता हूं। धन्यवाद! मैं itertools के बारे में कुछ नहीं जानता था, और अब मैं कुछ भी नहीं से थोड़ा अधिक। – user1040625

+0

मेरे मूल कोड में एक त्रुटि हुई। 'Pool.apply_async' पर सभी कॉल गैर-अवरुद्ध हैं, इसलिए पूरी फ़ाइल को एक बार में कतारबद्ध किया जा रहा था। इसके परिणामस्वरूप कोई स्मृति बचत नहीं हुई होगी। तो मैंने एक समय में 'num_chunks' को कतार में लूप को थोड़ा सा बदल दिया है। 'Pool.map' पर कॉल अवरुद्ध है, जो पूरी फ़ाइल को एक बार में कतारबद्ध होने से रोक देगा। – unutbu

+0

@HappyLeapSecond एक उपयोगकर्ता यहां आपकी विधियों को लागू करने का प्रयास कर रहा है http://stackoverflow.com/questions/31164731/python-chunking-csv-file-multiproccessing और परेशानी हो रही है। शायद आप मदद कर सकते हैं? – m0meni

1

मैं इसे सरल रखूंगा। एक प्रोग्राम को फ़ाइल खोलें और लाइन से लाइन को पढ़ें। आप इसे चुनने के लिए कितनी फाइलें चुन सकते हैं, कई आउटपुट फाइलें खोलें, और प्रत्येक पंक्ति अगली फाइल पर लिखें। यह फ़ाइल को n बराबर भागों में विभाजित करेगा। फिर आप समांतर में प्रत्येक फाइल के खिलाफ एक पायथन प्रोग्राम चला सकते हैं।

संबंधित मुद्दे