2017-07-07 25 views
7

मैं हैडोप पर लंबे समय तक चलने वाले कार्यों को चलाने के लिए अजवाइन का उपयोग कर रहा हूं। प्रत्येक कार्य हडोप पर एक सुअर स्क्रिप्ट निष्पादित करता है जो लगभग 30 मिनट तक चलता है - 2 घंटे।विभिन्न मूल्यों के साथ विभिन्न सेलेरी श्रमिकों को शुरू करना

मेरे वर्तमान हैडोप सेटअप में 4 कतार ए, बी, सी, और डिफ़ॉल्ट है। सभी कार्यों को वर्तमान में एक एकल कार्यकर्ता द्वारा निष्पादित किया जा रहा है जो नौकरी को एक कतार में प्रस्तुत करता है।

मैं 3 और श्रमिकों को जोड़ना चाहता हूं जो अन्य कतारों में नौकरियां जमा करते हैं, प्रति कार्यकर्ता एक कतार।

समस्या यह है कि वर्तमान में कतार वर्तमान में हार्ड कोड है और मैं इस चर प्रति कार्यकर्ता को बनाना चाहता हूं।

मैंने बहुत कुछ खोजा लेकिन मैं प्रत्येक अजवाइन कार्यकर्ता को एक अलग कतार मूल्य पारित करने और मेरे कार्य में इसे एक्सेस करने का कोई तरीका नहीं ढूंढ पा रहा हूं।

मैं अपने सेलरी कार्यकर्ता को ऐसा शुरू करता हूं।

celery -A app.celery worker 

मैं कमांड लाइन अपने आप में कुछ अतिरिक्त तर्क पारित और मेरे काम में इसे का उपयोग करना चाहते हैं, लेकिन अजवाइन शिकायत है कि यह अपने कस्टम तर्क को नहीं समझता।

मैं --concurrency=3 पैरामीटर सेट करके उसी होस्ट पर सभी श्रमिकों को चलाने की योजना बना रहा हूं। क्या इस समस्या का कोई हल है?

धन्यवाद!

संपादित

मौजूदा परिदृश्य इस तरह है। हर मैं इसे केवल प्रिंट कतार सी

@celery.task() 
def print_something(): 
    print "C" 

tasks.print_something.delay() कह कर कार्य print_something पर अमल करने की कोशिश मैं श्रमिकों क्या मूल्य मैं उन्हें करने के लिए पारित करते हुए उन्हें शुरू करने के आधार पर एक चर पत्र प्रिंट की आवश्यकता है।

@celery.task() 
def print_something(): 
    print "<Variable Value Per Worker Here>" 
+0

क्या आपके लिए लागू कोड स्निपेट साझा करना संभव है? –

उत्तर

3

उम्मीद है कि यह किसी की मदद करेगा।

इस समस्या के लिए कई समस्याओं को हल करने की आवश्यकता है।

पहला चरण कस्टम पैरामीटर के लिए अजवाइन में समर्थन जोड़ने में शामिल था। यदि यह नहीं किया जाता है, तो अजवाइन शिकायत करेगा कि यह पैरामीटर को समझ में नहीं आता है।

चूंकि मैं फ्लास्क के साथ अजवाइन चला रहा हूं, इसलिए मैंने इस तरह की अजवाइन शुरू की है।

def configure_celery(): 
    app.config.update(
     CELERY_BROKER_URL='amqp://:@localhost:5672', 
     RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'    
    ) 
    celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'], 
        broker=app.config['CELERY_BROKER_URL']) 
    celery.conf.update(app.config) 
    TaskBase = celery.Task 

    class ContextTask(TaskBase): 
     abstract = True 

     def __call__(self, *args, **kwargs): 
      with app.app_context(): 
       return TaskBase.__call__(self, *args, **kwargs) 

    celery.Task = ContextTask 
    return celery 

मैं इस फ़ंक्शन को अजवाइन शुरू करने और इसे सेलरी नामक चर में संग्रहीत करने के लिए कहता हूं।

celery = configure_celery() 

कस्टम पैरामीटर जोड़ने के लिए आपको निम्न कार्य करने की आवश्यकता है।

def add_hadoop_queue_argument_to_worker(parser): 
    parser.add_argument(
     '--hadoop-queue', help='Hadoop queue to be used by the worker' 
    ) 

नीचे उपयोग की जाने वाली अजवाइन वह है जिसे हमने उपरोक्त चरणों से प्राप्त किया है।

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker) 

अगले चरण कार्यकर्ता में यह तर्क सुलभ बनाना होगा। ऐसा करने के लिए इन चरणों का पालन करें।

class HadoopCustomWorkerStep(bootsteps.StartStopStep): 

    def __init__(self, worker, **kwargs): 
     worker.app.hadoop_queue = kwargs['hadoop_queue'] 

श्रमिकों के निर्माण के लिए इस वर्ग का उपयोग करने के लिए अजवाइन को सूचित करें।

celery.steps['worker'].add(HadoopCustomWorkerStep) 

कार्य अब चरों तक पहुंचने में सक्षम होना चाहिए।

@app.task(bind=True) 
def print_hadoop_queue_from_config(self): 
    print self.app.hadoop_queue 

कमांड लाइन पर कार्यकर्ता चलाकर इसे सत्यापित करें।

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n [email protected]%h 
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n [email protected]%h 
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n [email protected]%h 
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n [email protected]%h 
3

क्या मैं आमतौर पर करते हैं, श्रमिकों शुरू करने के बाद किसी अन्य लिपि में (कार्य निष्पादित नहीं कर रहे हैं) (जैसे manage.py) मैं जोड़ने के मानकों के साथ आदेशों अलग तर्क के साथ विशिष्ट कार्य या कार्यों शुरू करने के लिए।

manager.py में

:

from tasks import some_task 

@click.command 
def run_task(params): 
    some_task.apply_async(params) 

और आवश्यकतानुसार इस कार्य शुरू कर देंगे।

संबंधित मुद्दे