2011-06-28 3 views
9

मैं concurrent.futures मॉड्यूल का उपयोग कर python3.2 में काम करने के लिए टाइमआउट प्राप्त करने का प्रयास कर रहा हूं। हालांकि जब यह टाइमआउट करता है, तो यह वास्तव में निष्पादन को रोक नहीं देता है। मैंने दोनों धागे और प्रक्रिया पूल निष्पादकों के साथ प्रयास किया, उनमें से कोई भी कार्य को रोक नहीं पाया, और केवल समाप्त होने तक ही समय समाप्त हो गया। तो क्या किसी को पता है कि यह काम करने के लिए संभव है?टाइमआउट के साथ concurrent.futures का उपयोग कैसे करें?

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

def run_loop(max_number): 
    print("Started:", datetime.datetime.now(), max_number) 
    last_number = 0; 
    for i in range(1, max_number + 1): 
     last_number = i * i 
    return last_number 

def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 

if __name__ == '__main__': 
    main() 

उत्तर

15

जहां तक ​​मैं कह सकता हूं, टाइमआउट एरर वास्तव में उठाया जाता है जब आप इसकी अपेक्षा करेंगे, और कार्य समाप्त होने के बाद नहीं।

हालांकि, आपका प्रोग्राम स्वयं चल रहा है जब तक कि सभी चल रहे कार्यों को पूरा नहीं किया जाता है। ऐसा इसलिए है क्योंकि वर्तमान में कार्य निष्पादित करना (आपके मामले में, शायद आपके सभी सबमिट किए गए कार्य, जैसे कि आपका पूल आकार कार्यों की संख्या के बराबर है), वास्तव में "मारे गए" नहीं हैं।

टाइमआउट त्रुटि बढ़ी है, ताकि आप कार्य पूरा होने तक प्रतीक्षा न करें (और इसके बजाय कुछ और करें), लेकिन कार्य पूरा होने तक चलते रहेंगे। और अजगर तब तक बाहर नहीं निकलेगा जब तक आपके निष्पादक के धागे/उपप्रोसेसेस में अधूरे कार्य नहीं होते हैं।

जहां तक ​​मुझे पता है, वर्तमान में फ़्यूचर्स निष्पादित करने के लिए "रोकें" संभव नहीं है, आप केवल प्रारंभ किए गए कार्यों को "रद्द" कर सकते हैं जिन्हें अभी तक शुरू नहीं किया जाना है। आपके मामले में, कोई भी नहीं होगा, लेकिन कल्पना करें कि आपके पास 5 धागे/प्रक्रियाओं का पूल है, और आप 100 आइटम संसाधित करना चाहते हैं। किसी बिंदु पर, 20 पूर्ण कार्य हो सकते हैं, 5 चल रहे कार्य, और 75 कार्य निर्धारित हो सकते हैं। इस मामले में, आप उन 76 निर्धारित कार्यों को रद्द करने में सक्षम होंगे, लेकिन जो चल रहे हैं, पूरा होने तक जारी रहेगा, चाहे आप परिणाम की प्रतीक्षा करें या नहीं।

हालांकि यह इस तरह से नहीं किया जा सकता है, मुझे लगता है कि आपके वांछित अंतिम परिणाम प्राप्त करने के तरीके होने चाहिए। हो सकता है कि इस संस्करण कार्य में सहायता कर सकते हैं (यकीन नहीं करता है, तो यह होता है कि आप वास्तव में क्या करना चाहता था, लेकिन यह कुछ काम का हो सकता):

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

class Task: 
    def __init__(self, max_number): 
     self.max_number = max_number 
     self.interrupt_requested = False 

    def __call__(self): 
     print("Started:", datetime.datetime.now(), self.max_number) 
     last_number = 0; 
     for i in xrange(1, self.max_number + 1): 
      if self.interrupt_requested: 
       print("Interrupted at", i) 
       break 
      last_number = i * i 
     print("Reached the end") 
     return last_number 

    def interrupt(self): 
     self.interrupt_requested = True 

def main(): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor: 
     tasks = [Task(num) for num in max_numbers] 
     for task, future in [(i, executor.submit(i)) for i in tasks]: 
      try: 
       print(future.result(timeout=1)) 
      except concurrent.futures.TimeoutError: 
       print("this took too long...") 
       task.interrupt() 


if __name__ == '__main__': 
    main() 

प्रत्येक "काम" के लिए एक प्रतिदेय वस्तु बनाने, और उन देकर केवल एक सादे फ़ंक्शन के बजाय निष्पादक को, आप कार्य को "बाधित" करने का एक तरीका प्रदान कर सकते हैं। युक्ति: task.interrupt() लाइन को हटाने और देखो क्या होता है, यह आसान ऊपर ;-)

4

मेरी लंबा स्पष्टीकरण को समझने के लिए हाल ही में मैं भी इस मुद्दे को हिट कर सकते हैं, और अंत में मैं निम्नलिखित समाधान ProcessPoolExecutor का उपयोग कर के साथ आते हैं:


def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 
      stop_process_pool(executor) 

def stop_process_pool(executor): 
    for pid, processes in executor._processes.items(): 
     process.terminate() 
    executor.shutdown() 
+0

txmc तुम सिर्फ प्रक्रियाओं में से एक को मार सकता है? या क्या आपको सभी को मारना है? –

संबंधित मुद्दे