2012-03-31 18 views
9

py2.6 में +, multiprocessing मॉड्यूल एक Pool वर्ग, प्रदान करता है इसलिए एक कर सकते हैं:बहु और कचरा संग्रहण

class Volatile(object): 
    def do_stuff(self, ...): 
     pool = multiprocessing.Pool() 
     return pool.imap(...) 

हालांकि, मानक अजगर कार्यान्वयन के साथ 2.7.2 में, इस दृष्टिकोण जल्द ही करने के लिए सुराग "IOError: [Errno 24] बहुत सारी खुली फाइलें"। जाहिर है pool ऑब्जेक्ट कभी कचरा इकट्ठा नहीं होता है, इसलिए इसकी प्रक्रियाएं कभी भी समाप्त नहीं होती हैं, जो भी वर्णनकर्ता आंतरिक रूप से खोले जाते हैं। मैं क्योंकि निम्न काम करता है इस बारे में सोच:

class Volatile(object): 
    def do_stuff(self, ...): 
     pool = multiprocessing.Pool() 
     result = pool.map(...) 
     pool.terminate() 
     return result 

मैं imap की "सुस्त" इटरेटर दृष्टिकोण रखना चाहते हैं; उस मामले में कचरा कलेक्टर कैसे काम करता है? कोड को कैसे ठीक करें?

+0

आप दे सकते हैं क्या '' ... अपने 'pool.map (...)' के अंदर है के बारे में एक संकेत है? – SingleNegationElimination

+0

निश्चित रूप से। '... 'केवल पढ़ने योग्य हैं लेकिन' वाष्पशील 'ऑब्जेक्ट के सदस्य चर पर CPU-भारी संचालन। मैं प्रदर्शन को बेहतर बनाने के लिए समानांतर में निष्पादित करना चाहता हूं। वस्तु 'do_stuff' की अवधि के लिए उत्परिवर्तित नहीं है। – user124114

उत्तर

8

अंत में, मैं एक बार pool.imap इटरेटर आसपास pool संदर्भ गुजर और मैन्युअल रूप से इसे समाप्त समाप्त हो गया समाप्त हो गया था:

class Volatile(object): 
    def do_stuff(self, ...): 
     pool = multiprocessing.Pool() 
     return pool, pool.imap(...) 

    def call_stuff(self): 
     pool, results = self.do_stuff() 
     for result in results: 
      # lazy evaluation of the imap 
     pool.terminate() 

मामले में किसी को भी भविष्य में इस समाधान पर ठोकर: chunksize पैरामीटर बहुतPool.imap में महत्वपूर्ण है (जैसा कि सादा ०१२३९७४०४०४२ करने का विरोध किया, जहां इससे कोई फर्क नहीं पड़ता)। मैं मैन्युअल रूप से इसे सेट करता हूं ताकि प्रत्येक प्रक्रिया 1 + len(input)/len(pool) नौकरियां प्राप्त करे। इसे डिफ़ॉल्ट chunksize=1 पर छोड़कर मुझे वही प्रदर्शन दिया जैसे मैंने समांतर प्रसंस्करण का उपयोग नहीं किया ... बुरा।

मुझे लगता है कि आदेश दिया गया imap बनाम आदेश map का उपयोग करने के लिए कोई वास्तविक लाभ नहीं है, मैं बस व्यक्तिगत रूप से इसे बेहतर पसंद करता हूं।

+0

नहीं, मेरा मतलब है कि मुझे इटरेटर पसंद है। प्रत्येक जनरेटर एक पुनरावर्तक है, बीटीडब्ल्यू। – user124114

+0

मेरे मामले में, मुझे उस काम के बाद 'gc.collect()' प्राप्त करने के लिए 'pool.terminate()' को कॉल करना होगा। अन्यथा, पाइथन पूल में संदर्भित उन वस्तुओं को जीसी नहीं करेगा, यहां तक ​​कि स्पष्ट 'डेल पूल' के साथ भी। –

3

पायथन में, मूल रूप से इसकी कोई गारंटी नहीं है कि चीजें नष्ट हो जाएंगी, और इस मामले में यह नहीं है कि मल्टीप्रोसेसिंग पूल का उपयोग कैसे किया जा सकता है।

सही काम करने के लिए फ़ंक्शन में एकाधिक कॉल में एक पूल साझा करना है। ऐसा करने के लिए सबसे आसान तरीका है एक वर्ग के रूप पूल स्टोर करने के लिए है (या, हो सकता है, उदाहरण के) अलग-अलग:

class Dispatcher: 
    pool = multiprocessing.Pool() 
    def do_stuff(self, ...): 
     result = self.pool.map(...) 
     return result 
+0

