2012-08-27 12 views
5

के माध्यम से निकट-एकमात्र निष्पादन के लिए हजारों एक-ऑफ (गैर-पुन: प्रयोजन) कार्यों को शेड्यूल करना कुछ संदर्भ: मैं एक Django ऐप बना रहा हूं जो उपयोगकर्ता को एक क्रिया को पूर्व-सहेजने और सटीक तिथि निर्धारित करने की अनुमति देता है/भविष्य में समय वे निष्पादित करने के लिए कार्रवाई करना चाहते हैं। उदाहरण के लिए, अगले सप्ताह 5:30 बजे प्रोग्राम की दृष्टि से फेसबुक की दीवार पर अगले सप्ताह सप्ताह में शेड्यूल करने के लिए एक पोस्ट शेड्यूल करना।Django-celery

मैं एक कार्य शेड्यूलिंग सिस्टम की तलाश में हूं जो एक-ऑफ कार्य के हजारों उदाहरणों को संभाल सकता है, सभी एक साथ-साथ निष्पादित करने के लिए सेट (त्रुटि मार्जिन प्लस या एक मिनट से कम)।

मैं इसके लिए Django-celery/Rabbitmq पर विचार कर रहा हूं, लेकिन मैंने देखा कि Celery docs एक बार उपयोग के लिए किए गए कार्यों को संबोधित नहीं करते हैं। क्या Django-celery यहां सही विकल्प है (शायद क्रोंटैबसेड्यूल को उपclass करके) या मेरी ऊर्जा बेहतर कुछ अन्य दृष्टिकोण शोध करने में बिताई है? शायद Sched Module और क्रॉन के साथ कुछ मिलकर हैकिंग।

+0

मुझे नहीं पता कि आपको मेरे नवीनतम संपादन की अधिसूचना दी जाएगी, इसलिए मैं यहां टिप्पणी करना चाहता हूं और सुनिश्चित हूं। – dokkaebi

उत्तर

7

संपादित करें 2:

किसी कारण से, मेरे सिर मूल रूप से कार्य आवर्ती के दायरे में फंस गया था। यहां एक आसान समाधान है।

आपको वास्तव में प्रत्येक उपयोगकर्ता कार्रवाई के लिए एक कार्य को परिभाषित करना है। आप भंडारण कार्यों को अपने डेटाबेस में निष्पादित करने के लिए छोड़ सकते हैं - यही कारण है कि अजवाइन यहाँ है!

फिर से अपने फेसबुक पोस्ट उदाहरण का पुन: उपयोग करना, और फिर यह मानते हुए कि आपके पास post_to_facebook कोई फ़ंक्शन है, जो उपयोगकर्ता और कुछ टेक्स्ट लेता है, कुछ जादू करता है, और उस उपयोगकर्ता के फेसबुक पर टेक्स्ट पोस्ट करता है, तो आप इसे केवल एक होने के लिए परिभाषित कर सकते हैं इस तरह कार्य:

# Task to send one update. 
@celery.task(ignore_result=True) 
def post_to_facebook(user, text): 
    # perform magic 
    return whatever_you_want 

जब कोई उपयोगकर्ता इस तरह के एक पद enqueue के लिए तैयार है, तो आप सिर्फ जब कार्य चलाने के लिए अजवाइन बता:

post_to_facebook.apply_async(
    (user, text), # args 
    eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440) # pass execution options as kwargs 
) 

यह सब यहाँ विस्तृत, की एक पूरी गुच्छा के बीच में है उपलब्ध कॉल विकल्प: http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown

यदि आपको कॉल के परिणाम की आवश्यकता है, तो आप कार्य परिभाषा में ignore_result param को छोड़ सकते हैं और एक AsyncResult ऑब्जेक्ट वापस प्राप्त कर सकते हैं, और फिर कॉल के परिणामों के लिए इसे जांच सकते हैं। यहां अधिक: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

