2017-05-31 42 views
6

के बाद असफल होने के रूप में चिह्नित सबफैग में लंबे समय तक चलने वाला कार्य मेरे पास एक लंबे समय से चलने वाले चरण (आमतौर पर लगभग 2 घंटे के साथ एयरफ्लो में एक सबडैग है, हालांकि यह किस इकाई पर चल रहा है) के आधार पर भिन्न होता है। 1.7.1.3 के तहत, यह चरण लगातार AIRFLOW-736 का कारण बनता है और उप-चरण 'चलने' स्थिति में बंद हो जाएगा जब सभी चरणों में सफल रहे। हम इस बारे में काम कर सकते थे क्योंकि डेटाबेस में मैन्युअल रूप से SubDagOperator को मैन्युअल रूप से चिह्नित करने (बजाए जाने के बजाए) को उप-डीएजी के बाद हमारे पास कदम नहीं थे।एयरफ्लो - एक घंटे

हम निम्न कार्य करके अब वायु प्रवाह 1.8.1 परीक्षण कर रहे हैं, उन्नयन:

  1. हमारे अनुसूचक और श्रमिकों
  2. पिप के माध्यम से नीचे shuting, वायु प्रवाह की स्थापना रद्द करने और अपाचे-प्रवाह को स्थापित करने से (संस्करण 1.8.1)
  3. हवा का प्रवाह upgradedb
  4. हवा का प्रवाह अनुसूचक और श्रमिकों

चल रहा है प्रणाली ओ के साथ Runing थर्मव अनछुए, वही डीएजी अब लंबे समय तक चलने वाले कार्य के बाद 100% समय में असफल रहा है, हालांकि 1 घंटे के निशान (हालांकि विचित्र रूप से, बिल्कुल 3600 सेकेंड बाद नहीं - यह घंटे की टिकों के बाद 30 से 9 0 सेकेंड तक कहीं भी हो सकता है) संदेश के साथ "निष्पादक रिपोर्ट कार्य उदाहरण समाप्त हुआ (असफल) हालांकि कार्य कहता है कि यह चल रहा है। क्या कार्य बाहरी रूप से मारा गया था? "हालांकि, कार्य स्वयं कार्यकर्ता पर चल रहा है। किसी भी तरह, कार्य को विफल करने में गलती वाले शेड्यूलर के बीच असहमति है (वास्तविक कार्य के बावजूद डेटाबेस पर आधारित jobs.py के this line देखें) ठीक चल रहा है।

मैंने पुष्टि की है कि, किसी भी तरह, राज्य एयरफ्लो डेटाबेस की task_instance तालिका में 'विफल' है। इस प्रकार, मैं जानना चाहता हूं कि कार्य स्थिति विफल होने पर कार्य स्थिति को कैसे सेट किया जा सकता है खुद अभी भी चल रहा है

यहां नमूने डेग जो मुद्दे से चलाता है:।

from datetime import datetime 
from airflow.models import DAG 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.subdag_operator import SubDagOperator 

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)} 

def define_sub(dag, step_name, sleeptime): 
    op = BashOperator(
     task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag 
    ) 
    return dag 

def gen_sub_dag(parent_name, step_name, sleeptime): 
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS) 
    define_sub(sub, step_name, sleeptime) 
    return sub 

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None) 

long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent 
) 
+0

आज, मैं एक ही समस्या में भाग गया, एक लंबे समय तक चलने वाले कार्य के साथ एक सबडाग, एक घंटे से थोड़ा अधिक समय के बाद, मुझे त्रुटि संदेश मिला। दिलचस्प बात यह है कि शेड्यूलर ने कार्य को पुनरारंभ करने का प्रयास किया, जो एयरफ्लो के अवरुद्ध संसाधन के बाहर विफल रहा। कार्य समाप्त होने से पहले, मूल कार्य चलाना जारी रहा, और सही ढंग से समाप्त हो गया, एयरफ्लो ने असफल को चिह्नित किया। –

+0

आप किस निष्पादक का उपयोग कर रहे हैं। क्या यह अजवाइन + रेडिस है? –

उत्तर

0

यदि आप वास्तव में सेलेरी और रेडिस के साथ चल रहे हैं तो सेलेरी के लिए visibility timeout setting पर एक नज़र डालें और इसे अपने कार्य के अपेक्षित अंत समय से आगे बढ़ाएं।

हालांकि हम सेलेरी को कार्यों-एके-देर से कॉन्फ़िगर करते हैं, फिर भी इसमें कार्य गायब होने के साथ समस्याएं हैं। हम सेलरी में a bug पर विचार करते हैं।

संबंधित मुद्दे