2013-08-21 6 views
5

मैं पायथन में समवर्ती धागे की संख्या को कैसे सीमित कर सकता हूं?पायथन में समवर्ती धागे की संख्या को सीमित करने के लिए कैसे?

उदाहरण के लिए, मेरे पास कई फाइलों के साथ एक निर्देशिका है, और मैं उन सभी को संसाधित करना चाहता हूं, लेकिन केवल एक ही समय में समानांतर में।

def process_file(fname): 
     # open file and do something                        

def process_file_thread(queue, fname): 
    queue.put(process_file(fname)) 

def process_all_files(d): 
    files=glob.glob(d + '/*') 
    q=Queue.Queue() 
    for fname in files: 
     t=threading.Thread(target=process_file_thread, args=(q, fname)) 
     t.start() 
    q.join() 

def main(): 
    process_all_files('.') 
    # Do something after all files have been processed 

मैं कोड कैसे सुधार सकते हैं ताकि केवल 4 धागे एक समय में चलाए जा रहे हैं:

यहाँ मैं अब तक किया है?

ध्यान दें कि मैं सभी फ़ाइलों को संसाधित करने के लिए प्रतीक्षा करना चाहता हूं और फिर संसाधित फ़ाइलों पर जारी रहना चाहता हूं।

+2

आप की कोशिश की [मल्टीप्रोसेस] (है http://docs.python.org/2/library/multiprocessing.html # मॉड्यूल-मल्टीप्रोसेसिंग) पूल? पायथन 3 पर आप [वायदा] (http://docs.python.org/dev/library/concurrent.futures.html) का भी उपयोग कर सकते हैं। – javex

+2

आप पाइथन 2 में भी ['वायदा'] (https://pypi.python.org/pypi/futures) का उपयोग कर सकते हैं, आपको बस बैकपोर्ट इंस्टॉल करने की आवश्यकता है। – abarnert

+0

concurrent.futures वास्तव में यह करने का सबसे अच्छा तरीका है – JBernardo

उत्तर

7

उदाहरण के लिए, मेरे पास कई फाइलों के साथ एक निर्देशिका है, और मैं उन सभी को संसाधित करना चाहता हूं, लेकिन केवल एक ही समय में समानांतर में।

यह वही है जो थ्रेड पूल करता है: आप नौकरियां बनाते हैं, और पूल समानांतर में एक समय में 4 चलाता है। आप निष्पादक का उपयोग करके चीजों को और भी सरल बना सकते हैं, जहां आप इसे केवल फ़ंक्शन (या अन्य कॉलबेल) सौंपते हैं और यह आपको परिणाम के लिए वायदा वापस भेज देता है। आप इसे स्वयं बना सकते हैं, लेकिन आपको यह नहीं करना है। *

stdlib का concurrent.futures मॉड्यूल ऐसा करने का सबसे आसान तरीका है। (पाइथन 3.1 और इससे पहले, backport देखें।) वास्तव में, one of the main examples आप जो करना चाहते हैं उसके बहुत करीब है। लेकिन यह अपने सटीक उपयोग के मामले के लिए अनुकूल है:

def process_all_files(d): 
    files = glob.glob(d + '/*') 
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 
     fs = [executor.submit(process_file, file) for file in files] 
     concurrent.futures.wait(fs) 

यदि आप कुछ वापस जाने के लिए process_file चाहता था, कि लगभग रूप में आसान है:

def process_all_files(d): 
    files = glob.glob(d + '/*') 
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 
     fs = [executor.submit(process_file, file) for file in files] 
     for f in concurrent.futures.as_completed(fs): 
      do_something(f.result()) 

और तुम भी अपवाद को संभालने के लिए ... चाहते हैं, तो ठीक है, बस देखो उदाहरण पर; result() पर कॉल के आस-पास यह सिर्फ try/except है।


* यदि आप उन्हें स्वयं बनाना चाहते हैं, तो यह मुश्किल नहीं है। multiprocessing.pool का स्रोत अच्छी तरह लिखा है और टिप्पणी की गई है, और जटिल नहीं है, और अधिकांश हार्ड सामान थ्रेडिंग के लिए प्रासंगिक नहीं है; concurrent.futures का स्रोत भी आसान है।

0

मैं इस तकनीक को कई बार इस्तेमाल किया है, मुझे लगता है कि यह थोड़ा बदसूरत सोचा है:

import threading 

def process_something(): 
    something = list(get_something) 

    def worker(): 
     while something: 
      obj = something.pop() 
      # do something with obj 

    threads = [Thread(target=worker) for i in range(4)] 
    [t.start() for t in threads] 
    [t.join() for t in threads] 
संबंधित मुद्दे

 संबंधित मुद्दे