2012-12-03 10 views
7

अब तक जब भी मुझे multiprocessing का उपयोग करने की आवश्यकता है, मैंने मैन्युअल रूप से "प्रक्रिया पूल" बनाकर और सभी सबप्रोसेसेस के साथ एक कार्यशील कतार साझा करके ऐसा किया है।पाइथन मल्टीप्रोसेसिंग पूल द्वारा किए जाने वाले "काम" की मात्रा कैसे प्राप्त करें?

उदाहरण के लिए:

from multiprocessing import Process, Queue 


class MyClass: 

    def __init__(self, num_processes): 
     self._log   = logging.getLogger() 
     self.process_list = [] 
     self.work_queue = Queue() 
     for i in range(num_processes): 
      p_name = 'CPU_%02d' % (i+1) 
      self._log.info('Initializing process %s', p_name) 
      p = Process(target = do_stuff, 
         args = (self.work_queue, 'arg1'), 
         name = p_name) 

इस तरह से मैं कतार है, जो subprocesses से भस्म किया जाएगा करने के लिए सामान जोड़ सकते हैं। मैं तो की निगरानी कर सकता है कितनी दूर प्रसंस्करण Queue.qsize() की जाँच करके किया गया था:

while True: 
     qsize = self.work_queue.qsize() 
     if qsize == 0: 
      self._log.info('Processing finished') 
      break 
     else: 
      self._log.info('%d simulations still need to be calculated', qsize) 

अब मैं समझ multiprocessing.Pool एक बहुत इस कोड को आसान बनाने में कर सकते हैं।

मुझे क्या पता नहीं चला कि मैं अभी भी "काम" की मात्रा की निगरानी कैसे कर सकता हूं।

from multiprocessing import Pool 


class MyClass: 

    def __init__(self, num_processes): 
     self.process_pool = Pool(num_processes) 
     # ... 
     result_list = [] 
     for i in range(1000):    
      result = self.process_pool.apply_async(do_stuff, ('arg1',)) 
      result_list.append(result) 
     # ---> here: how do I monitor the Pool's processing progress? 
     # ...? 

कोई भी विचार:

निम्नलिखित उदाहरण लें?

उत्तर

11

Manager कतार का उपयोग करें। यह एक कतार है जिसे कार्यकर्ता प्रक्रियाओं के बीच साझा किया जाता है। यदि आप एक सामान्य कतार का उपयोग करते हैं तो यह प्रत्येक कार्यकर्ता द्वारा मसालेदार और अनचाहे हो जाएगा और इसलिए प्रतिलिपि बनाई जाएगी, ताकि प्रत्येक कार्यकर्ता द्वारा कतार को अद्यतन नहीं किया जा सके।

तब आपके कर्मचारी आपके कतार में सामान जोड़ते हैं और कतार के राज्य की निगरानी करते हैं जबकि श्रमिक काम कर रहे हैं। आपको map_async का उपयोग करके ऐसा करने की आवश्यकता है क्योंकि इससे आपको यह देखने की सुविधा मिलती है कि पूरा परिणाम कब तैयार होता है, जिससे आप निगरानी पाश को तोड़ सकते हैं।

उदाहरण:

import time 
from multiprocessing import Pool, Manager 


def play_function(args): 
    """Mock function, that takes a single argument consisting 
    of (input, queue). Alternately, you could use another function 
    as a wrapper. 
    """ 
    i, q = args 
    time.sleep(0.1) # mock work 
    q.put(i) 
    return i 

p = Pool() 
m = Manager() 
q = m.Queue() 

inputs = range(20) 
args = [(i, q) for i in inputs] 
result = p.map_async(play_function, args) 

# monitor loop 
while True: 
    if result.ready(): 
     break 
    else: 
     size = q.qsize() 
     print(size) 
     time.sleep(0.1) 

outputs = result.get() 
0

दस्तावेज़ों से, यह मुझे लगता है कि आप जो करना चाहते हैं वह आपकी result एस किसी सूची या अन्य अनुक्रम में एकत्रित करना है, फिर अपनी आउटपुट सूची बनाने के लिए ready के परिणाम सूची की जांच करें। इसके बाद आप शेष परिणाम ऑब्जेक्ट्स की संख्या की तुलना करके प्रसंस्करण स्थिति की गणना कर सकते हैं, जो तैयार राज्य में प्रेषित नौकरियों की कुल संख्या तक नहीं है। http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult

1

मैं async_call के लिए नीचे दिए गए समाधान के साथ आया था।

ट्रिविअल खिलौना स्क्रिप्ट उदाहरण लेकिन मुझे लगता है कि व्यापक रूप से लागू होना चाहिए।

मूल रूप से एक अनंत लूप सर्वेक्षण में सूची परिणाम जनरेटर में आपके परिणाम ऑब्जेक्ट्स का तैयार मूल्य और योग आपके कितने प्रेषित पूल कार्य शेष हैं, इसकी गणना करने के लिए राशि।

एक बार कोई भी ब्रेक शेष नहीं होता है और() & बंद()।

वांछित के रूप में लूप में नींद जोड़ें।

उपरोक्त समाधान के रूप में समान सिद्धांत लेकिन कतार के बिना। यदि आप ट्रैक करते हैं कि आपने प्रारंभ में पूल कितने कार्यों को भेजा है तो आप प्रतिशत पूर्ण, आदि की गणना कर सकते हैं ...

import multiprocessing 
import os 
import time 
from random import randrange 


def worker(): 
    print os.getpid() 

    #simulate work 
    time.sleep(randrange(5)) 

if __name__ == '__main__': 

    pool = multiprocessing.Pool(processes=8) 
    result_objs = [] 

    print "Begin dispatching work" 

    task_count = 10 
    for x in range(task_count): 
     result_objs.append(pool.apply_async(func=worker)) 

    print "Done dispatching work" 

    while True: 
     incomplete_count = sum(1 for x in result_objs if not x.ready()) 

     if incomplete_count == 0: 
      print "All done" 
      break 

     print str(incomplete_count) + " Tasks Remaining" 
     print str(float(task_count - incomplete_count)/task_count * 100) + "% Complete" 
     time.sleep(.25) 

    pool.close() 
    pool.join() 
1

मैं एक ही समस्या थी और MapResult वस्तुओं के लिए कुछ हद तक एक सरल समाधान (आंतरिक MapResult डेटा का उपयोग कर यद्यपि)

pool = Pool(POOL_SIZE) 

result = pool.map_async(get_stuff, todo) 
while not result.ready(): 
    remaining = result._number_left * result._chunksize 
    sys.stderr.write('\r\033[2KRemaining: %d' % remaining) 
    sys.stderr.flush() 
    sleep(.1) 

print >> sys.stderr, '\r\033[2KRemaining: 0' 

नोट के साथ आया था कि शेष मूल्य हमेशा सटीक के बाद से नहीं है किया है प्रक्रिया के लिए वस्तुओं की संख्या के आधार पर खंड आकार अक्सर गोलाकार होता है।

आप pool.map_async(get_stuff, todo, chunksize=1)

का उपयोग कर इसे सर्किट कर सकते हैं
संबंधित मुद्दे

 संबंधित मुद्दे