2017-10-02 40 views
9

निम्नलिखित जेनरेटर (here रुचि रखने वालों के लिए) पर डेविड बीज़ली की स्लाइड से लिया गया है।जनरेटर आधारित कोरआउट के साथ गंभीर रूप से अनंत रिकर्सन

एक Task वर्ग परिभाषित किया गया है जो एक जनरेटर है कि वायदा पैदावार, Task वर्ग, में लपेटता पूर्ण (w/ओ त्रुटि हैंडलिंग), इस प्रकार है:

class Task: 
    def __init__(self, gen): 
     self._gen = gen 

    def step(self, value=None): 
     try: 
      fut = self._gen.send(value) 
      fut.add_done_callback(self._wakeup) 
     except StopIteration as exc: 
      pass 

    def _wakeup(self, fut): 
     result = fut.result() 
     self.step(result) 

एक उदाहरण में, निम्न पुनरावर्ती क्रिया भी है परिभाषित:

from concurrent.futures import ThreadPoolExecutor 
import time 

pool = ThreadPoolExecutor(max_workers=8) 

def recursive(n): 
    yield pool.submit(time.sleep, 0.001) 
    print("Tick :", n) 
    Task(recursive(n+1)).step() 

निम्नलिखित दो मामलों बाहर खेलने:

  1. अजगर आरईपीएल से, अगर हम इन परिभाषित करते हैं (या उन्हें आयात करता है, तो हम उन्हें एक फ़ाइल में जगह) और फिर साथ प्रत्यावर्तन कूद-शुरू:

    Task(recursive(0)).step() 
    

    यह जहां प्रत्यावर्तन एक बिंदु के लिए प्रतीयमान दूर मुद्रण शुरू होता है सीमा पार हो गई होगी। यह स्पष्ट रूप से इससे अधिक नहीं है, हालांकि, स्टैक स्तर को प्रिंट करने से पता चलता है कि यह निरंतर निष्पादन में स्थिर रहता है। कुछ और चल रहा है जिस पर मैं काफी समझ नहीं पा रहा हूं।

    नोट: यदि आप इसे इस तरह निष्पादित करते हैं तो आपको अजगर प्रक्रिया को मारने की आवश्यकता होगी।

  2. अगर हम साथ साथ एक फ़ाइल में सभी सामग्री (Task, recursive) डाल:

    if __name__ == "__main__": 
        Task(recursive(0)).step() 
    

    और फिर python myfile.py के साथ चलाने के लिए, यह 7 पर टिक टिक बंद हो जाता है (max_workers की संख्या, ऐसा लगता है) ।


मेरे सवाल यह है कि यह प्रतीत होता है प्रत्यावर्तन सीमा को पार करता है और क्यों इसे दूसरे तरीके से आप इसे कैसे अमल आधार पर कैसे कार्य करता है?

व्यवहार पाइथन 3.6.2 और पायथन 3.5.4 दोनों पर दिखाई देता है (और मैं दूसरों को 3.6 और 3.5 परिवार में भी अनुमान लगाता हूं)।

+0

मैंने अपने व्याख्यान को देखा है, यह अविश्वसनीय है कि यह आदमी 40 मिनट से कम में क्या करता है। मेरी धारणा है कि आपका प्रश्न उत्तर 'थ्रेडपूल एक्स्सेलर' में रहता है। यह एक नई प्रक्रिया बनाता है जो 'अगली()' कॉल को संभालता है, इसलिए प्रत्येक 'कार्य' को अपनी प्रक्रिया मिलती है। एक और बात यह है कि जनरेटर एक समय में एक मूल्य उत्पन्न करते हैं ताकि उन्हें वापस करने से पहले पूरे तत्वों को स्मृति में रखने की आवश्यकता न हो। यह एक मूल्य पैदा करता है, और निलंबित करता है। जब जाग गया, यह अगले मूल्य पैदा करता है और निलंबित करता है। एक समय में एक मूल्य। मुझे Q2 के लिए कोई धारणा नहीं है, लेकिन यह दिलचस्प है। – Vinny

