2013-03-29 10 views
6

का उपयोग करके एचडीएफ फ़ाइल में डेटा लिखें यह एक साधारण समस्या की तरह लगता है लेकिन मैं इसके चारों ओर अपना सिर नहीं प्राप्त कर सकता।मल्टीप्रोसेसिंग

मेरे पास एक सिमुलेशन है जो लूप के लिए डबल में चलता है और परिणाम एचडीएफ फ़ाइल में लिखता है। इस कार्यक्रम का एक सरल संस्करण नीचे दिखाया गया है:

import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    for ii in a: 
     print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 
Simulation() 

इस कोड है कि वास्तव में क्या मैं चाहता हूँ लेकिन जब से प्रक्रिया को चलाने के लिए काफी समय लग सकता है मैं बहु मॉड्यूल का उपयोग करता है और निम्नलिखित कोड का उपयोग करने की कोशिश की:

import multiprocessing 
import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(ii): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 

if __name__ == '__main__': 
    jobs = [] 
    for ii in a: 
     p = multiprocessing.Process(target=Simulation, args=(ii,)) 
     jobs.append(p)  
     p.start() 

हालांकि यह केवल एचडीएफ फ़ाइल के अंतिम अनुकरण को प्रिंट करता है, किसी भी तरह यह अन्य सभी समूहों को ओवरराइट करता है।

उत्तर

10

प्रत्येक बार जब आप लिखने में फ़ाइल खोलते हैं (w) मोड, एक नई फ़ाइल बनाई जाती है - इसलिए फ़ाइल की सामग्री खो जाती है यदि यह पहले से मौजूद है। केवल अंतिम फ़ाइल हैंडल फ़ाइल को सफलतापूर्वक लिख सकता है। यहां तक ​​कि यदि आपने मोड को जोड़ने के लिए इसे बदल दिया है, तो आपको एक ही फाइल को एकाधिक प्रक्रियाओं से लिखने की कोशिश नहीं करनी चाहिए - यदि दो प्रक्रियाएं एक ही समय में लिखने की कोशिश करती हैं तो आउटपुट खराब हो जाएगा।

इसके बजाय, सभी कार्यकर्ता एक कतार में उत्पादन डाल प्रक्रियाओं है, और (या तो एक उपप्रक्रिया या मुख्य प्रक्रिया) एक एकल समर्पित प्रक्रिया है कतार से उत्पादन को संभालने और फ़ाइल पर लिखने:


import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 


def Simulation(ii): 
    result = [] 
    result.append(('createGroup', ('/', 'A%s' % ii))) 
    for i in range(num_arrays): 
     result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 
    return result 


def handle_output(result): 
    hdf = pt.openFile('simulation.h5', mode='a') 
    for args in result: 
     method, args = args 
     getattr(hdf, method)(*args) 
    hdf.close() 


if __name__ == '__main__': 
    # clear the file 
    hdf = pt.openFile('simulation.h5', mode='w') 
    hdf.close() 
    pool = mp.Pool(num_processes) 
    for i in range(num_simulations): 
     pool.apply_async(Simulation, (i,), callback=handle_output) 
    pool.close() 
    pool.join() 
:
import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 
sentinel = None 


def Simulation(inqueue, output): 
    for ii in iter(inqueue.get, sentinel): 
     output.put(('createGroup', ('/', 'A%s' % ii))) 
     for i in range(num_arrays): 
      output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 


def handle_output(output): 
    hdf = pt.openFile('simulation.h5', mode='w') 
    while True: 
     args = output.get() 
     if args: 
      method, args = args 
      getattr(hdf, method)(*args) 
     else: 
      break 
    hdf.close() 

if __name__ == '__main__': 
    output = mp.Queue() 
    inqueue = mp.Queue() 
    jobs = [] 
    proc = mp.Process(target=handle_output, args=(output,)) 
    proc.start() 
    for i in range(num_processes): 
     p = mp.Process(target=Simulation, args=(inqueue, output)) 
     jobs.append(p) 
     p.start() 
    for i in range(num_simulations): 
     inqueue.put(i) 
    for i in range(num_processes): 
     # Send the sentinal to tell Simulation to end 
     inqueue.put(sentinel) 
    for p in jobs: 
     p.join() 
    output.put(None) 
    proc.join() 

तुलना के लिए, यहाँ एक संस्करण है जो mp.Pool का उपयोग करता है

यह आसान लगता है ना? हालांकि एक संकेतक अंतर है। handle_output पर तर्क भेजने के लिए मूल कोड output.put का उपयोग किया गया था जो अपने स्वयं के उपप्रजाति में चल रहा था। handle_outputargsoutput कतार से ले जाएगा और उन्हें तुरंत संभालेंगे। ऊपर पूल कोड के साथ, Simulationargsresult और result में handle_output पर Simulation रिटर्न के बाद तक args का पूरा समूह जमा करता है।

यदि Simulation में काफी समय लगता है, तो एक लंबी प्रतीक्षा अवधि होगी जबकि simulation.h5 पर कुछ भी नहीं लिखा जा रहा है।

+0

इस प्रश्न के अतिरिक्त मैंने सफलता के साथ उपरोक्त कोड का उपयोग किया है, लेकिन अब मैं इस अनुकरण का विस्तार कर रहा हूं, एक लम्बाई (1000) द्वारा परिभाषित लूप के लिए और बी = रेंज (100) द्वारा परिभाषित लूप के लिए भी। इस स्मृति के परिणामस्वरूप मेरी याददाश्त का व्यापक उपयोग होता है। मेरे पास 16 जीबी रैम के साथ 8 सीपीयू है लेकिन जब मैं फ़ाइल चलाता हूं (वास्तविक सिमुलेशन के बिना भी) मेरा राम उपयोग 100% तक जाता है जिसके परिणामस्वरूप मेरा सिस्टम रुक जाता है। – user2143958

+0

मुझे लगता है कि हमें कार्यों की संख्या से उपप्रोसेस की संख्या को अलग करने की आवश्यकता है। ऐसा लगता है कि आप 1000 कार्य चाहते हैं, लेकिन शायद 1000 उपप्रमुख नहीं हैं। मैं इस पोस्ट को संपादित करने के तरीके को संपादित करने के लिए संपादित करूंगा। – unutbu

+0

हां आप सही हैं, पिछले पुनरावृत्तियों के लिए पिछले उदाहरण में समान स्मृति की समान मात्रा में सभी स्मृति को छिपाने के लिए बनाया गया था। आपके द्वारा संपादित की गई फ़ाइल सही काम करती है! लेकिन सिर्फ स्पष्टीकरण के लिए, मैं पूल() फ़ंक्शन के साथ भी प्रयोग कर रहा था और यह फ़ंक्शन भी काफी अच्छा काम करता प्रतीत होता है, हालांकि एक से अधिक परिवर्तनीय पारित होने पर यह कठिन हो जाता है। पूल() फ़ंक्शन पर प्रक्रिया() फ़ंक्शन चुनने का मुख्य कारण क्या है? – user2143958

संबंधित मुद्दे