प्रत्येक बार जब आप लिखने में फ़ाइल खोलते हैं (w
) मोड, एक नई फ़ाइल बनाई जाती है - इसलिए फ़ाइल की सामग्री खो जाती है यदि यह पहले से मौजूद है। केवल अंतिम फ़ाइल हैंडल फ़ाइल को सफलतापूर्वक लिख सकता है। यहां तक कि यदि आपने मोड को जोड़ने के लिए इसे बदल दिया है, तो आपको एक ही फाइल को एकाधिक प्रक्रियाओं से लिखने की कोशिश नहीं करनी चाहिए - यदि दो प्रक्रियाएं एक ही समय में लिखने की कोशिश करती हैं तो आउटपुट खराब हो जाएगा।
इसके बजाय, सभी कार्यकर्ता एक कतार में उत्पादन डाल प्रक्रियाओं है, और (या तो एक उपप्रक्रिया या मुख्य प्रक्रिया) एक एकल समर्पित प्रक्रिया है कतार से उत्पादन को संभालने और फ़ाइल पर लिखने:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
def Simulation(ii):
result = []
result.append(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
return result
def handle_output(result):
hdf = pt.openFile('simulation.h5', mode='a')
for args in result:
method, args = args
getattr(hdf, method)(*args)
hdf.close()
if __name__ == '__main__':
# clear the file
hdf = pt.openFile('simulation.h5', mode='w')
hdf.close()
pool = mp.Pool(num_processes)
for i in range(num_simulations):
pool.apply_async(Simulation, (i,), callback=handle_output)
pool.close()
pool.join()
:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None
def Simulation(inqueue, output):
for ii in iter(inqueue.get, sentinel):
output.put(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
def handle_output(output):
hdf = pt.openFile('simulation.h5', mode='w')
while True:
args = output.get()
if args:
method, args = args
getattr(hdf, method)(*args)
else:
break
hdf.close()
if __name__ == '__main__':
output = mp.Queue()
inqueue = mp.Queue()
jobs = []
proc = mp.Process(target=handle_output, args=(output,))
proc.start()
for i in range(num_processes):
p = mp.Process(target=Simulation, args=(inqueue, output))
jobs.append(p)
p.start()
for i in range(num_simulations):
inqueue.put(i)
for i in range(num_processes):
# Send the sentinal to tell Simulation to end
inqueue.put(sentinel)
for p in jobs:
p.join()
output.put(None)
proc.join()
तुलना के लिए, यहाँ एक संस्करण है जो mp.Pool
का उपयोग करता है
यह आसान लगता है ना? हालांकि एक संकेतक अंतर है। handle_output
पर तर्क भेजने के लिए मूल कोड output.put
का उपयोग किया गया था जो अपने स्वयं के उपप्रजाति में चल रहा था। handle_output
args
output
कतार से ले जाएगा और उन्हें तुरंत संभालेंगे। ऊपर पूल कोड के साथ, Simulation
args
result
और result
में handle_output
पर Simulation
रिटर्न के बाद तक args
का पूरा समूह जमा करता है।
यदि Simulation
में काफी समय लगता है, तो एक लंबी प्रतीक्षा अवधि होगी जबकि simulation.h5
पर कुछ भी नहीं लिखा जा रहा है।
इस प्रश्न के अतिरिक्त मैंने सफलता के साथ उपरोक्त कोड का उपयोग किया है, लेकिन अब मैं इस अनुकरण का विस्तार कर रहा हूं, एक लम्बाई (1000) द्वारा परिभाषित लूप के लिए और बी = रेंज (100) द्वारा परिभाषित लूप के लिए भी। इस स्मृति के परिणामस्वरूप मेरी याददाश्त का व्यापक उपयोग होता है। मेरे पास 16 जीबी रैम के साथ 8 सीपीयू है लेकिन जब मैं फ़ाइल चलाता हूं (वास्तविक सिमुलेशन के बिना भी) मेरा राम उपयोग 100% तक जाता है जिसके परिणामस्वरूप मेरा सिस्टम रुक जाता है। – user2143958
मुझे लगता है कि हमें कार्यों की संख्या से उपप्रोसेस की संख्या को अलग करने की आवश्यकता है। ऐसा लगता है कि आप 1000 कार्य चाहते हैं, लेकिन शायद 1000 उपप्रमुख नहीं हैं। मैं इस पोस्ट को संपादित करने के तरीके को संपादित करने के लिए संपादित करूंगा। – unutbu
हां आप सही हैं, पिछले पुनरावृत्तियों के लिए पिछले उदाहरण में समान स्मृति की समान मात्रा में सभी स्मृति को छिपाने के लिए बनाया गया था। आपके द्वारा संपादित की गई फ़ाइल सही काम करती है! लेकिन सिर्फ स्पष्टीकरण के लिए, मैं पूल() फ़ंक्शन के साथ भी प्रयोग कर रहा था और यह फ़ंक्शन भी काफी अच्छा काम करता प्रतीत होता है, हालांकि एक से अधिक परिवर्तनीय पारित होने पर यह कठिन हो जाता है। पूल() फ़ंक्शन पर प्रक्रिया() फ़ंक्शन चुनने का मुख्य कारण क्या है? – user2143958