उत्तर

10

recursive जेनरेटर जो आप दिखाते हैं वह वास्तव में ऐसे तरीके से पुनरावर्ती नहीं है जो सिस्टम रिकर्सन सीमा के साथ समस्या का कारण बनता है।

यह समझने के लिए कि आपको recursive जेनरेटर कोड चलाते समय ध्यान देने की आवश्यकता क्यों है। सामान्य फ़ंक्शन के विपरीत, बस recursive(0) पर कॉल करने से यह तुरंत अपना कोड चलाने और अतिरिक्त रिकर्सिव कॉल करने का कारण नहीं बनता है। इसके बजाय, recursive(0) पर कॉल करने से तुरंत जनरेटर ऑब्जेक्ट लौटाता है। केवल जब आप जनरेटर के लिए send() कोड चलाते हैं, और केवल send() के बाद दूसरी बार यह दूसरी कॉल को लात मारता है।

कोड कॉल के रूप में कॉल स्टैक की जांच करें। शीर्ष स्तर पर, हम Task(recursive(0)).step() चलाते हैं। यही कारण है कि एक दृश्य में तीन बातें करता है:

  1. recursive(0) इस कॉल एक जनरेटर वस्तु तुरंत वापस आती है।
  2. Task(_)Task ऑब्जेक्ट बनाया गया है, और इसकी __init__ विधि पहले चरण में बनाए गए जेनरेटर ऑब्जेक्ट का संदर्भ संग्रहीत करती है।
  3. _.step() कार्य पर एक विधि कहा जाता है। यह वह जगह है जहां कार्रवाई वास्तव में शुरू होती है! आइए देखें कि कॉल के अंदर क्या होता है:

    • fut = self._gen.send(value) यहां हम वास्तव में जनरेटर को एक मूल्य भेजकर चलाना शुरू करते हैं। आइए गहराई से जाएं और जनरेटर कोड चलाएं:
      • yield pool.submit(time.sleep, 0.001) यह शेड्यूल किसी अन्य थ्रेड में कुछ करने के लिए शेड्यूल करता है। हालांकि हम इसके होने की प्रतीक्षा नहीं करते हैं। इसके बजाय, हमें Future मिलता है जिसे हम पूरा होने पर अधिसूचित होने के लिए उपयोग कर सकते हैं। हम तुरंत भविष्य के कोड के पिछले स्तर पर भविष्य का उत्पादन करते हैं।
    • fut.add_done_callback(self._wakeup) यहां हम भविष्य में तैयार होने पर हमारे _wakeup() विधि को कॉल करने के लिए कहते हैं। यह हमेशा तुरंत लौटता है!
    • step विधि अब समाप्त होती है। यह सही है, हम कर रहे हैं (इस पल के लिए)! यह आपके प्रश्न के दूसरे भाग के लिए महत्वपूर्ण है, जिसे मैं बाद में चर्चा करूंगा।
  4. हमने जो कॉल समाप्त कर दिया, तो अगर हम अंतःक्रियात्मक रूप से चल रहे हैं तो नियंत्रण प्रवाह आरईपीएल पर वापस आ जाता है। यदि हम एक स्क्रिप्ट के रूप में चल रहे हैं, तो इसके बजाय दुभाषिया स्क्रिप्ट के अंत तक पहुंच जाएगा और बंद करना शुरू कर देगा (मैं इस पर और अधिक चर्चा करूंगा)। हालांकि, थ्रेड पूल द्वारा नियंत्रित अन्य थ्रेड अभी भी चल रहे हैं, और किसी बिंदु पर, उनमें से एक कुछ चीजें करने जा रहा है जिनकी हम परवाह करते हैं! चलो देखते हैं कि क्या है।

  5. जब निर्धारित फ़ंक्शन (time.sleep) चलाना समाप्त हो गया है, तो जिस थ्रेड में चल रहा था वह उस कॉलबैक को कॉल करेगा जिसे हमने Future ऑब्जेक्ट पर सेट किया था। यही है, यह पर Task ऑब्जेक्ट पर कॉल करेगा जिसे हमने पहले बनाया था (जिसका हमारे पास शीर्ष स्तर पर अब कोई संदर्भ नहीं है, लेकिन Future ने एक संदर्भ रखा है, इसलिए यह अभी भी जीवित है)।आइए विधि को देखें:

    • result = fut.result() स्थगित कॉल के परिणाम को स्टोर करें। यह इस मामले में अप्रासंगिक है क्योंकि हम परिणामों को कभी नहीं देखते हैं (यह None वैसे भी है)।
    • self.step(result) फिर से कदम! अब हम उस कोड पर वापस आ गए हैं जिसकी हम परवाह करते हैं। चलिए देखते हैं कि यह इस समय क्या करता है:
      • fut = self._gen.send(value) जनरेटर को फिर से भेजें, इसलिए यह खत्म हो जाता है। यह पहले से ही एक बार पैदा हुआ है, इसलिए इस बार हम yield:
        • print("Tick :", n) यह बहुत आसान है।
        • Task(recursive(n+1)).step() यह वह जगह है जहां चीजें दिलचस्प होती हैं। यह लाइन वही है जो हमने शुरू की थी। तो, पहले की तरह, यह ऊपर सूचीबद्ध तर्क 1-4 चलाने के लिए जा रहा है (उनके substeps सहित)। लेकिन आरईपीएल पर लौटने या स्क्रिप्ट को समाप्त करने की बजाय, जब step() विधि रिटर्न देता है, तो यह यहां वापस आता है।
        • recursive() जेनरेटर (मूल एक, जिसे हमने अभी बनाया है नया नहीं) इसके अंत तक पहुंच गया है। तो, किसी भी जनरेटर की तरह जो इसके कोड के अंत तक पहुंचता है, यह StopIteration बढ़ाता है।
      • StopIteration पकड़ा है और try/except ब्लॉक द्वारा नजरअंदाज कर दिया, और step() विधि समाप्त होता है।
    • _wakup() विधि भी समाप्त होती है, इसलिए कॉलबैक किया जाता है।
  6. आखिरकार कॉलबैक को पहले कॉलबैक में बनाए गए Task के लिए कॉलबैक भी कहा जाएगा। तो हम वापस जाते हैं और चरण 5 को बार-बार दोहराते हैं, हमेशा के लिए (अगर हम अंतःक्रियात्मक रूप से चल रहे हैं)।

