2012-03-07 14 views
6

पर ब्लॉक नहीं करती मैं काफी अजगर करने के लिए नया हूँ। मैं stdin पर टेक्स्ट की पंक्तियों को पढ़ने के लिए, उन्हें किसी तरह से परिवर्तित करने और उन्हें एक डेटाबेस में लिखने के लिए बहु मॉड्यूल का उपयोग कर रहा हूँ। जब तक मैं बड़ा इनपुट फ़ाइलें (लाइनों के लाखों-करोड़ों) मेरी अजगर कार्यक्रम में कार्रवाई करने के लिए है कि मैं पाइप मिलता है,अजगर पूल apply_async और map_async पूर्ण कतार

batch = [] 
pool = multiprocessing.Pool(20) 
i = 0 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     pool.apply_async(insert, args=(batch,i+1)) 
     batch = [] 
pool.apply_async(insert, args=(batch,i)) 
pool.close() 
pool.join() 

अब जब कि सब ठीक काम करता है: यहाँ मेरी कोड का एक टुकड़ा है। किसी बिंदु पर, जब मेरा डेटाबेस धीमा हो जाता है, तो मुझे लगता है कि स्मृति पूर्ण हो रही है।

कुछ खेलने के बाद, यह पता चला कि pool.apply_async के साथ-साथ pool.map_async कभी भी अवरुद्ध नहीं होता है, ताकि संसाधित होने वाली कॉल की कतार बड़ी और बड़ी हो।

मेरी समस्या को सही दृष्टिकोण क्या है? मैं एक पैरामीटर की अपेक्षा करता हूं जिसे मैं सेट कर सकता हूं, जो पूल.एपली_एसिंक कॉल को अवरुद्ध कर देगा, जैसे ही एक निश्चित कतार लंबाई तक पहुंच गई है। जावा में AFAIR कोई भी उस उद्देश्य के लिए एक निश्चित लंबाई के साथ ThreadPoolExecutor को एक ब्लॉकिंगक्यूयू दे सकता है।

धन्यवाद!

+1

_ "यह रूप में अच्छी तरह कि pool.apply_async निकला pool.map_async कभी ब्लॉक कभी नहीं के रूप में" _ - सब कुछ मैं अच्छी तरह से AsyncResult पर इंतजार कर अपनी समस्या के रूप में मदद नहीं करता – leon

उत्तर

2

apply_async रिटर्न एक AsyncResult वस्तु है, जो आप पर wait कर सकते हैं:

if len(batch) >= 10000: 
    r = pool.apply_async(insert, args=(batch, i+1)) 
    r.wait() 
    batch = [] 

हालांकि अगर आप एक क्लीनर तरीके से ऐसा करना चाहते हैं, तो आप 10000 की एक maxsize के साथ एक multiprocessing.Queue का उपयोग करना चाहिए, और एक Worker निकाले जाते हैं multiprocessing.Process से कक्षा जो ऐसी कतार से प्राप्त होती है।

+1

लिए देख रहा था है कि में कतार पूल बड़ा हो जाता है। मुझे आश्चर्य है कि क्या मैं पूल में आंतरिक कतार के आकार को नियंत्रित कर सकता हूं? – konstantin

+0

@ कोंस्टेंटिन: मुझे यकीन नहीं है कि मैं समझता हूं। जबकि आप 'AsyncResult' की प्रतीक्षा कर रहे हैं, तो मास्टर प्रक्रिया अगले बैच को भर नहीं सकती है, है ना? –

9

बस अगर कोई यहां समाप्त होता है, तो इस तरह मैंने समस्या हल की: मैंने multiprocessing.Pool का उपयोग करना बंद कर दिया। यहाँ कैसे मैं इसे अब क्या है:

#set amount of concurrent processes that insert db data 
processes = multiprocessing.cpu_count() * 2 

#setup batch queue 
queue = multiprocessing.Queue(processes * 2) 

#start processes 
for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() 

#fill queue with batches  
batch=[] 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     queue.put((batch,i+1)) 
     batch = [] 
if batch: 
    queue.put((batch,i+1)) 

#stop processes using poison-pill 
for _ in range(processes): queue.put((None,None)) 

print "all done." 
डालने विधि में

प्रत्येक बैच के प्रसंस्करण एक पाश है कि कतार से खींचती है जब तक यह जहर की गोली प्राप्त करता है में लपेटा जाता है:

while True: 
    batch, end = queue.get() 
    if not batch and not end: return #poison pill! complete! 
    [process the batch] 
print 'worker done.' 
+0

अच्छा सरल उदाहरण। मल्टीप्रोसेसिंग का पूल अक्सर इसके मुकाबले ज्यादा परेशानी होती है, खासकर जब से अपना खुद का प्रोसेस पूल बनाना काफी सरल है। – travc

8

apply_async और map_async फ़ंक्शन मुख्य प्रक्रिया को अवरुद्ध न करने के लिए डिज़ाइन किए गए हैं। ऐसा करने के लिए, Pool एक आंतरिक Queue बनाए रखता है जो आकार बदलने के लिए दुर्भाग्य से असंभव है।

जिस तरह से समस्या हल की जा सकती है वह Semaphore का उपयोग करके उस आकार के साथ शुरू किया गया है जिसे आप कतार चाहते हैं। पूल को खिलाने से पहले और कार्यकर्ता के कार्य को पूरा करने से पहले आप सेफफोरे को प्राप्त और छोड़ दें।

यहाँ अजगर 2.6 या अधिक से अधिक के साथ काम कर एक उदाहरण है।

from threading import Semaphore 
from multiprocessing import Pool 

def task_wrapper(f): 
    """Python2 does not allow a callback for method raising exceptions, 
    this wrapper ensures the code run into the worker will be exception free. 

    """ 
    try: 
     return f() 
    except: 
     return None 

def TaskManager(object): 
    def __init__(self, processes, queue_size): 
     self.pool = Pool(processes=processes) 
     self.workers = Semaphore(processes + queue_size) 

    def new_task(self, f): 
     """Start a new task, blocks if queue is full.""" 
     self.workers.acquire() 
     self.pool.apply_async(task_wrapper, args=(f,), callback=self.task_done)) 

    def task_done(self): 
     """Called once task is done, releases the queue is blocked.""" 
     self.workers.release() 
संबंधित मुद्दे