नीचे दिए गए कुछ उत्तर अभी भी प्रासंगिक हैं। आप अभी भी प्रत्येक उपयोगकर्ता कार्रवाई के लिए एक कार्य चाहते हैं, फिर भी आप कार्य डिज़ाइन इत्यादि के बारे में सोचना चाहते हैं, लेकिन यह आपके द्वारा पूछे जाने वाले कार्यों को करने के लिए एक बहुत ही आसान सड़क है।

मूल आवर्ती कार्यों का उपयोग कर जवाब इस प्रकार है:

Dannyroa सही विचार है। मैं उस पर थोड़ा सा निर्माण करूंगा।

संपादित करें/TLDR: जवाब हाँ, अजवाइन अपनी आवश्यकताओं के अनुकूल है है। आपको बस अपनी कार्य परिभाषा पर पुनर्विचार करने की आवश्यकता हो सकती है।

मुझे लगता है कि आप अपने उपयोगकर्ताओं को अपने कार्यों को परिभाषित करने के लिए मनमाने ढंग से पायथन कोड लिखने की अनुमति नहीं दे रहे हैं। उस से कम, आपको कुछ क्रियाओं को पूर्व निर्धारित करना होगा जो उपयोगकर्ता शेड्यूल कर सकते हैं, और फिर उन्हें उन कार्रवाइयों को शेड्यूल करने की अनुमति दें। फिर, आप केवल प्रत्येक उपयोगकर्ता कार्रवाई के लिए एक निर्धारित कार्य चला सकते हैं, प्रविष्टियों की जांच कर सकते हैं और प्रत्येक प्रविष्टि के लिए कार्रवाई कर सकते हैं।

एक उपयोगकर्ता कार्रवाई:

अपने फेसबुक उदाहरण का उपयोग करके आप किसी तालिका में उपयोगकर्ताओं को 'अपडेट स्टोर होगा:

class ScheduledPost(Model): 
    user = ForeignKey('auth.User') 
    text = TextField() 
    time = DateTimeField() 
    sent = BooleanField(default=False) 

तो फिर तुम एक काम, हर मिनट चलने वाले में एंट्री के लिए जाँच करेगा तालिका अंतिम मिनट में पोस्ट की गई है (आपके द्वारा उल्लिखित त्रुटि मार्जिन के आधार पर)। यदि यह बहुत महत्वपूर्ण है कि आपने अपनी एक मिनट की विंडो को दबाया है, तो आप कार्य को हर 30 सेकंड में अधिक बार शेड्यूल कर सकते हैं। हो सकता है काम इस तरह दिखेगा (MyApp/tasks.py में):

@celery.task 
def post_scheduled_updates(): 
    from celery import current_task 
    scheduled_posts = ScheduledPost.objects.filter(
     sent=False, 
     time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this 
     time__lte=timezone.now() 
    ) 
    for post in scheduled_posts: 
     if post_to_facebook(post.text): 
      post.sent = True 
      post.save() 

config इस प्रकार दिखाई देंगे:

CELERYBEAT_SCHEDULE = { 
    'fb-every-30-seconds': { 
     'task': 'tasks.post_scheduled_updates', 
     'schedule': timedelta(seconds=30), 
    }, 
} 

अतिरिक्त उपयोगकर्ता क्रियाओं:

प्रत्येक उपयोगकर्ता कार्रवाई के लिए फेसबुक पर पोस्ट करने के अलावा, आप एक नई टेबल और एक नया कार्य परिभाषित कर सकते हैं:

class EmailToMom(Model): 
    user = ForeignKey('auth.User') 
    text = TextField() 
    subject = CharField(max_length=255) 
    sent = BooleanField(default=False) 
    time = DateTimeField() 

@celery.task 
def send_emails_to_mom(): 
    scheduled_emails = EmailToMom.objects.filter(
     sent=False, 
     time__lt=timezone.now() 
    ) 
    for email in scheduled_emails: 
     sent = send_mail(
      email.subject, 
      email.text, 
      email.user.email, 
      [email.user.mom.email], 
     ) 
     if sent: 
      email.sent = True 
      email.save() 

    CELERYBEAT_SCHEDULE = { 
     'fb-every-30-seconds': { 
      'task': 'tasks.post_scheduled_updates', 
      'schedule': timedelta(seconds=30), 
     }, 
     'mom-every-30-seconds': { 
      'task': 'tasks.send_emails_to_mom', 
      'schedule': timedelta(seconds=30), 
     }, 
    } 

