2012-11-07 11 views
16

मैं एक सेलेरी कार्य द्वारा लौटाई गई सूची से समूह बनाना चाहता हूं, ताकि कार्य में प्रत्येक आइटम के लिए सेट किया गया हो, समूह में एक कार्य जोड़ा जाएगा।एक सेलेरी कार्य को कैसे श्रृंखलाबद्ध करें जो समूह में एक सूची देता है?

उपयोग केस की व्याख्या करने के लिए यहां एक सरल कोड उदाहरण है। ??? पिछले कार्य से परिणाम होना चाहिए।

@celery.task 
def get_list(amount): 
    # In reality, fetch a list of items from a db 
    return [i for i in range(amount)] 

@celery.task 
def process_item(item): 
    #do stuff 
    pass 

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???)) 

मैं शायद यह सही ढंग से आ नहीं हूँ, लेकिन मैं बहुत यकीन है कि यह कार्य के भीतर से कार्य कॉल करने के लिए सुरक्षित नहीं है कर रहा हूँ:

@celery.task 
def process_list(): 
    for i in get_list.delay().get(): 
     process_item.delay(i) 

मैं सेकंड काम से परिणाम की जरूरत नहीं है ।

+0

दरअसल, किसी कार्य से कार्य को कॉल न करें। यह deadlocks का कारण बन जाएगा। कहो कि आपके पास एक कार्यकर्ता है। आप अपना काम कॉल करते हैं, जो कार्यकर्ता 1 से जुड़ा होता है, फिर दूसरा कार्य कहता है। उस कार्य को संसाधित करने के लिए कोई कार्यकर्ता नहीं है और सब कुछ लटका होगा। जब आप श्रमिक जोड़ते हैं, तो यह नास्टनेस थोड़ा बेहतर हो जाता है, लेकिन आप हमेशा एक ही कार्य (और समांतरता खोने) के साथ कई श्रमिकों को जोड़ देंगे। – mlissner

उत्तर

29

आप इस तरह के व्यवहार को मध्यवर्ती कार्य का उपयोग कर प्राप्त कर सकते हैं। यहां एक "नक्शा" बनाने की एक पद्धति है जो आपके द्वारा सुझाए गए कार्यों की तरह काम करता है।

from celery import task, subtask, group 

@task 
def get_list(amount): 
    return [i for i in range(amount)] 

@task 
def process_item(item): 
    # do stuff 
    pass 

@task 
def dmap(it, callback): 
    # Map a callback over an iterator and return as a group 
    callback = subtask(callback) 
    return group(callback.clone([arg,]) for arg in it)() 

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s())) 

मुझे इस सुझाव देने के लिए सोमाल से पूछने के लिए क्रेडिट जब मैंने उसे इसी तरह की समस्या पर मदद के लिए कहा।

+1

ध्यान दें कि क्लोन केवल एक उथली प्रतिलिपि करता है। यदि आप एक "जटिल" हस्ताक्षर (जैसे चेन, समूह या तार) को क्लोन करना चाहते हैं, तो आपको [celery issue 2251] में उल्लिखित पाइथन की गहरी प्रतिलिपि का उपयोग करना होगा (https://github.com/celery/अजवाइन/मुद्दों/2251)। या आप फोर-लूप में 'कॉलबैक = सबटास्क (कॉलबैक)' को फ़ंक्शन बनाते हैं और 'क्लोन' हटाते हैं। –

+0

मैंने उपरोक्त टिप्पणी को एक दर्जन बार पढ़ा है और मुझे यह नहीं मिला है। क्या आप एक उदाहरण प्रदान कर सकते हैं, @LuisNell? – mlissner

+0

@mlissner उपरोक्त कोड को देखते हुए, मेरा मतलब है कि निम्नलिखित है। अगर हम मानते हैं कि "कॉलबैक" केवल एक ही काम नहीं है, बल्कि एक जटिल वर्कफ़्लो (एक समूह या तार) है, तो आप बस '.clone() 'का उपयोग नहीं कर सकते हैं। समूह और तार बहुत जटिल हो सकते हैं (समूहों का समूह आदि)। उस स्थिति में आप केवल '.clone' का उपयोग नहीं कर सकते हैं, क्योंकि यह केवल आपके कॉलबैक हस्ताक्षर की उथली प्रति बनाता है। इसका मतलब है कि तर्क सही ढंग से पारित नहीं किए जाएंगे। यह सुनिश्चित करने के लिए कि सब कुछ अपेक्षित काम करता है, आपको मेरी मूल टिप्पणी में उल्लिखित 'गहरी कॉपी' का उपयोग करने की आवश्यकता है - क्या इससे यह और स्पष्ट हो जाता है? यदि नहीं, तो मैं फिर कोशिश करूंगा। –

संबंधित मुद्दे