2012-06-15 9 views
9

Pool.apply_async का उपयोग करके बड़ी संख्या में कार्यों (बड़े पैरामीटर के साथ) चलाते समय, प्रक्रिया आवंटित की जाती है और प्रतीक्षा स्थिति पर जाती है, और प्रतीक्षा प्रक्रियाओं की संख्या के लिए कोई सीमा नहीं होती है। यह सभी स्मृति खाने से खत्म कर सकते हैं नीचे उदाहरण में है:पायथन मल्टीप्रोसेसिंग: प्रतीक्षा प्रक्रियाओं की संख्या को सीमित करने के लिए कैसे?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

मैं इंतज़ार कर कतार सीमित करने के लिए एक तरह से के लिए खोज रहा हूँ, इस तरह से वहाँ केवल प्रक्रियाओं इंतज़ार कर की एक सीमित संख्या है, और प्रतीक्षा कतार पूर्ण होने पर Pool.apply_async अवरुद्ध है।

+0

अच्छा उदाहरण (+1)। – mgilson

उत्तर

6

multiprocessing.Pool में _taskqueue प्रकार multiprocessing.Queue का सदस्य है, जो वैकल्पिक maxsize पैरामीटर लेता है; दुर्भाग्य से यह maxsize पैरामीटर सेट के बिना इसे बनाता है।

मैं multiprocessing.Pool.__init__ की एक कॉपी-पेस्ट कि maxsize_taskqueue निर्माता गुजरता साथ उपवर्गीकरण multiprocessing.Pool सलाह देते हैं।

बंदर-पैच वस्तु (या तो पूल या कतार) भी काम करेगा, लेकिन आप तो यह काफी भंगुर होगा pool._taskqueue._maxsize और pool._taskqueue._sem monkeypatch होगा:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

मैं पायथन 2.7.3 का उपयोग कर रहा हूं, और _taskqueue प्रकार Queue.Queue है। इसका मतलब है कि यह एक साधारण कतार है, न कि मल्टीप्रोसेसिंग। क्यूयू। मल्टीप्रोसेसिंग उप-वर्गीकरण। पुल और ओवरराइडिंग __init__ ठीक काम करता है, लेकिन वस्तु बंदर-पैचिंग अपेक्षित के रूप में काम नहीं कर रहा है। हालांकि, यह वह हैक है जिसे मैं खोज रहा था, धन्यवाद। –

0

आप स्पष्ट कतार जोड़ सकता है अधिकतम पैरामीटर के साथ और इस मामले में pool.apply_async() के बजाय queue.put() का उपयोग करें। तब कार्यकर्ता प्रक्रियाओं सकता है:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

'imap' का उपयोग करने से कोई फर्क नहीं पड़ता। इनपुट कतार अभी भी असीमित है और इस समाधान का उपयोग करने से सभी मेमोरी खत्म हो जाएगी। – Radim

+0

@ रैडीम: उत्तर में 'imap' कोड तब भी काम करता है जब आप इसे अनंत जनरेटर देते हैं। – jfs

+0

पायथन 2 में नहीं, दुर्भाग्य से (पीई 3 में कोड नहीं देखा है)। कुछ कामों के आस-पास के लिए, [यह SO उत्तर] देखें (http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration)। – Radim

1
:

for a, b in iter(queue.get, sentinel): 
    # process it 

तो आप pool.imap*() तरीकों का उपयोग कर सकता है आपके द्वारा बनाए गए इनपुट तर्क/परिणाम है कि लगभग सक्रिय कार्यकर्ता प्रक्रियाओं की संख्या के स्मृति में हैं की संख्या सीमित करना चाहते हैं

प्रतीक्षा करें यदि pool._taskqueue वांछित आकार से अधिक है:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

बस यह जोड़ना चाहते हैं कि मुझे यह मल्टीप्रोसेसिंग के साथ मेरी स्मृति समस्याओं का सबसे आसान समाधान माना जाता है। मैंने max_apply_size = 10 का उपयोग किया और यह मेरी समस्या के लिए ठीक काम करता है, जो एक धीमी फ़ाइल रूपांतरण है। @ सेतुमूर के रूप में एक सेमफोर का उपयोग करना एक और मजबूत समाधान की तरह लगता है लेकिन सरल लिपियों के लिए अधिक हो सकता है। – Nate

संबंधित मुद्दे