स्पीड और अनुकूलन:

बजाय अद्यतन से अधिक पुनरावृत्ति पोस्ट करने के लिए और एक post_scheduled_updates कॉल के दौरान उन्हें क्रमानुसार भेजने का अधिक प्रवाह प्राप्त करने के लिए, आप उप-कार्य का एक समूह तक अंडे और उन्हें समानांतर में क्या (पर्याप्त दिया जा सकता था workers)। फिर post_scheduled_updates पर कॉल बहुत तेज़ी से चलता है और कार्यों के पूरे समूह को शेड्यूल करता है - प्रत्येक एफबी अपडेट के लिए - एएसपी चलाने के लिए। यही कारण है कि कुछ इस तरह दिखेगा:

# Task to send one update. This will be called by post_scheduled_updates. 
@celery.task 
def post_one_update(update_id): 
    try: 
     update = ScheduledPost.objects.get(id=update_id) 
    except ScheduledPost.DoesNotExist: 
     raise 
    else: 
     sent = post_to_facebook(update.text) 
     if sent: 
      update.sent = True 
      update.save() 
     return sent 

@celery.task 
def post_scheduled_updates(): 
    from celery import current_task 
    scheduled_posts = ScheduledPost.objects.filter(
     sent=False, 
     time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this 
     time__lte=timezone.now() 
    ) 
    for post in scheduled_posts: 
     post_one_update.delay(post.id) 

कोड मैं पोस्ट किया है परीक्षण नहीं किया गया है और निश्चित रूप से अनुकूल नहीं है, लेकिन यह सही रास्ते पर आप मिलना चाहिए। आपके प्रश्न में आपने थ्रूपुट के बारे में कुछ चिंताएं निभाई हैं, इसलिए आप अनुकूलित करने के लिए स्थानों पर बारीकी से देखना चाहेंगे। एक स्पष्ट रूप से post.sent=True;post.save() पर कॉल करने के बजाय थोक अपडेट है।

और जानकारी: समय-समय पर कार्यों पर

और जानकारी: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

कार्य डिजाइन रणनीतियों पर एक अनुभाग: http://docs.celeryproject.org/en/latest/userguide/optimizing.html: http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies

यहाँ अजवाइन अनुकूलन के बारे में एक पूरे पृष्ठ है।

उप-कार्य के बारे में यह पृष्ठ भी दिलचस्प हो सकता है: http://docs.celeryproject.org/en/latest/userguide/canvas.html

वास्तव में, मैं सभी अजवाइन दस्तावेज़ों को पढ़ने की सलाह देता हूं।

+0

मैं आपको धन्यवाद नहीं दे सकता, आपका जवाब मेरे लिए अविश्वसनीय रूप से सहायक रहा है। –

+0

बहुत बढ़िया। मैं खुशी से मदद कर सकता है! – dokkaebi

+0

आपके कुछ कार्य अजवाइन कार्यों के महत्वपूर्ण बेवकूफ नियम में असफल हो जाते हैं। उदाहरण के लिए: '' ' @ celery.task डीईएफ़ send_emails_to_mom(): scheduled_emails = EmailToMom.objects.filter ( भेजा = झूठी, time__lt = timezone.now() ) scheduled_emails में ईमेल के लिए : ' ' यदि यह कार्य एक ही समय में दो बार शुरू किया गया था, तो दोनों को भेजने के लिए अनुसूचित ईमेल की एक ही सूची मिल जाएगी और फिर उसी सूची में भेजना शुरू करें। – dalore

0

मैं शेड्यूलपोस्ट नामक मॉडल बनाने के लिए क्या करूँगा।

मेरे पास एक आवधिक कार्य होगा जो हर 5 मिनट या उससे भी अधिक समय तक चलता है।

कार्य किसी भी पोस्ट के लिए अनुसूचित पोस्ट तालिका की जांच करेगा जिसे फेसबुक पर धक्का दिया जाना चाहिए।

संबंधित मुद्दे