recursive
जेनरेटर जो आप दिखाते हैं वह वास्तव में ऐसे तरीके से पुनरावर्ती नहीं है जो सिस्टम रिकर्सन सीमा के साथ समस्या का कारण बनता है।
यह समझने के लिए कि आपको recursive
जेनरेटर कोड चलाते समय ध्यान देने की आवश्यकता क्यों है। सामान्य फ़ंक्शन के विपरीत, बस recursive(0)
पर कॉल करने से यह तुरंत अपना कोड चलाने और अतिरिक्त रिकर्सिव कॉल करने का कारण नहीं बनता है। इसके बजाय, recursive(0)
पर कॉल करने से तुरंत जनरेटर ऑब्जेक्ट लौटाता है। केवल जब आप जनरेटर के लिए send()
कोड चलाते हैं, और केवल send()
के बाद दूसरी बार यह दूसरी कॉल को लात मारता है।
कोड कॉल के रूप में कॉल स्टैक की जांच करें। शीर्ष स्तर पर, हम Task(recursive(0)).step()
चलाते हैं। यही कारण है कि एक दृश्य में तीन बातें करता है:
recursive(0)
इस कॉल एक जनरेटर वस्तु तुरंत वापस आती है।
Task(_)
Task
ऑब्जेक्ट बनाया गया है, और इसकी __init__
विधि पहले चरण में बनाए गए जेनरेटर ऑब्जेक्ट का संदर्भ संग्रहीत करती है।
_.step()
कार्य पर एक विधि कहा जाता है। यह वह जगह है जहां कार्रवाई वास्तव में शुरू होती है! आइए देखें कि कॉल के अंदर क्या होता है:
fut = self._gen.send(value)
यहां हम वास्तव में जनरेटर को एक मूल्य भेजकर चलाना शुरू करते हैं। आइए गहराई से जाएं और जनरेटर कोड चलाएं:
yield pool.submit(time.sleep, 0.001)
यह शेड्यूल किसी अन्य थ्रेड में कुछ करने के लिए शेड्यूल करता है। हालांकि हम इसके होने की प्रतीक्षा नहीं करते हैं। इसके बजाय, हमें Future
मिलता है जिसे हम पूरा होने पर अधिसूचित होने के लिए उपयोग कर सकते हैं। हम तुरंत भविष्य के कोड के पिछले स्तर पर भविष्य का उत्पादन करते हैं।
fut.add_done_callback(self._wakeup)
यहां हम भविष्य में तैयार होने पर हमारे _wakeup()
विधि को कॉल करने के लिए कहते हैं। यह हमेशा तुरंत लौटता है!
step
विधि अब समाप्त होती है। यह सही है, हम कर रहे हैं (इस पल के लिए)! यह आपके प्रश्न के दूसरे भाग के लिए महत्वपूर्ण है, जिसे मैं बाद में चर्चा करूंगा।
हमने जो कॉल समाप्त कर दिया, तो अगर हम अंतःक्रियात्मक रूप से चल रहे हैं तो नियंत्रण प्रवाह आरईपीएल पर वापस आ जाता है। यदि हम एक स्क्रिप्ट के रूप में चल रहे हैं, तो इसके बजाय दुभाषिया स्क्रिप्ट के अंत तक पहुंच जाएगा और बंद करना शुरू कर देगा (मैं इस पर और अधिक चर्चा करूंगा)। हालांकि, थ्रेड पूल द्वारा नियंत्रित अन्य थ्रेड अभी भी चल रहे हैं, और किसी बिंदु पर, उनमें से एक कुछ चीजें करने जा रहा है जिनकी हम परवाह करते हैं! चलो देखते हैं कि क्या है।
जब निर्धारित फ़ंक्शन (time.sleep
) चलाना समाप्त हो गया है, तो जिस थ्रेड में चल रहा था वह उस कॉलबैक को कॉल करेगा जिसे हमने Future
ऑब्जेक्ट पर सेट किया था। यही है, यह पर Task
ऑब्जेक्ट पर कॉल करेगा जिसे हमने पहले बनाया था (जिसका हमारे पास शीर्ष स्तर पर अब कोई संदर्भ नहीं है, लेकिन Future
ने एक संदर्भ रखा है, इसलिए यह अभी भी जीवित है)।आइए विधि को देखें:
result = fut.result()
स्थगित कॉल के परिणाम को स्टोर करें। यह इस मामले में अप्रासंगिक है क्योंकि हम परिणामों को कभी नहीं देखते हैं (यह None
वैसे भी है)।
self.step(result)
फिर से कदम! अब हम उस कोड पर वापस आ गए हैं जिसकी हम परवाह करते हैं। चलिए देखते हैं कि यह इस समय क्या करता है:
fut = self._gen.send(value)
जनरेटर को फिर से भेजें, इसलिए यह खत्म हो जाता है। यह पहले से ही एक बार पैदा हुआ है, इसलिए इस बार हम yield
:
print("Tick :", n)
यह बहुत आसान है।
Task(recursive(n+1)).step()
यह वह जगह है जहां चीजें दिलचस्प होती हैं। यह लाइन वही है जो हमने शुरू की थी। तो, पहले की तरह, यह ऊपर सूचीबद्ध तर्क 1-4 चलाने के लिए जा रहा है (उनके substeps सहित)। लेकिन आरईपीएल पर लौटने या स्क्रिप्ट को समाप्त करने की बजाय, जब step()
विधि रिटर्न देता है, तो यह यहां वापस आता है।
recursive()
जेनरेटर (मूल एक, जिसे हमने अभी बनाया है नया नहीं) इसके अंत तक पहुंच गया है। तो, किसी भी जनरेटर की तरह जो इसके कोड के अंत तक पहुंचता है, यह StopIteration
बढ़ाता है।
StopIteration
पकड़ा है और try
/except
ब्लॉक द्वारा नजरअंदाज कर दिया, और step()
विधि समाप्त होता है।
_wakup()
विधि भी समाप्त होती है, इसलिए कॉलबैक किया जाता है।
- आखिरकार कॉलबैक को पहले कॉलबैक में बनाए गए
Task
के लिए कॉलबैक भी कहा जाएगा। तो हम वापस जाते हैं और चरण 5 को बार-बार दोहराते हैं, हमेशा के लिए (अगर हम अंतःक्रियात्मक रूप से चल रहे हैं)।
उपरोक्त कॉल स्टैक बताता है कि इंटरैक्टिव केस हमेशा के लिए प्रिंट क्यों करता है। मुख्य धागा आरईपीएल पर लौटता है (और यदि आप अन्य धागे से आउटपुट पिछले देख सकते हैं तो आप इसके साथ अन्य सामान भी कर सकते हैं)। लेकिन पूल में, प्रत्येक थ्रेड अपने काम की कॉलबैक से दूसरी नौकरी निर्धारित करता है। जब अगली नौकरी खत्म हो जाती है, तो इसका कॉलबैक शेड्यूल करता है और इसी तरह।
तो जब आप कोड को स्क्रिप्ट के रूप में चलाते हैं तो आपको केवल 8 प्रिंटआउट क्यों मिलते हैं? उत्तर उपरोक्त चरण 4 में संकेत दिया गया है। गैर-इंटरैक्टिव रूप से चलते समय, मुख्य थ्रेड Task.step
रिटर्न पर पहली कॉल के बाद स्क्रिप्ट के अंत से बाहर चला जाता है। यह दुभाषिया को बंद करने का प्रयास करने के लिए संकेत देता है।
concurrent.futures.thread
मॉड्यूल (जहां ThreadPoolExecutor
परिभाषित किया गया है) में कुछ फैंसी तर्क हैं जो निष्पादक अभी भी सक्रिय होने पर प्रोग्राम बंद हो जाने पर अच्छी तरह से साफ करने की कोशिश करता है। यह किसी भी निष्क्रिय धागे को रोकना है, और किसी भी सिग्नल को सिग्नल करना है जो अभी भी बंद होने के लिए चल रहा है जब उनका वर्तमान काम पूरा हो गया है।
उस सफाई तर्क का सटीक कार्यान्वयन हमारे कोड के साथ एक बहुत ही अजीब तरीके से बातचीत करता है (जो छोटी हो सकती है या नहीं)। प्रभाव यह है कि पहला धागा खुद को और अधिक काम करने के लिए रखता है, जबकि अतिरिक्त कार्यकर्ता थ्रेड जो पैदा होते हैं, वे तुरंत उत्पन्न होने के बाद बाहर निकलते रहते हैं। अंततः पहला कार्यकर्ता छोड़ देता है जब निष्पादक ने कई धागे शुरू किए हैं क्योंकि यह उपयोग करना चाहता था (हमारे मामले में 8)।
यहां घटनाओं का अनुक्रम है, जैसा कि मैं इसे समझता हूं।
- हम आयात (परोक्ष रूप से)
concurrent.futures.thread
मॉड्यूल है, जो atexit
का उपयोग करता है बस से पहले दुभाषिया बंद हो जाता है _python_exit
नाम के एक समारोह को चलाने के लिए दुभाषिया बताने के लिए।
- हम 8. की एक अधिकतम धागा गिनती यह अपने कार्यकर्ता धागे तुरंत अंडे नहीं करता है के साथ एक
ThreadPoolExecutor
बनाने के लिए, लेकिन एक नौकरी के लिए निर्धारित है एक हर बार पैदा करेगा जब तक यह सब 8.
- हमने अपना पहला काम अनुसूची है (पिछली सूची से चरण 3 के गहरे घोंसले भाग में)।
- निष्पादक अपनी आंतरिक कतार में नौकरी जोड़ता है, फिर नोटिस करता है कि इसमें अधिकतम कार्यकर्ता धागे नहीं हैं और एक नया शुरू होता है।
- नया धागा कतार से नौकरी को पॉप करता है और इसे चलाने शुरू करता है। हालांकि,
sleep
कॉल शेष चरणों की तुलना में काफी अधिक समय लेता है, इसलिए थ्रेड थोड़ी देर के लिए यहां फंस जाएगा।
- मुख्य धागा खत्म होता है (यह पिछली सूची में चरण 4 तक पहुंच गया है)।
_python_exit
फ़ंक्शन को दुभाषिया द्वारा बुलाया जाता है, क्योंकि दुभाषिया बंद करना चाहता है। फ़ंक्शन मॉड्यूल में वैश्विक _shutdown
चर सेट करता है, और निष्पादक की आंतरिक कतार में None
भेजता है (यह प्रति None
प्रति थ्रेड भेजता है, लेकिन अभी तक केवल एक धागा बनाया गया है, इसलिए यह केवल एक None
भेजता है)। यह तब तक मुख्य धागे को अवरुद्ध करता है जब तक कि वह थ्रेड को छोड़ने के बारे में जानता है। यह दुभाषिया बंद करने में देरी करता है।
- कार्यकर्ता थ्रेड का कॉल
time.sleep
पर लौटाता है। यह कॉलबैक फ़ंक्शन को कॉल करता है जो उसके नौकरी के Future
के साथ पंजीकृत है, जो एक और नौकरी निर्धारित करता है।
- इस सूची के चरण 4 में की तरह, निष्पादक नौकरी को कतार में रखता है, और एक और धागा शुरू करता है, क्योंकि इसमें अभी तक वांछित संख्या नहीं है।
- नया धागा आंतरिक कतार से नौकरी पकड़ने की कोशिश करता है, लेकिन चरण 7 से
None
मान प्राप्त करता है जो एक संकेत है कि यह किया जा सकता है। यह देखता है कि _shutdown
वैश्विक सेट है और इसलिए यह निकलता है। हालांकि यह करने से पहले, यह कतार में None
जोड़ता है।
- पहला कार्यकर्ता धागा अपनी कॉलबैक समाप्त करता है। यह एक नई नौकरी की तलाश में है, और उसे वह चरण 8 में स्वयं को कतार में पाया गया है। यह नौकरी चलाने शुरू होता है, और चरण 5 में, जिसमें कुछ समय लगता है।
- हालांकि कुछ भी नहीं होता है, क्योंकि पहले कार्यकर्ता इस समय एकमात्र सक्रिय धागा (मुख्य धागा पहले कार्यकर्ता पर मरने के लिए अवरुद्ध है, और दूसरा कार्यकर्ता खुद को बंद कर देता है)।
- अब हम कई बार चरण 8-12 दोहराते हैं। पहला कार्यकर्ता धागा 8 वें नौकरियों के माध्यम से तीसरे स्थान पर जाता है, और निष्पादक हर बार एक संबंधित धागे को जन्म देता है क्योंकि इसमें पूर्ण सेट नहीं होता है। हालांकि, प्रत्येक थ्रेड तुरंत मर जाता है, क्योंकि इसे पूरा करने के लिए वास्तविक नौकरी के बजाय नौकरी कतार से
None
मिलता है। पहला कार्यकर्ता धागा सभी वास्तविक काम करने के समाप्त होता है।
- आखिरकार, 8 वीं नौकरी के बाद, कुछ अलग-अलग काम करता है। इस बार, जब कॉलबैक किसी अन्य नौकरी को शेड्यूल करता है, तो कोई अतिरिक्त थ्रेड उत्पन्न नहीं होता है, क्योंकि निष्पादक जानता है कि उसने पहले से अनुरोध किए गए 8 थ्रेड शुरू कर दिए हैं (यह नहीं पता कि 7 बंद हो गए हैं)।
- इसलिए इस बार,
None
जो कि आंतरिक नौकरी कतार के प्रमुख पर है, पहले कार्यकर्ता (वास्तविक नौकरी के बजाय) द्वारा उठाया जाता है। इसका मतलब है कि यह अधिक काम करने के बजाय, बंद हो जाता है।
- जब पहला कार्यकर्ता बंद हो जाता है, मुख्य धागा (जो इसे छोड़ने का इंतजार कर रहा है) अंततः अनब्लॉक कर सकता है और
_python_exit
फ़ंक्शन पूर्ण हो जाता है। यह दुभाषिया को पूरी तरह बंद कर देता है। हो गया था!
यह हमारे द्वारा देखे जाने वाले आउटपुट को बताता है! हमें 8 आउटपुट मिलते हैं, सभी एक ही कार्यकर्ता थ्रेड (पहली बार पैदा हुए) से आते हैं।
मुझे लगता है कि उस कोड में दौड़ की स्थिति हो सकती है। यदि चरण 11 से पहले चरण 11 होता है तो चीजें तोड़ सकती हैं। अगर पहले कार्यकर्ता को कतार से None
मिल गया और अन्य नए पैदा हुए कार्यकर्ता को असली नौकरी मिल गई, तो स्वैप भूमिकाएं (पहला कार्यकर्ता मर जाएगा, और दूसरा बाकी काम करेगा, और अधिक दौड़ की स्थिति को छोड़कर उन चरणों के बाद के संस्करण)। हालांकि, जैसे ही पहले कार्यकर्ता की मृत्यु हो गई, मुख्य धागा को अनब्लॉक कर दिया जाएगा। चूंकि यह अन्य धागे के बारे में नहीं जानता है (चूंकि वे अस्तित्व में नहीं थे जब उन्होंने धागे की सूची को प्रतीक्षा करने के लिए बनाया), यह समय-समय पर दुभाषिया को बंद कर देगा।
मुझे यकीन नहीं है कि यह दौड़ होने की संभावना है या नहीं। मुझे लगता है कि यह बहुत ही असंभव है, क्योंकि नए धागे के बीच कोड पथ की लंबाई शुरू हो रही है और कतार से नौकरी पकड़ने से मौजूदा थ्रेड के लिए कॉलबैक खत्म करने के पथ से बहुत छोटा है (इसके बाद भाग नया काम) और फिर कतार में एक और नौकरी की तलाश करें।
मुझे संदेह है कि यह एक बग है कि ThreadPoolExecutor
हमें स्क्रिप्ट के रूप में हमारे कोड को चलाने पर साफ़ रूप से बाहर निकलने देता है। एक नई नौकरी छोड़ने के लिए तर्क शायद निष्पादक के self._shutdown
विशेषता के अलावा वैश्विक _shutdown
ध्वज की जांच करनी चाहिए। यदि ऐसा हुआ, तो मुख्य धागे समाप्त होने के बाद एक और नौकरी कतार लगाने की कोशिश अपवाद उठाएगी।
आप एक with
बयान में ThreadPoolExecutor
बनाकर क्या मैं saner व्यवहार किया जाएगा लगता है दोहराने कर सकते हैं:
# create the pool below the definition of recursive()
with ThreadPoolExecutor(max_workers=8) as pool:
Task(recursive(0)).step()
यह step()
कॉल से मुख्य थ्रेड रिटर्न के बाद जल्द ही दुर्घटना होगा। यह इस तरह कुछ दिखाई देगा:
exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
Traceback (most recent call last):
File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
callback(self)
File ".\task_coroutines.py", line 21, in _wakeup
self.step(result)
File ".\task_coroutines.py", line 14, in step
fut = self._gen.send(value)
File ".\task_coroutines.py", line 30, in recursive
Task(recursive(n+1)).step()
File ".\task_coroutines.py", line 14, in step
fut = self._gen.send(value)
File ".\task_coroutines.py", line 28, in recursive
yield pool.submit(time.sleep, 1)
File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
मैंने अपने व्याख्यान को देखा है, यह अविश्वसनीय है कि यह आदमी 40 मिनट से कम में क्या करता है। मेरी धारणा है कि आपका प्रश्न उत्तर 'थ्रेडपूल एक्स्सेलर' में रहता है। यह एक नई प्रक्रिया बनाता है जो 'अगली()' कॉल को संभालता है, इसलिए प्रत्येक 'कार्य' को अपनी प्रक्रिया मिलती है। एक और बात यह है कि जनरेटर एक समय में एक मूल्य उत्पन्न करते हैं ताकि उन्हें वापस करने से पहले पूरे तत्वों को स्मृति में रखने की आवश्यकता न हो। यह एक मूल्य पैदा करता है, और निलंबित करता है। जब जाग गया, यह अगले मूल्य पैदा करता है और निलंबित करता है। एक समय में एक मूल्य। मुझे Q2 के लिए कोई धारणा नहीं है, लेकिन यह दिलचस्प है। – Vinny