2016-10-03 37 views
10

पृथ्वी के नमस्ते लोग! मैं स्पार्क कार्यों को शेड्यूल और चलाने के लिए एयरफ्लो का उपयोग कर रहा हूं। इस समय तक मैंने जो कुछ पाया वह पाइथन डीएजी है जो एयरफ्लो प्रबंधित कर सकता है।
DAG उदाहरण:एयरफ्लो में स्पार्क कोड कैसे चलाएं?

spark_count_lines.py 
import logging 

from airflow import DAG 
from airflow.operators import PythonOperator 

from datetime import datetime 

args = { 
    'owner': 'airflow' 
    , 'start_date': datetime(2016, 4, 17) 
    , 'provide_context': True 
} 

dag = DAG(
    'spark_count_lines' 
    , start_date = datetime(2016, 4, 17) 
    , schedule_interval = '@hourly' 
    , default_args = args 
) 

def run_spark(**kwargs): 
    import pyspark 
    sc = pyspark.SparkContext() 
    df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt') 
    logging.info('Number of lines in people.txt = {0}'.format(df.count())) 
    sc.stop() 

t_main = PythonOperator(
    task_id = 'call_spark' 
    , dag = dag 
    , python_callable = run_spark 
) 

समस्या मैं अजगर कोड में अच्छा नहीं कर रहा हूँ और कुछ जावा में लिखा कार्य है। मेरा सवाल है कि पाइथन डीएजी में स्पार्क जावा जार कैसे चलाएं? या हो सकता है कि ऐसा कोई दूसरा तरीका है? मैंने स्पार्क सबमिट किया: http://spark.apache.org/docs/latest/submitting-applications.html
लेकिन मुझे नहीं पता कि सबकुछ एक साथ कैसे कनेक्ट किया जाए। शायद किसी ने इसे पहले इस्तेमाल किया और उदाहरण बना रहा है। अपना समय देने के लिए धन्यवाद!

उत्तर

9

आपको BashOperator का उपयोग करने में सक्षम होना चाहिए।

from airflow.operators.bash_operator import BashOperator 

import os 
import sys 

सेट आवश्यक पथ:

os.environ['SPARK_HOME'] = '/path/to/spark/root' 
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin')) 

और जोड़ने ऑपरेटर:

spark_task = BashOperator(
    task_id='spark_java', 
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}', 
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'}, 
    dag=dag 
) 

आप आसानी से इस विस्तार कर सकते हैं अपने कोड के बाकी के रूप है, आयात के लिए आवश्यक वर्ग और सिस्टम संकुल रखते हुए जिन्जा टेम्पलेट्स का उपयोग करके अतिरिक्त तर्क प्रदान करने के लिए।

आप निश्चित रूप से अपने मामले में उपयुक्त एक टेम्पलेट के साथ bash_command की जगह, उदाहरण के लिए गैर-स्पार्क परिदृश्य के लिए यह समायोजित कर सकते हैं:

bash_command = 'java -jar {{ params.jar }}' 

और params का समायोजन। संस्करण 1.8 (आज जारी) के रूप में

6

वायु प्रवाह,

है

SparkSQLHook कोड - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

SparkSubmitHook कोड - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

सूचना है कि इन दो नए स्पार्क ऑपरेटरों/हुक के रूप में "योगदान" शाखा में हैं 1.8 संस्करण इतना नहीं (अच्छी तरह से) दस्तावेज।

तो आप स्पार्क निष्पादन के लिए अपना जावा कोड सबमिट करने के लिए स्पार्कसबमिटऑपरेटर का उपयोग कर सकते हैं।

+0

स्पार्कस्क्लॉप्टर ऐसा लगता है कि मुझे केवल यही चीज चाहिए - हालांकि, मैं इसे काम नहीं कर सकता क्योंकि मुझे नहीं पता कि कनेक्शन स्ट्रिंग कैसा दिखना चाहिए - क्या कोई दस्तावेज कहीं भी है जो मुझे इससे मदद कर सकता है? –

+0

यदि आप इसे सेट नहीं करते हैं - कनेक्शन यार्न निष्पादन मोड में डिफ़ॉल्ट होगा - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L33 देखें – Tagar

संबंधित मुद्दे