2011-07-05 18 views
6

के साथ सेलेरी कार्यों को निष्पादित करना मैं सेलरी कार्य करना चाहता हूं जो 2 या अधिक कार्यों के परिणाम पर निर्भर करता है। मैंने Python+Celery: Chaining jobs? और http://pypi.python.org/pypi/celery-tasktree में देखा है, लेकिन वे केवल तभी अच्छे हैं जब कार्यों में केवल एक निर्भर कार्य हो।निर्भरता ग्राफ

मुझे टास्कसेट के बारे में पता है, लेकिन जब TaskSetResult.ready() सत्य हो जाता है तो तुरंत कॉलबैक निष्पादित करने का कोई तरीका प्रतीत नहीं होता है। मेरे मन में जो अभी है, वह एक आवधिक कार्य है जो प्रत्येक कुछ [मिली] सेकंड या उससे भी अधिक समय तक कार्य करता है और कॉलबैक को फायर करता है क्योंकि यह सच होता है, लेकिन यह मेरे लिए सुरुचिपूर्ण लगता है।

कोई सुझाव?

उत्तर

2

mrbox सत्य है, आप परिणाम तैयार होने तक पुनः प्रयास कर सकते हैं, लेकिन दस्तावेज़ों में इतना स्पष्ट नहीं है कि जब आप पुनः प्रयास करते हैं तो आपको सेटिड और उपटस्क तत्वों को पास करना होगा, और पुनर्प्राप्ति के लिए आपको मानचित्र का उपयोग करना होगा फ़ंक्शन, नीचे वर्णित करने के लिए एक नमूना कोड है जिसका अर्थ है।

def run(self, setid=None, subtasks=None, **kwargs): 

    if not setid or not subtasks: 
     #Is the first time that I launch this task, I'm going to launch the subtasks 
     … 
     tasks = [] 
     for slice in slices: 
      tasks.append(uploadTrackSlice.subtask((slice,folder_name))) 

     job = TaskSet(tasks=tasks) 
     task_set_result = job.apply_async() 
     setid = task_set_result.taskset_id 
     subtasks = [result.task_id for result in task_set_result.subtasks] 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 

    #Is a retry than we just have to check the results   
    tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks)) 
    if not tasks_result.ready(): 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 
    else:  
     if tasks_result.successful(): 
      return tasks_result.join() 
     else: 
      raise Exception("Some of the tasks was failing") 
2

आप बात max_retries = कोई नहीं के साथ docs- link

या आप उपयोग कर सकते हैं फिर से प्रयास करें विधि में किया करने के लिए sth समान कर सकते हैं IMHO - अगर .ready 'आधार' कार्यों में से एक() गलत है, आप कर सकते हैं अग्नि .retry() विधि दोनों 'आधार' कार्यों को पूरा कर लेते हैं।

7

अजवाइन के हाल के संस्करणों में (3.0+) आप इच्छित प्रभाव को प्राप्त करने के लिए एक तथाकथित तार का उपयोग कर सकते हैं:

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives से:

सरल तार

कॉर्ड प्राइमेटिव हमें कॉलबैक जोड़ने के लिए सक्षम बनाता है जब किसी समूह के कार्यों के सभी निष्पादन समाप्त हो जाते हैं, जो अक्सर एल्गोरिदम के लिए आवश्यक होता है पर शर्मनाक समानांतर नहीं हैं:

>>> from celery import chord 
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() 
>>> res.get() 
90 

अस्वीकरण: मैं अभी तक इस अपने आप को प्रयास नहीं किया है।

संबंधित मुद्दे