2016-07-25 32 views
6

एयरफ्लो में निर्भर कार्यों में पैरामीटर पास करने का तरीका क्या है? मेरे पास बहुत सारी झाड़ियों की फाइलें हैं, और मैं इस दृष्टिकोण को एयरफ्लो में माइग्रेट करने का प्रयास कर रहा हूं, लेकिन मुझे नहीं पता कि कार्यों के बीच कुछ गुणों को कैसे पारित किया जाए।निर्भर कार्य के लिए एयरफ्लो पास पैरामीटर

यह एक वास्तविक उदाहरण है:

#sqoop bash template 
sqoop_template = """ 
     sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/ 
    """ 

s3_template = """ 
     s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}} 
    """ 



#Task of extraction in EMR 
t1 = BashOperator(
     task_id='extract_account', 
     bash_command=sqoop_template, 
     params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")}, 
     dag=dag) 
#Task to upload in s3 backup. 
t2 = BashOperator(
     task_id='s3_upload', 
     bash_command=s3_template, 
     params={}, #here i need the dir name created in t1 
     depends_on_past=True 
    ) 

t2.set_upstream(t1) 

t2 में मैं t1 में बनाया dir नाम का उपयोग करने की जरूरत है।

समाधान

#Execute a valid job sqoop 
def sqoop_import(table_name, job_name): 
    s3, hdfs = dirpath(table_name) 
    sqoop_job = job_default_config(job_name, hdfs) 
    #call(sqoop_job) 
    return {'hdfs_dir': hdfs, 's3_dir': s3} 

def s3_upload(**context): 
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir'] 
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir'] 
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)] 
    #call(s3_cpdist_job) 
    return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import') 

def sns_notify(**context): 
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir'] 
    client = boto3.client('sns') 
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg' 
    response = client.publish(TargetArn=arn, Message=s3) 
    return response 

नहीं है यही कारण है कि निश्चित समाधान है, इसलिए सुधार स्वागत है। धन्यवाद।

+0

मेरी राय में एक समाधान, टी 1 में बनाए गए गुणों के साथ कुछ फ़ाइल बनाना और टी 2 में इसी फ़ाइल को उपभोग करना है। –

उत्तर

8

XComs - http://airflow.incubator.apache.org/concepts.html#xcoms देखें। इनका उपयोग कार्यों के बीच राज्य को संप्रेषित करने के लिए किया जाता है।

+0

मैं इस दृष्टिकोण का उपयोग कर हल करता हूं, लेकिन यहां समाधान जोड़ने के लिए पूरी तरह से भूल जाता हूं। धन्यवाद। –

संबंधित मुद्दे