उपरोक्त कॉल स्टैक बताता है कि इंटरैक्टिव केस हमेशा के लिए प्रिंट क्यों करता है। मुख्य धागा आरईपीएल पर लौटता है (और यदि आप अन्य धागे से आउटपुट पिछले देख सकते हैं तो आप इसके साथ अन्य सामान भी कर सकते हैं)। लेकिन पूल में, प्रत्येक थ्रेड अपने काम की कॉलबैक से दूसरी नौकरी निर्धारित करता है। जब अगली नौकरी खत्म हो जाती है, तो इसका कॉलबैक शेड्यूल करता है और इसी तरह।

तो जब आप कोड को स्क्रिप्ट के रूप में चलाते हैं तो आपको केवल 8 प्रिंटआउट क्यों मिलते हैं? उत्तर उपरोक्त चरण 4 में संकेत दिया गया है। गैर-इंटरैक्टिव रूप से चलते समय, मुख्य थ्रेड Task.step रिटर्न पर पहली कॉल के बाद स्क्रिप्ट के अंत से बाहर चला जाता है। यह दुभाषिया को बंद करने का प्रयास करने के लिए संकेत देता है।

concurrent.futures.thread मॉड्यूल (जहां ThreadPoolExecutor परिभाषित किया गया है) में कुछ फैंसी तर्क हैं जो निष्पादक अभी भी सक्रिय होने पर प्रोग्राम बंद हो जाने पर अच्छी तरह से साफ करने की कोशिश करता है। यह किसी भी निष्क्रिय धागे को रोकना है, और किसी भी सिग्नल को सिग्नल करना है जो अभी भी बंद होने के लिए चल रहा है जब उनका वर्तमान काम पूरा हो गया है।

