2016-07-01 13 views
6

धीमा करता है I नई ऑब्जेक्ट्स की सूची बनाने के लिए कई प्रक्रियाएं शुरू करता है। htop मुझे 1 और 4 प्रक्रियाओं के बीच दिखाता है (मैं हमेशा 3 नई वस्तुओं को बना देता हूं)।पायथन 3: मल्टीप्रोसेसिंग काफी अधिक RAM का उपभोग करता है और

def foo(self): 
    with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, self.information) 
     self.new_objs = result.get() 
     pool.terminate() 
    gc.collect() 

मैं foo() कई बार, हर बार यह कहा जाता है, पूरी प्रक्रिया धीमी चल रहा है कहते हैं, कार्यक्रम भी, अंत में खत्म नहीं है, क्योंकि यह बहुत करने के लिए धीमा। कार्यक्रम मेरी सभी रैम खाने शुरू होता है, जबकि अनुक्रमिक दृष्टिकोण में कोई महत्वपूर्ण RAM उपयोग नहीं होता है।

जब मैं प्रोग्राम को मारता हूं, तो अधिकांश समय यह कार्यक्रम आखिरी निष्पादन था।

->File "threading.py", line 293, in wait 
    waiter.acquire() 

संपादित मेरी परिस्थितियों के बारे में कुछ जानकारी देने के लिए। मैं नोड्स से बना एक पेड़ बनाते हैं। अपने बच्चे नोड्स बनाने के लिए foo() को पैरेंट नोड द्वारा बुलाया जाता है। प्रक्रियाओं द्वारा लौटाए गए result इन बच्चे नोड्स हैं। वे माता-पिता नोड पर एक सूची में सहेजे जाते हैं। मैं अनुक्रमिक तरीके से उन्हें बनाने के बजाय उन बच्चों नोड्स के निर्माण को समानांतर बनाना चाहता हूं।

उत्तर

2

मुझे लगता है कि आपकी समस्या मुख्य रूप से इस तथ्य के साथ है कि आपके समानांतर कार्य ऑब्जेक्ट के विधि है। यह अधिक जानकारी के बिना कुछ हो सकता है, लेकिन इस छोटे से खिलौना कार्यक्रम पर विचार करना मुश्किल है:

import multiprocessing as mp 
import numpy as np 
import gc 


class Object(object): 
    def __init__(self, _): 
     self.data = np.empty((100, 100, 100), dtype=np.float64) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(self.new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def new_obj(self, i): 
     return Object(i) 

    def __del__(self): 
     print("Dead") 


if __name__ == '__main__': 
    c = Container() 
    for j in range(5): 
     c.foo() 

अब Container केवल एक बार कहा जाता है, ताकि आप एक "Born", एक "Dead" के बाद बाहर मुद्रित किया जा रहा देखने के लिए उम्मीद थी; लेकिन चूंकि प्रक्रियाओं द्वारा निष्पादित कोड को कंटेनर की विधि है, इसका मतलब है कि पूरे कंटेनर को कहीं और निष्पादित किया जाना है! इस चल रहा है, आप मिश्रित "Born" और "Dead" की एक धारा देखेंगे के रूप में अपने कंटेनर हर निष्पादन नक्शे के पर फिर से बनाया जा रहा है:

Born 
Born 
Born 
Born 
Born 
Dead 
Born 
Dead 
Dead 
Born 
Dead 
Born 
... 
<MANY MORE LINES HERE> 
... 
Born 
Dead 

खुद समझा दिया है कि पूरे कंटेनर की नकल की जा रही है और चारों ओर हर बार भेजा, कुछ गैर-serialisable मूल्य सेट करने का प्रयास:

def foo(self): 
    with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, range(50)) 
     self.fn = lambda x: x**2 
     self.objects.extend(result.get()) 
     pool.terminate() 
    gc.collect() 

के रूप में यह कंटेनर serialise नहीं कर सकते हैं जो तुरंत एक AttributeError बढ़ा देंगे।

के योग करते हैं: जब पूल के लिए 1000 अनुरोध भेजने, Container धारावाहिक जाएगा, प्रक्रियाओं को भेजा और वहाँ एक 1000 बार deserialised। निश्चित रूप से, उन्हें अंततः गिरा दिया जाएगा (माना जाता है कि बहुत अधिक अजीब क्रॉस-रेफरेंसिंग नहीं चल रहा है), लेकिन यह निश्चित रूप से रैम पर बहुत अधिक दबाव डालेगा, क्योंकि ऑब्जेक्ट को क्रमबद्ध, कॉल किया गया, अपडेट किया गया, पुनर्विक्रय किया गया है ... प्रत्येक के लिए आपके मैप किए गए इनपुट में तत्व।

आप इसे कैसे हल कर सकते हैं? खैर, आदर्श, का हिस्सा नहीं है राज्य:

def new_obj(_): 
    return Object(_) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

इस बार का एक अंश में भरता है और केवल (के रूप में एक Container कभी बनाया गया है) रैम पर सबसे नन्हा वायुयान पैदा करता है।अगर आपको वहां जाने के लिए कुछ आंतरिक राज्यों की आवश्यकता है, तो इसे निकालें और केवल यह भेजें:

def new_obj(tup): 
    very_important_state, parameters = tup 
    return Object(very_important_state=very_important_state, 
        parameters=parameters) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     important_state = len(self.objects) 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, 
            ((important_state, i) for i in range(50))) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

यह पहले जैसा ही व्यवहार है। यदि आप बिल्कुल प्रक्रियाओं के बीच कुछ उत्परिवर्तनीय स्थिति साझा करने से नहीं बच सकते हैं, तो the multiprocessing tools को चेकआउट आउट करने के लिए इसे हर जगह हर जगह कॉपी करने के बिना चेकआउट करें।

+0

कृपया मेरा संपादन देखें। तो अगर मैं आपको सही समझता हूं, तो मुझे प्रत्येक प्रक्रिया में अपने ऑब्जेक्ट के बाहर एक बाहरी विधि कॉल करने की आवश्यकता है? – Jonas

+0

समानांतर कार्य 'self.new_obj', ऑब्जेक्ट की एक विधि होने के नाते, पूरे माता-पिता नोड को serialized और प्रत्येक कॉल पर चारों ओर भेजा जाता है; यदि आप उस विधि को निकाल सकते हैं ताकि _function_ 'new_obj (...)' एक (सरल, अनाथ, 'स्टेटलेस') लौटाएगा, नया नोड और 'foo' इसे जोड़ने का प्रभारी है (रेफर्स पैरेंट <-> बच्चे इत्यादि जोड़ना। .. लेकिन _calling process_ में), यह पूरी समस्या गायब होने की संभावना है: बाल प्रक्रियाओं को केवल एक न्यूनतम राज्य को भेजने की आवश्यकता होती है। – val

संबंधित मुद्दे