2017-12-24 157 views
5

एयरफ्लो में, मुझे इस मुद्दे का सामना करना पड़ रहा है कि मुझे job_flow_id को मेरे एमआर-चरणों में से एक को पास करने की आवश्यकता है। मैं ऑपरेटर से job_flow_id पुनर्प्राप्त करने में सक्षम हूं लेकिन जब मैं क्लस्टर को सबमिट करने के लिए कदम बनाने जा रहा हूं, तो task_instance मान सही नहीं है।एयरफ्लो - ईएमआर ऑपरेटर में कार्य उदाहरण

def issue_step(name, args): 
    return [ 
     { 
      "Name": name, 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": { 
       "Jar": "s3://....", 
       "Args": args 
      } 
     } 
    ] 

dag = DAG('example', 
      description='My dag', 
      schedule_interval='0 8 * * 6', 
      dagrun_timeout=timedelta(days=2)) 

try: 

    create_emr = EmrCreateJobFlowOperator(
     task_id='create_job_flow', 
     aws_conn_id='aws_default',   
     dag=dag 
    ) 

    load_data_steps = issue_step('load', ['arg1', 'arg2']) 

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id') 
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id 

    load_data = EmrAddStepsOperator(
     task_id='load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others 
     aws_conn_id='aws_default', 
     steps=load_data_steps, 
     dag=dag 
    ) 

    check_load_data = EmrStepSensor(
     task_id='watch_load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    cluster_remover = EmrTerminateJobFlowOperator(
     task_id='remove_cluster', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    create_emr_recommendations >> load_data 
    load_data >> check_load_data 
    check_load_data >> cluster_remover 

except AirflowException as ae: 
    print ae.message 

समस्या यह है कि, जब मैं ईएमआर जाँच, बजाय load_data चरण में --cluster-id j-1234 को देखने का, मैं --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}" देखना है, जो मेरे कदम विफल कारण बनता है: मैं निम्नलिखित कोड है।

मैं अपने चरण फ़ंक्शन के अंदर वास्तविक मूल्य कैसे प्राप्त कर सकता हूं?

धन्यवाद और खुश छुट्टियों

+0

क्या आपने उद्धरण के बिना मूल्य जोड़ने का प्रयास किया था? load_data_steps [0] ["हडोपजर्स्टेप"] ["Args"]। संलग्न करें ( {{task_instance.xcom_pull ('create_job_flow', key = 'return_value')}} –

+0

मुझे '' 'task_instance''' कहां मिल सकता है से वस्तु? मैं अभी भी सीख रहा हूं कि इसका उपयोग कैसे करें। – davideberdin

उत्तर

3

मुझे पता चला वायु प्रवाह भंडार के बारे में this पर पीआर नहीं है। मुद्दा यह है कि EmrAddStepsOperator में चरणों के लिए कोई templating नहीं है।

  • बनाया गया है कि प्लगइन के रूप में से EmrAddStepsOperator
  • जोड़ा इस ऑपरेटर को विरासत में एक कस्टम ऑपरेटर
  • नामक नव मेरी DAG में ऑपरेटर फ़ाइल

यहाँ: इस मुद्दे पर काबू पाने के लिए, मैं निम्नलिखित किया कस्टम ऑपरेटर के लिए कोड और फ़ाइल custom_emr_add_step_operator.py में प्लगइन (नीचे पेड़ देखें)

from __future__ import division, absolute_import, print_function 

from airflow.plugins_manager import AirflowPlugin 
from airflow.utils import apply_defaults 

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator 


class CustomEmrAddStepsOperator(EmrAddStepsOperator): 
    template_fields = ['job_flow_id', 'steps'] # override with steps to solve the issue above 

    @apply_defaults 
    def __init__(
      self, 
      *args, **kwargs): 
     super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs) 

    def execute(self, context): 
     super(CustomEmrAddStepsOperator, self).execute(context=context) 


# Defining the plugin class 
class CustomPlugin(AirflowPlugin): 
    name = "custom_plugin" 
    operators = [CustomEmrAddStepsOperator] 

मेरी DAG फ़ाइल में मैं इस तरह से

from airflow.operators import CustomEmrAddStepsOperator 
में प्लगइन बुलाया

अपने प्रोजेक्ट और प्लग इन की संरचना इस तरह दिखता है:

├── config 
│   └── airflow.cfg 
├── dags 
│   ├── __init__.py 
│   └── my_dag.py 
├── plugins 
│   ├── __init__.py 
│   └── operators 
│    ├── __init__.py 
│    └── custom_emr_add_step_operator.py 
└── requirements.txt 

आप इस तरह के PyCharm के रूप में एक IDE का उपयोग कर रहे हैं, तो इस वसीयत शिकायत करें क्योंकि यह कहता है कि यह मॉड्यूल नहीं ढूंढ सकता है। लेकिन जब आप एयरफ्लो चलाते हैं, तो यह समस्या प्रकट नहीं होगी। यह सुनिश्चित करने के लिए भी याद रखें कि आपके airflow.cfg में आप दाएं plugins फ़ोल्डर पर इंगित करने जा रहे हैं ताकि एयरफ्लो आपके नव निर्मित प्लगइन को पढ़ने में सक्षम हो।

संबंधित मुद्दे