उस सफाई तर्क का सटीक कार्यान्वयन हमारे कोड के साथ एक बहुत ही अजीब तरीके से बातचीत करता है (जो छोटी हो सकती है या नहीं)। प्रभाव यह है कि पहला धागा खुद को और अधिक काम करने के लिए रखता है, जबकि अतिरिक्त कार्यकर्ता थ्रेड जो पैदा होते हैं, वे तुरंत उत्पन्न होने के बाद बाहर निकलते रहते हैं। अंततः पहला कार्यकर्ता छोड़ देता है जब निष्पादक ने कई धागे शुरू किए हैं क्योंकि यह उपयोग करना चाहता था (हमारे मामले में 8)।

यहां घटनाओं का अनुक्रम है, जैसा कि मैं इसे समझता हूं।

  1. हम आयात (परोक्ष रूप से) concurrent.futures.thread मॉड्यूल है, जो atexit का उपयोग करता है बस से पहले दुभाषिया बंद हो जाता है _python_exit नाम के एक समारोह को चलाने के लिए दुभाषिया बताने के लिए।
  2. हम 8. की एक अधिकतम धागा गिनती यह अपने कार्यकर्ता धागे तुरंत अंडे नहीं करता है के साथ एक ThreadPoolExecutor बनाने के लिए, लेकिन एक नौकरी के लिए निर्धारित है एक हर बार पैदा करेगा जब तक यह सब 8.
  3. हमने अपना पहला काम अनुसूची है (पिछली सूची से चरण 3 के गहरे घोंसले भाग में)।
  4. निष्पादक अपनी आंतरिक कतार में नौकरी जोड़ता है, फिर नोटिस करता है कि इसमें अधिकतम कार्यकर्ता धागे नहीं हैं और एक नया शुरू होता है।
  5. नया धागा कतार से नौकरी को पॉप करता है और इसे चलाने शुरू करता है। हालांकि, sleep कॉल शेष चरणों की तुलना में काफी अधिक समय लेता है, इसलिए थ्रेड थोड़ी देर के लिए यहां फंस जाएगा।
  6. मुख्य धागा खत्म होता है (यह पिछली सूची में चरण 4 तक पहुंच गया है)।
  7. _python_exit फ़ंक्शन को दुभाषिया द्वारा बुलाया जाता है, क्योंकि दुभाषिया बंद करना चाहता है। फ़ंक्शन मॉड्यूल में वैश्विक _shutdown चर सेट करता है, और निष्पादक की आंतरिक कतार में None भेजता है (यह प्रति None प्रति थ्रेड भेजता है, लेकिन अभी तक केवल एक धागा बनाया गया है, इसलिए यह केवल एक None भेजता है)। यह तब तक मुख्य धागे को अवरुद्ध करता है जब तक कि वह थ्रेड को छोड़ने के बारे में जानता है। यह दुभाषिया बंद करने में देरी करता है।
  8. कार्यकर्ता थ्रेड का कॉल time.sleep पर लौटाता है। यह कॉलबैक फ़ंक्शन को कॉल करता है जो उसके नौकरी के Future के साथ पंजीकृत है, जो एक और नौकरी निर्धारित करता है।
  9. इस सूची के चरण 4 में की तरह, निष्पादक नौकरी को कतार में रखता है, और एक और धागा शुरू करता है, क्योंकि इसमें अभी तक वांछित संख्या नहीं है।
  10. नया धागा आंतरिक कतार से नौकरी पकड़ने की कोशिश करता है, लेकिन चरण 7 से None मान प्राप्त करता है जो एक संकेत है कि यह किया जा सकता है। यह देखता है कि _shutdown वैश्विक सेट है और इसलिए यह निकलता है। हालांकि यह करने से पहले, यह कतार में None जोड़ता है।
  11. पहला कार्यकर्ता धागा अपनी कॉलबैक समाप्त करता है। यह एक नई नौकरी की तलाश में है, और उसे वह चरण 8 में स्वयं को कतार में पाया गया है। यह नौकरी चलाने शुरू होता है, और चरण 5 में, जिसमें कुछ समय लगता है।
  12. हालांकि कुछ भी नहीं होता है, क्योंकि पहले कार्यकर्ता इस समय एकमात्र सक्रिय धागा (मुख्य धागा पहले कार्यकर्ता पर मरने के लिए अवरुद्ध है, और दूसरा कार्यकर्ता खुद को बंद कर देता है)।
  13. अब हम कई बार चरण 8-12 दोहराते हैं। पहला कार्यकर्ता धागा 8 वें नौकरियों के माध्यम से तीसरे स्थान पर जाता है, और निष्पादक हर बार एक संबंधित धागे को जन्म देता है क्योंकि इसमें पूर्ण सेट नहीं होता है। हालांकि, प्रत्येक थ्रेड तुरंत मर जाता है, क्योंकि इसे पूरा करने के लिए वास्तविक नौकरी के बजाय नौकरी कतार से None मिलता है। पहला कार्यकर्ता धागा सभी वास्तविक काम करने के समाप्त होता है।
  14. आखिरकार, 8 वीं नौकरी के बाद, कुछ अलग-अलग काम करता है। इस बार, जब कॉलबैक किसी अन्य नौकरी को शेड्यूल करता है, तो कोई अतिरिक्त थ्रेड उत्पन्न नहीं होता है, क्योंकि निष्पादक जानता है कि उसने पहले से अनुरोध किए गए 8 थ्रेड शुरू कर दिए हैं (यह नहीं पता कि 7 बंद हो गए हैं)।
  15. इसलिए इस बार, None जो कि आंतरिक नौकरी कतार के प्रमुख पर है, पहले कार्यकर्ता (वास्तविक नौकरी के बजाय) द्वारा उठाया जाता है। इसका मतलब है कि यह अधिक काम करने के बजाय, बंद हो जाता है।
  16. जब पहला कार्यकर्ता बंद हो जाता है, मुख्य धागा (जो इसे छोड़ने का इंतजार कर रहा है) अंततः अनब्लॉक कर सकता है और _python_exit फ़ंक्शन पूर्ण हो जाता है। यह दुभाषिया को पूरी तरह बंद कर देता है। हो गया था!

