2015-08-31 10 views
10

का उपयोग करके निर्धारित नहीं है, मैं एक Django ऐप में कार्य शेड्यूलिंग को संभालने के लिए सेलेरी का उपयोग कर रहा हूं, मैं केवल परीक्षण के लिए Django डेटाबेस के साथ काम कर रहा हूं।कार्य निष्पादन की अनुमति दें यदि यह पहले से ही सेलेरी

मैंने केवल एक कार्य के निष्पादन को संभालने के लिए कई चीजों की कोशिश की है, यदि यह पहले से निर्धारित नहीं है या प्रगति में है जैसे article में प्रस्तावित है, लेकिन अब तक कुछ भी काम नहीं करता है।

कुछ इस तरह:

task.py

@task() 
def add(x, y): 
    return x + y 

और फिर जब आप इसे दो बार निम्नलिखित तरीके से की तरह फोन:

import myapp.tasks.add 

myapp.tasks.add.apply_async((2,2), task_id=1, countdown=15) 
myapp.tasks.add.apply_async((2,2), task_id=2, countdown=15) 

यह चाहिएमें आधारित एक उदाहरण की अनुमति देनी चाहिए। मैं कैसे पूरा कर सकता हूं कि दूसरा कॉल कभी भी इसे निष्पादित नहीं करता है यदि कोई और दौड़ रहा है या इंतजार कर रहा है?

उत्तर

5

स्वीकृत उत्तर के साथ एक समस्या यह है कि यह धीमा है। यह जांचना कि क्या कोई कार्य पहले से चल रहा है, इसमें ब्रोकर को कॉल करना और फिर चल रहे और सक्रिय कार्यों दोनों के माध्यम से पुनरावृत्ति करना शामिल है। यदि आप तेजी से कार्य कतार करना चाहते हैं तो यह काम नहीं करेगा। इसके अलावा वर्तमान समाधान में एक छोटी दौड़ की स्थिति होती है, जिसमें 2 प्रक्रियाएं जांच कर सकती हैं कि कार्य को कतारबद्ध किया गया है (पता लगाएं कि यह नहीं है), जो तब 2 कार्यों को कतारबद्ध करेगा।

एक बेहतर समाधान होगा जिसे मैं कार्यवाही करता हूं। मूल रूप से आप प्रत्येक कार्य को कतार में एक काउंटर बढ़ाते हैं। जब कार्य शुरू होता है तो आप इसे कम करते हैं। रेडिस का प्रयोग करें और फिर यह सभी परमाणु है।

उदा।

कतार कार्य हाथ में:

conn = get_redis() 
conn.incr(key) 
task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 
तो कार्य में

, आप 2 विकल्प है, तो आप 15 सेकंड के बाद पहले एक कतारबद्ध किया गया था (थ्रोटल) कार्य निष्पादित करने के लिए या 15 सेकंड के बाद यह अमल करना चाहते हैं आखिरी व्यक्ति कतारबद्ध था (बहस)। यही है, अगर हम एक ही कार्य को चलाने की कोशिश करते रहते हैं तो क्या हम टाइमर का विस्तार करते हैं, या हम पहले के लिए 15 प्रतीक्षा करते हैं और कतारबद्ध अन्य कार्यों को अनदेखा करते हैं।

दोनों का समर्थन करने के लिए आसान है, यहाँ debounce है जहाँ हम जब तक कार्य बंद हो जाता है पंक्तिबद्ध हो रही इंतजार:

conn = get_redis() 
counter = conn.decr(key) 
if counter > 0: 
    # task is queued 
    return 
# continue on to rest of task 

थ्रॉटल संस्करण:

counter = conn.getset(key, '0') 
if counter == '0': 
    # we already ran so ignore all the tasks that were queued since 
    return 
# continue on to task 

स्वीकार किए जाते हैं पर इस समाधान का एक अन्य लाभ यह है कि कुंजी पूरी तरह से आपके नियंत्रण में है। तो यदि आप एक ही कार्य को निष्पादित करना चाहते हैं लेकिन उदाहरण के लिए अलग-अलग आईडी/ऑब्जेक्ट्स के लिए केवल एक बार, आप इसे अपनी कुंजी में शामिल करते हैं।

अद्यतन

इस बारे में और भी अधिक सोच रहा था, आप कार्यों को क़तार में बिना थ्रोटल संस्करण और भी आसान कर सकते हैं।

थ्रॉटल v2

conn = get_redis() 
counter = conn.incr(key) 
if counter == 1: 
    # queue up the task only the first time 
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

कार्य में फिर (जब तक कार्य कतार) तुम वापस 0.

के लिए काउंटर सेट तुम भी एक काउंटर का उपयोग करने की जरूरत नहीं है, अगर आप था एक सेट आप सेट में कुंजी जोड़ सकते हैं। यदि आप 1 वापस प्राप्त करते हैं, तो कुंजी सेट में नहीं थी और आपको कार्य को कतारबद्ध करना चाहिए। यदि आप 0 वापस प्राप्त करते हैं, तो कुंजी पहले से ही सेट में है इसलिए कार्य को कतारबद्ध न करें।

+0

हां मैं सहमत हूं, भले ही आपने कहा कि सभी चल रहे कार्यों पर पुनरावृत्ति की प्रक्रिया महंगी है। अच्छा –

+0

जहां 'get_redis' से आता है? –

+0

यह मेरी अपनी विधि है जो एक रेडिस कनेक्शन देता है। – dalore

2

आपको छलांग लगाने से पहले देखो! आप कतार कार्य करने से पहले जांच कर सकते हैं कि कोई कार्य चल रहा है या इंतजार कर रहा है या नहीं।

from celery.task.control import inspect 

def is_running_waiting(task_name): 
    """ 
    Check if a task is running or waiting. 
    """ 
    scheduled_tasks = inspect().scheduled().values()[0] 
    for task in scheduled_tasks: 
     if task['request']['name'] == task_name: 
      return True 
    running_tasks = inspect().active().values()[0] 
    for task in running_tasks: 
     if task['request']['name'] == task_name: 
      return True 

अब अगर आप कतार तीन कार्यों जोड़ने के लिए, पहले एक निष्पादन के लिए पंक्ति में लग जाएँगे, अभ्यस्त शेष पंक्तिबद्ध हो।

for i in range(3): 
    if not is_running_waiting('add'): 
     add.apply_async((2,2), countdown=15) 
संबंधित मुद्दे