एयरफ्लो में निर्भर कार्यों में पैरामीटर पास करने का तरीका क्या है? मेरे पास बहुत सारी झाड़ियों की फाइलें हैं, और मैं इस दृष्टिकोण को एयरफ्लो में माइग्रेट करने का प्रयास कर रहा हूं, लेकिन मुझे नहीं पता कि कार्यों के बीच कुछ गुणों को कैसे पारित किया जाए।निर्भर कार्य के लिए एयरफ्लो पास पैरामीटर
यह एक वास्तविक उदाहरण है:
#sqoop bash template
sqoop_template = """
sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
"""
s3_template = """
s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}}
"""
#Task of extraction in EMR
t1 = BashOperator(
task_id='extract_account',
bash_command=sqoop_template,
params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")},
dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
task_id='s3_upload',
bash_command=s3_template,
params={}, #here i need the dir name created in t1
depends_on_past=True
)
t2.set_upstream(t1)
t2 में मैं t1 में बनाया dir नाम का उपयोग करने की जरूरत है।
समाधान
#Execute a valid job sqoop
def sqoop_import(table_name, job_name):
s3, hdfs = dirpath(table_name)
sqoop_job = job_default_config(job_name, hdfs)
#call(sqoop_job)
return {'hdfs_dir': hdfs, 's3_dir': s3}
def s3_upload(**context):
hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir']
s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir']
s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
#call(s3_cpdist_job)
return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import')
def sns_notify(**context):
s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir']
client = boto3.client('sns')
arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'
response = client.publish(TargetArn=arn, Message=s3)
return response
नहीं है यही कारण है कि निश्चित समाधान है, इसलिए सुधार स्वागत है। धन्यवाद।
मेरी राय में एक समाधान, टी 1 में बनाए गए गुणों के साथ कुछ फ़ाइल बनाना और टी 2 में इसी फ़ाइल को उपभोग करना है। –