यह हमारे द्वारा देखे जाने वाले आउटपुट को बताता है! हमें 8 आउटपुट मिलते हैं, सभी एक ही कार्यकर्ता थ्रेड (पहली बार पैदा हुए) से आते हैं।

मुझे लगता है कि उस कोड में दौड़ की स्थिति हो सकती है। यदि चरण 11 से पहले चरण 11 होता है तो चीजें तोड़ सकती हैं। अगर पहले कार्यकर्ता को कतार से None मिल गया और अन्य नए पैदा हुए कार्यकर्ता को असली नौकरी मिल गई, तो स्वैप भूमिकाएं (पहला कार्यकर्ता मर जाएगा, और दूसरा बाकी काम करेगा, और अधिक दौड़ की स्थिति को छोड़कर उन चरणों के बाद के संस्करण)। हालांकि, जैसे ही पहले कार्यकर्ता की मृत्यु हो गई, मुख्य धागा को अनब्लॉक कर दिया जाएगा। चूंकि यह अन्य धागे के बारे में नहीं जानता है (चूंकि वे अस्तित्व में नहीं थे जब उन्होंने धागे की सूची को प्रतीक्षा करने के लिए बनाया), यह समय-समय पर दुभाषिया को बंद कर देगा।

मुझे यकीन नहीं है कि यह दौड़ होने की संभावना है या नहीं। मुझे लगता है कि यह बहुत ही असंभव है, क्योंकि नए धागे के बीच कोड पथ की लंबाई शुरू हो रही है और कतार से नौकरी पकड़ने से मौजूदा थ्रेड के लिए कॉलबैक खत्म करने के पथ से बहुत छोटा है (इसके बाद भाग नया काम) और फिर कतार में एक और नौकरी की तलाश करें।

