2011-02-07 5 views
17

में यूनिट परीक्षण कार्य कतार अब बहुत लंबे समय से मैं कार्य को शेड्यूल करने के लिए ऐपइंजिन पर कार्य कतारों का उपयोग कर रहा हूं, जिस तरह से मुझे लगता है।एपइंजिन

लेकिन मैं हमेशा क्या सोच रहा हूं कि इसके लिए कोई परीक्षण कैसे लिखता है? अब तक मैंने यह सुनिश्चित करने के लिए परीक्षण किए हैं कि एपीआई पर कोई त्रुटि नहीं होती है जो कार्य को कतारबद्ध करता है और फिर कार्य को निष्पादित करने वाले एपीआई के लिए अधिक उचित परीक्षण लिखता है।

हालांकि हाल ही में मैंने इससे थोड़ा असंतुष्ट महसूस करना शुरू कर दिया है और मैं वास्तव में परीक्षण करने का एक तरीका खोज रहा हूं कि सही कार्य सही कतार में जोड़ा गया है। उम्मीद है कि यह कोड को तैनात करके और सर्वश्रेष्ठ की उम्मीद करके बस बेहतर किया जा सकता है।

मैं django-nonrel का उपयोग कर रहा हूं, अगर इसका उत्तर पर कोई असर पड़ता है।

संक्षेप में: कार्यों की पुष्टि करने के लिए यूनिट परीक्षण कैसे लिखा जा सकता है?

उत्तर

13

GAE Test Bed का उपयोग करके आप कार्य कतार को बाहर निकालने की अनुमति देंगे।

यदि आप FunctionalTestCase या TaskQueueTestCase से प्राप्त करते हैं, तो आपको get_tasks और assertTasksInQueue जैसी विधियां मिलेंगी।

आप वास्तव में भी कार्य चला सकते हैं। यह कैसे कार्य करता है इस पर निर्भर करता है कि आप कार्यों का उपयोग करते हैं या स्थगित हैं या नहीं।

from google.appengine.ext import deferred 
import base64 

# gets the most recent task -- since the task queue is reset between tests, 
# this is usually what you want 
def get_task(self): 
    for task in self.get_task_queue_stub().GetTasks('default'): 
     return task 

# decode and execute the deferred method 
def run_deferred(task): 
    deferred.run(base64.b64decode(task['body'])) 

में चल रहे कार्यों समान है, लेकिन आप कार्य लाने के बाद, आप WebTest (जो GAE टेस्ट बिस्तर के शीर्ष पर बनाया गया है) के रूप में प्रस्तुत करने के लिए उपयोग करें:

deferreds के लिए, मैं इस तरह की कुछ कोड है अपने पैरामीटर के साथ कार्य के यूआरएल के लिए POST अनुरोध।

+0

ऐसा लगता है कि यह चाल चल रहा है। धन्यवाद! –

+0

यदि आप सभी बेस टेस्ट मामलों का उपयोग कर रहे हैं तो आप 'self.get_tasks' और 'task [' decoded_body ']' शॉर्टकट के रूप में उपयोग कर सकते हैं (https://github.com/jgeewax/gaetestbed/blob/master/gaetestbed/ taskqueue.py)। इसके अतिरिक्त, यह उम्मीद है कि जल्द ही google.appengine.ext.testbed का हिस्सा बन जाएगा। 'Get_filtered_tasks' विधि है जो 'get_tasks' (https://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/api/taskqueue/taskqueue_stub' जैसी बहुत कुछ काम करती है।पीई # 2453) –

+0

मेरा उत्तर देखें: यह लाइब्रेरी अब ext.testbed (https://developers.google.com/appengine/docs/python/tools/localunittesting) के पक्ष में बहिष्कृत है –

1

गेटेस्टबेड नामक एक छोटा परीक्षण ढांचा है जो आपकी ज़रूरत के अनुरूप हो सकता है। विवरण के लिए कृपया देखें: https://github.com/jgeewax/gaetestbed

यह परीक्षण वातावरण नाक, नाक-जीए प्लगइन और वेबटेस्ट पैकेज के संबंध में काम करता है। जहां तक ​​मेरा संबंध है, जीएई अनुप्रयोगों का परीक्षण करने का सबसे अच्छा तरीका पाइथन पैकेज का मिश्रण दिया गया है।

+4

इस पैकेज वास्तव में हटा दिया गया है के रूप में मैं विलय कर दिया गया है google.appengine.ext.testbed में कार्यक्षमता (https://developers.google.com/appengine/docs/python/tools/localunittesting) –

0

SDK 1.4.3Testbed API स्थानीय एकीकरण परीक्षणों के लिए स्टब पुस्तकालयों की आसान कॉन्फ़िगरेशन प्रदान करता है।

Task Queue के लिए एक सेवा स्टब्स उपलब्ध है।

18

आप GAE टेस्टबेड (GAE टेस्टबेड अब पदावनत है और ext.testbed में ले जाया जा) के बजाय google.appengine.ext.testbed का उपयोग कर रहे हैं, तो आप क्या कर सकते हैं निम्नलिखित:

import base64 
import unittest2 

from google.appengine.ext import deferred 
from google.appengine.ext import testbed 


class TestTasks(unittest2.TestCase): 
    def setUp(self): 
    self.testbed = testbed.Testbed() 
    self.testbed.activate() 
    self.testbed.init_taskqueue_stub() 
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME) 

    def tearDown(self): 
    self.testbed.deactivate() 

    def test_send_contact_request(self): 
    # Make the request to your app that "defers" something: 
    response = ... 
    self.assertEqual(response.status_int, 200) 

    # Get the task out of the queue 
    tasks = self.taskqueue_stub.get_filtered_tasks() 
    self.assertEqual(1, len(tasks)) 

    # Run the task 
    task = tasks[0] 
    deferred.run(task.payload) 

    # Assert that other things happened (ie, if the deferred was sending mail...) 
    self.assertEqual(...) 
+1

यह स्वीकार्य उत्तर होना चाहिए। धन्यवाद – Houman

+0

क्या यह अपनी खुद की कतारों के साथ ऐसा करना संभव है? मैं स्थगित रूप से उपयोग करता हूं: deferred.defer (get_and_store_all_new_messages_async, user_id, query, next_page_token, _queue = "emailFetch") लेकिन यह कतार को नहीं पहचानता है, संभवतः क्योंकि यह यूनिट परीक्षण के लिए queue.yaml फ़ाइल नहीं पढ़ता है –

संबंधित मुद्दे