के बाद असफल होने के रूप में चिह्नित सबफैग में लंबे समय तक चलने वाला कार्य मेरे पास एक लंबे समय से चलने वाले चरण (आमतौर पर लगभग 2 घंटे के साथ एयरफ्लो में एक सबडैग है, हालांकि यह किस इकाई पर चल रहा है) के आधार पर भिन्न होता है। 1.7.1.3 के तहत, यह चरण लगातार AIRFLOW-736 का कारण बनता है और उप-चरण 'चलने' स्थिति में बंद हो जाएगा जब सभी चरणों में सफल रहे। हम इस बारे में काम कर सकते थे क्योंकि डेटाबेस में मैन्युअल रूप से SubDagOperator को मैन्युअल रूप से चिह्नित करने (बजाए जाने के बजाए) को उप-डीएजी के बाद हमारे पास कदम नहीं थे।एयरफ्लो - एक घंटे
हम निम्न कार्य करके अब वायु प्रवाह 1.8.1 परीक्षण कर रहे हैं, उन्नयन:
- हमारे अनुसूचक और श्रमिकों
- पिप के माध्यम से नीचे shuting, वायु प्रवाह की स्थापना रद्द करने और अपाचे-प्रवाह को स्थापित करने से (संस्करण 1.8.1)
- हवा का प्रवाह upgradedb
- हवा का प्रवाह अनुसूचक और श्रमिकों
चल रहा है प्रणाली ओ के साथ Runing थर्मव अनछुए, वही डीएजी अब लंबे समय तक चलने वाले कार्य के बाद 100% समय में असफल रहा है, हालांकि 1 घंटे के निशान (हालांकि विचित्र रूप से, बिल्कुल 3600 सेकेंड बाद नहीं - यह घंटे की टिकों के बाद 30 से 9 0 सेकेंड तक कहीं भी हो सकता है) संदेश के साथ "निष्पादक रिपोर्ट कार्य उदाहरण समाप्त हुआ (असफल) हालांकि कार्य कहता है कि यह चल रहा है। क्या कार्य बाहरी रूप से मारा गया था? "हालांकि, कार्य स्वयं कार्यकर्ता पर चल रहा है। किसी भी तरह, कार्य को विफल करने में गलती वाले शेड्यूलर के बीच असहमति है (वास्तविक कार्य के बावजूद डेटाबेस पर आधारित jobs.py के this line देखें) ठीक चल रहा है।
मैंने पुष्टि की है कि, किसी भी तरह, राज्य एयरफ्लो डेटाबेस की task_instance तालिका में 'विफल' है। इस प्रकार, मैं जानना चाहता हूं कि कार्य स्थिति विफल होने पर कार्य स्थिति को कैसे सेट किया जा सकता है खुद अभी भी चल रहा है
यहां नमूने डेग जो मुद्दे से चलाता है:।
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}
def define_sub(dag, step_name, sleeptime):
op = BashOperator(
task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
)
return dag
def gen_sub_dag(parent_name, step_name, sleeptime):
sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
define_sub(sub, step_name, sleeptime)
return sub
long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)
long_sub_dag = SubDagOperator(
subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)
आज, मैं एक ही समस्या में भाग गया, एक लंबे समय तक चलने वाले कार्य के साथ एक सबडाग, एक घंटे से थोड़ा अधिक समय के बाद, मुझे त्रुटि संदेश मिला। दिलचस्प बात यह है कि शेड्यूलर ने कार्य को पुनरारंभ करने का प्रयास किया, जो एयरफ्लो के अवरुद्ध संसाधन के बाहर विफल रहा। कार्य समाप्त होने से पहले, मूल कार्य चलाना जारी रहा, और सही ढंग से समाप्त हो गया, एयरफ्लो ने असफल को चिह्नित किया। –
आप किस निष्पादक का उपयोग कर रहे हैं। क्या यह अजवाइन + रेडिस है? –