मुझे लगता है कि आपकी समस्या मुख्य रूप से इस तथ्य के साथ है कि आपके समानांतर कार्य ऑब्जेक्ट के विधि है। यह अधिक जानकारी के बिना कुछ हो सकता है, लेकिन इस छोटे से खिलौना कार्यक्रम पर विचार करना मुश्किल है:
import multiprocessing as mp
import numpy as np
import gc
class Object(object):
def __init__(self, _):
self.data = np.empty((100, 100, 100), dtype=np.float64)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def new_obj(self, i):
return Object(i)
def __del__(self):
print("Dead")
if __name__ == '__main__':
c = Container()
for j in range(5):
c.foo()
अब Container
केवल एक बार कहा जाता है, ताकि आप एक "Born"
, एक "Dead"
के बाद बाहर मुद्रित किया जा रहा देखने के लिए उम्मीद थी; लेकिन चूंकि प्रक्रियाओं द्वारा निष्पादित कोड को कंटेनर की विधि है, इसका मतलब है कि पूरे कंटेनर को कहीं और निष्पादित किया जाना है! इस चल रहा है, आप मिश्रित "Born"
और "Dead"
की एक धारा देखेंगे के रूप में अपने कंटेनर हर निष्पादन नक्शे के पर फिर से बनाया जा रहा है:
Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
...
<MANY MORE LINES HERE>
...
Born
Dead
खुद समझा दिया है कि पूरे कंटेनर की नकल की जा रही है और चारों ओर हर बार भेजा, कुछ गैर-serialisable मूल्य सेट करने का प्रयास:
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.fn = lambda x: x**2
self.objects.extend(result.get())
pool.terminate()
gc.collect()
के रूप में यह कंटेनर serialise नहीं कर सकते हैं जो तुरंत एक AttributeError
बढ़ा देंगे।
के योग करते हैं: जब पूल के लिए 1000 अनुरोध भेजने, Container
धारावाहिक जाएगा, प्रक्रियाओं को भेजा और वहाँ एक 1000 बार deserialised। निश्चित रूप से, उन्हें अंततः गिरा दिया जाएगा (माना जाता है कि बहुत अधिक अजीब क्रॉस-रेफरेंसिंग नहीं चल रहा है), लेकिन यह निश्चित रूप से रैम पर बहुत अधिक दबाव डालेगा, क्योंकि ऑब्जेक्ट को क्रमबद्ध, कॉल किया गया, अपडेट किया गया, पुनर्विक्रय किया गया है ... प्रत्येक के लिए आपके मैप किए गए इनपुट में तत्व।
आप इसे कैसे हल कर सकते हैं? खैर, आदर्श, का हिस्सा नहीं है राज्य:
def new_obj(_):
return Object(_)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
इस बार का एक अंश में भरता है और केवल (के रूप में एक Container
कभी बनाया गया है) रैम पर सबसे नन्हा वायुयान पैदा करता है।अगर आपको वहां जाने के लिए कुछ आंतरिक राज्यों की आवश्यकता है, तो इसे निकालें और केवल यह भेजें:
def new_obj(tup):
very_important_state, parameters = tup
return Object(very_important_state=very_important_state,
parameters=parameters)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
important_state = len(self.objects)
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj,
((important_state, i) for i in range(50)))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
यह पहले जैसा ही व्यवहार है। यदि आप बिल्कुल प्रक्रियाओं के बीच कुछ उत्परिवर्तनीय स्थिति साझा करने से नहीं बच सकते हैं, तो the multiprocessing tools को चेकआउट आउट करने के लिए इसे हर जगह हर जगह कॉपी करने के बिना चेकआउट करें।
कृपया मेरा संपादन देखें। तो अगर मैं आपको सही समझता हूं, तो मुझे प्रत्येक प्रक्रिया में अपने ऑब्जेक्ट के बाहर एक बाहरी विधि कॉल करने की आवश्यकता है? – Jonas
समानांतर कार्य 'self.new_obj', ऑब्जेक्ट की एक विधि होने के नाते, पूरे माता-पिता नोड को serialized और प्रत्येक कॉल पर चारों ओर भेजा जाता है; यदि आप उस विधि को निकाल सकते हैं ताकि _function_ 'new_obj (...)' एक (सरल, अनाथ, 'स्टेटलेस') लौटाएगा, नया नोड और 'foo' इसे जोड़ने का प्रभारी है (रेफर्स पैरेंट <-> बच्चे इत्यादि जोड़ना। .. लेकिन _calling process_ में), यह पूरी समस्या गायब होने की संभावना है: बाल प्रक्रियाओं को केवल एक न्यूनतम राज्य को भेजने की आवश्यकता होती है। – val