आंतरिक रूप से पूल() 'कांटा नहीं है? आपका समाधान "प्रक्रिया" को उत्पन्न प्रक्रियाओं की स्थिति को "अद्यतन" कैसे करेगा जब वास्तव में 'do_stuff() 'कहा जाता है? (जैसा कि 'डिस्पैचर' का मूल्यांकन किया जाता है) के विपरीत, मास्टर प्रक्रिया के साथ सब कुछ सिंक में रखने के लिए बल्कि जटिल लगता है। – user124114

+0

एक सदस्य चर के रूप में एक पूल भंडारण ठीक है; मैं राज्य के साथ आपकी समस्या को समझ नहीं पा रहा हूं - आप किस राज्य को साझा करना चाहते हैं? यदि आप अपनी प्रक्रियाओं को एक ही दुभाषिया स्थिति साझा करना चाहते हैं, तो आपको शायद इसके बजाय धागे का उपयोग करना चाहिए ... – James

+1

धन्यवाद @Autopulated। जीआईएल के कारण थ्रेड ज्यादा नहीं करते हैं। जिस राज्य को मैं साझा करना चाहता हूं वह वह वस्तु है जिस पर 'do_stuff' को कॉल किया गया था (= बड़े पैमाने पर पढ़ने-योग्य वस्तु पर महंगा ऑपरेशन, प्रतिलिपि नहीं ले सकता)। – user124114

2

दरअसल, यहां तक ​​कि जब pool वस्तु के लिए सभी उपयोगकर्ता संदर्भ हटा दिए जाते हैं, और कोई कार्य कतार कोड में हैं, और सब कचरा संग्रहण किया जाता है, तो अभी भी प्रक्रियाओं रहने के रूप में व्यर्थ लाश ऑपरेटिंग सिस्टम में - प्लस हम 3 ज़ोंबी सेवा धागे Pool फांसी (अजगर 2.7 और 3.4) से है:

>>> del pool 
>>> gc.collect() 
0 
>>> gc.garbage 
[] 
>>> threading.enumerate() 
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>, 
<Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>] 

और आगे Pool() के अधिक से अधिक प्रक्रिया जोड़ सकते हैं और लाश थ्रेड ... जो जब तक मुख्य प्रक्रिया समाप्त होता है रहना होगा ।

यह ज़ोंबी पूल को रोकने के लिए एक विशेष प्रहार की आवश्यकता है - अपनी सेवा धागा _handle_workers के माध्यम से:

>>> ths = threading.enumerate() 
>>> for th in ths: 
...  try: th.name, th._state, th._Thread__target 
...  except AttributeError: pass 
...  
('MainThread', 1, None) 
('Thread-8', 0, <function _handle_tasks at 0x01462A30>) 
('Thread-9', 0, <function _handle_results at 0x014629F0>) 
('Thread-7', 0, <function _handle_workers at 0x01462A70>) 
>>> ths[-1]._state = multiprocessing.pool.CLOSE # or TERMINATE 
>>> threading.enumerate() 
[<_MainThread(MainThread, started 5632)>] 
>>> 

अन्य सेवा धागे समाप्त हो जाता है यही कारण है कि और भी बच्चे प्रक्रियाओं समाप्त हो जाता है।


मुझे लगता है कि एक समस्या है, वहाँ अजगर पुस्तकालय में एक संसाधन रिसाव बग, weakref की का सही उपयोग द्वारा निर्धारित किया जा सकता है जो है।

अन्य मुद्दा यह है कि Pool निर्माण & समाप्ति महंगा है (पूल प्रति 3 सेवा धागे सिर्फ प्रबंधन के लिए सहित!), और वहाँ ususually सीपीयू कोर की तुलना में अधिक कार्यकर्ता प्रक्रियाओं (उच्च CPU लोड होता है) के लिए कोई कारण नहीं है या किसी सीमित सीमित संसाधन (जैसे नेटवर्क बैंडविड्थ) के अनुसार सीमित संख्या से अधिक। तो एक पूल को एक सिंगलुलर ऐप ग्लोबल रिसोर्सेज (वैकल्पिक रूप से टाइमआउट द्वारा प्रबंधित) की तरह एक क्लोजर (या एक समाप्त() - बग के कारण कामकाज के बजाय एक पूल के इलाज के लिए उचित है।

उदाहरण के लिए:

try: 
    _unused = pool # reload safe global var 
except NameError: 
    pool = None 

def get_pool(): 
    global pool 
    if pool is None: 
     atexit.register(stop_pool) 
     pool = Pool(CPUCORES) 
    return pool 

def stop_pool(): 
    global pool 
    if pool: 
     pool.terminate() 
     pool = None 
संबंधित मुद्दे