2013-03-05 10 views
6

मुझे ftp के माध्यम से एक फ़ाइल डाउनलोड करने, इसे बदलने और इसे वापस अपलोड करने की आवश्यकता है। मैं अजवाइन उपयोग कर रहा हूँ यह करने के लिए, लेकिन जब श्रृंखलन का उपयोग करने की कोशिश कर मैं समस्याओं में चला रहा हूँ, मैं जहां हो रही है:सेलेरी चेनिंग कार्यों क्रमशः

TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

इसके अलावा, मैं चेन उपयोग कर सकते हैं और आश्वस्त रहें कि चरणों अनुक्रमिक हो जाएगा? यदि विकल्प नहीं है तो क्या विकल्प है?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async() 
print res.get() 

कार्य:

@task() 
def download_ftp_image(ftp_server, username , password , filename, directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     if not os.path.exists(directory): 
      os.makedirs(directory) 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     else: 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     ftp.quit() 
    except error_perm, resp: 
     raise download_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

@task() 
def upload_ftp_image(ftp_server, username , password , file , directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     new_file= file.replace(directory, "") 
     directory = directory.replace("tmp","") 
     try: 
      ftp.storbinary("STOR " + directory + new_file , open(file, "rb")) 
     except: 
      ftp.mkd(directory) 
      ftp.storbinary("STOR " + directory + new_file, open(file, "rb")) 
     ftp.quit() 
    except error_perm, resp: 
     raise upload_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

और यह एक अच्छा या मेरे विशेष मामले के लिए एक बुरी बात है? :

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
result.get() 
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
#result.get() 

उत्तर

13

एक श्रृंखला हमेशा एक पहले तर्क के रूप में पिछले परिणाम पारित कर दिया है। chains documentation से:

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

आपका upload_ftp_image कार्य इस अतिरिक्त तर्क को स्वीकार नहीं करता, और इस प्रकार यह विफल रहता है।

आपके पास चेनिंग के लिए एक अच्छा उपयोग केस है; दूसरा कार्य कहलाता है के बाद पहला कार्य पूरा हो गया है (अन्यथा परिणाम किसी भी तरह से पारित नहीं किया जा सका)।

सीधे शब्दों में पिछले काम से परिणाम के लिए एक तर्क जोड़ें:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory): 

आपको लगता है कि परिणाम मूल्य के कुछ उपयोग कर सकता; हो सकता है कि इसे डाउनलोड विधि डाउनलोड की गई फ़ाइल का पथ वापस कर दे ताकि अपलोड विधि को पता चल सके कि क्या अपलोड करना है?

+0

मुझे तब यह कैसे करना चाहिए? – psychok7

+0

@ psychok7: थोड़ा विस्तारित। –

+0

ऐसा लगता है कि मुझे यह काम मिल रहा है :) .. धन्यवाद – psychok7

17

एक और विकल्प यदि आप पिछले कार्य के वापसी मूल्य को तर्क के रूप में उपयोग नहीं करना चाहते हैं, तो 'अपरिवर्तनीयता' का उपयोग करना है।

download_ftp_image.s(...) and upload_ftp_image.s(...) 

के रूप में उन्हें निर्धारित करें::

download_ftp_image.si(...) and upload_ftp_image.si(...) 

और तुम अब एक श्रृंखला में तर्क के सामान्य संख्या के साथ कार्य का उपयोग कर सकते

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

इसके बजाय के रूप में अपने उप-कार्य को परिभाषित करने के

संबंधित मुद्दे