मुझे संदेह है कि यह एक बग है कि ThreadPoolExecutor हमें स्क्रिप्ट के रूप में हमारे कोड को चलाने पर साफ़ रूप से बाहर निकलने देता है। एक नई नौकरी छोड़ने के लिए तर्क शायद निष्पादक के self._shutdown विशेषता के अलावा वैश्विक _shutdown ध्वज की जांच करनी चाहिए। यदि ऐसा हुआ, तो मुख्य धागे समाप्त होने के बाद एक और नौकरी कतार लगाने की कोशिश अपवाद उठाएगी।

आप एक with बयान में ThreadPoolExecutor बनाकर क्या मैं saner व्यवहार किया जाएगा लगता है दोहराने कर सकते हैं:

# create the pool below the definition of recursive() 
with ThreadPoolExecutor(max_workers=8) as pool: 
    Task(recursive(0)).step() 

यह step() कॉल से मुख्य थ्रेड रिटर्न के बाद जल्द ही दुर्घटना होगा। यह इस तरह कुछ दिखाई देगा:

exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType> 
Traceback (most recent call last): 
    File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks 
    callback(self) 
    File ".\task_coroutines.py", line 21, in _wakeup 
    self.step(result) 
    File ".\task_coroutines.py", line 14, in step 
    fut = self._gen.send(value) 
    File ".\task_coroutines.py", line 30, in recursive 
    Task(recursive(n+1)).step() 
    File ".\task_coroutines.py", line 14, in step 
    fut = self._gen.send(value) 
    File ".\task_coroutines.py", line 28, in recursive 
    yield pool.submit(time.sleep, 1) 
    File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit 
    raise RuntimeError('cannot schedule new futures after shutdown') 
RuntimeError: cannot schedule new futures after shutdown 
+0

महान और व्यापक, पूरी तरह से जवाब के लिए धन्यवाद! –

0

संख्या 7 के साथ शुरू करने दें। [0..7] से लेबल किए गए श्रमिकों की संख्या पहले से ही उल्लेख की गई है। कार्य वर्ग को फ़ंक्शन पहचानकर्ता के रूप में recursive पारित करने की आवश्यकता है।

Task(recursive).step(n) 

Task(recursive(n)).step() 

यह है, क्योंकि पुनरावर्ती समारोह pool पर्यावरण के अंदर के नाम से जाना है, जबकि वर्तमान मामले में recursive मुख्य थ्रेड अपने आप में मूल्यांकन किया जाता है की जरूरत के बजाय

time.sleep वर्तमान कोड में एकमात्र फ़ंक्शन है जिसका मूल्यांकन कार्य पूल में किया जाता है।

एक प्रमुख पहलू जहां कोड का मुख्य मुद्दा रिकर्सन है। पूल में प्रत्येक धागा उपलब्ध श्रमिकों की संख्या के निष्पादन पर ऊपरी सीमा डालने वाले आंतरिक कार्य पर निर्भर है। फ़ंक्शन पूरा करने में सक्षम नहीं है इसलिए नया कोई निष्पादित नहीं कर सकता है। इस प्रकार, रिकर्सन सीमा तक पहुंचने से पहले यह बहुत समाप्त हो जाती है।

+0

उत्तर के लिए धन्यवाद लेकिन, नहीं, जनरेटर-आधारित कोरआउट को इस तरह पारित नहीं किया जाना चाहिए, इसे बुलाया जाना चाहिए ताकि हम वास्तव में जेनरेटर वापस प्राप्त कर सकें और इसे कोरआउटिन के रूप में उपयोग कर सकें (इसमें '.send'ing')। इसके अलावा, यह जवाब इस बात पर छूता नहीं है कि क्यों आमंत्रण के आधार पर व्यवहार अलग-अलग होता है। –

संबंधित मुद्दे