एयरफ्लो में, मुझे इस मुद्दे का सामना करना पड़ रहा है कि मुझे job_flow_id
को मेरे एमआर-चरणों में से एक को पास करने की आवश्यकता है। मैं ऑपरेटर से job_flow_id
पुनर्प्राप्त करने में सक्षम हूं लेकिन जब मैं क्लस्टर को सबमिट करने के लिए कदम बनाने जा रहा हूं, तो task_instance
मान सही नहीं है।एयरफ्लो - ईएमआर ऑपरेटर में कार्य उदाहरण
def issue_step(name, args):
return [
{
"Name": name,
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "s3://....",
"Args": args
}
}
]
dag = DAG('example',
description='My dag',
schedule_interval='0 8 * * 6',
dagrun_timeout=timedelta(days=2))
try:
create_emr = EmrCreateJobFlowOperator(
task_id='create_job_flow',
aws_conn_id='aws_default',
dag=dag
)
load_data_steps = issue_step('load', ['arg1', 'arg2'])
load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
load_data_steps[0]["HadoopJarStep"]["Args"].append(
"{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id
load_data = EmrAddStepsOperator(
task_id='load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others
aws_conn_id='aws_default',
steps=load_data_steps,
dag=dag
)
check_load_data = EmrStepSensor(
task_id='watch_load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
create_emr_recommendations >> load_data
load_data >> check_load_data
check_load_data >> cluster_remover
except AirflowException as ae:
print ae.message
समस्या यह है कि, जब मैं ईएमआर जाँच, बजाय load_data
चरण में --cluster-id j-1234
को देखने का, मैं --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}"
देखना है, जो मेरे कदम विफल कारण बनता है: मैं निम्नलिखित कोड है।
मैं अपने चरण फ़ंक्शन के अंदर वास्तविक मूल्य कैसे प्राप्त कर सकता हूं?
धन्यवाद और खुश छुट्टियों
क्या आपने उद्धरण के बिना मूल्य जोड़ने का प्रयास किया था? load_data_steps [0] ["हडोपजर्स्टेप"] ["Args"]। संलग्न करें ( {{task_instance.xcom_pull ('create_job_flow', key = 'return_value')}} –
मुझे '' 'task_instance''' कहां मिल सकता है से वस्तु? मैं अभी भी सीख रहा हूं कि इसका उपयोग कैसे करें। – davideberdin