2014-10-20 5 views
7

जब मेरी यूनिट परीक्षणों में से एक SQLAlchemy ऑब्जेक्ट को हटा देता है, तो ऑब्जेक्ट एक बाद_डिलीट ईवेंट ट्रिगर करता है जो ड्राइव से फ़ाइल को हटाने के लिए एक सेलेरी कार्य को ट्रिगर करता है।कनेक्शन बंद हो जाता है जब एक एसक्यूएलकेमी घटना सेलेरी कार्य

कार्य करते समय कार्य CELERY_ALWAYS_EAGER = True है।

gist to reproduce the issue easily

उदाहरण दो परीक्षणों है। एक घटना में कार्य को ट्रिगर करता है, दूसरा घटना के बाहर। घटना में केवल एक ही कनेक्शन बंद कर देता है।

git clone https://gist.github.com/5762792fc1d628843697.git 
cd 5762792fc1d628843697 
virtualenv venv 
. venv/bin/activate 
pip install -r requirements.txt 
python test.py 

ढेर:

जल्दी त्रुटि आप चला सकते हैं पुन: पेश करने के लिए

$  python test.py 
E 
====================================================================== 
ERROR: test_delete_task (__main__.CeleryTestCase) 
---------------------------------------------------------------------- 
Traceback (most recent call last): 
    File "test.py", line 73, in test_delete_task 
    db.session.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do 
    return getattr(self.registry(), name)(*args, **kwargs) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit 
    self.transaction.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit 
    self._prepare_impl() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl 
    self.session.flush() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush 
    self._flush(objects) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__ 
    compat.reraise(type_, value, traceback) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback 
    self._assert_active(prepared_ok=True, rollback_ok=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active 
    raise sa_exc.ResourceClosedError(closed_msg) 
ResourceClosedError: This transaction is closed 

---------------------------------------------------------------------- 
Ran 1 test in 0.014s 

FAILED (errors=1) 
+0

कृपया त्रुटि और स्टैक ट्रेस पोस्ट करें। – ACV

+0

मैंने अभी परीक्षण आउटपुट के साथ अपडेट किया है। मैंने स्थानीय * निक्स मशीन पर आसानी से पुन: उत्पन्न करने के लिए आदेश भी जोड़े। – deBrice

उत्तर

8

मुझे लगता है कि मैं इस समस्या पाया - यह कैसे आप अपने अजवाइन कार्य की स्थापना में है। आप अपने अजवाइन सेटअप से एप्लिकेशन संदर्भ कॉल निकालते हैं, तो सब कुछ ठीक चलता है: http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

तो मुझे लगता है with app.app_context()::

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     # deleted --> with app.app_context(): 
     return TaskBase.__call__(self, *args, **kwargs) 

कभी नहीं after_delete घटनाओं के दौरान सत्र संशोधित करने के बारे SQLAlchemy डॉक्स में एक बड़ी चेतावनी है हटाए जाने के दौरान बुलाया जा रहा है, फ्लास्क-स्क्लाक्लेमी app ऑब्जेक्ट में संग्रहीत सत्र को जोड़ने और/या संशोधित करने का प्रयास कर रहा है, और इसलिए पूरी बात बमबारी है।

फ्लास्क-एसक्यूएलएल्केमी आपके लिए दृश्यों के पीछे बहुत सारे जादू करता है, लेकिन आप इसे बाईपास कर सकते हैं और सीधे स्क्लेक्लेमी का उपयोग कर सकते हैं। आप हटाना घटना के दौरान डेटाबेस से बात करने की जरूरत है, तो आप डाटाबेस के लिए एक नया सत्र बना सकते हैं:

@celery.task() 
def my_task(): 
    # obviously here I create a new object 
    session = db.create_scoped_session() 
    session.add(User(id=13, value="random string")) 
    session.commit() 
    return 

लेकिन यह लगता है कि यह की जरूरत नहीं है, तो आप सिर्फ एक छवि को हटाने के लिए कोशिश कर रहे हैं लगता है पथ। - मुझे पता है कि अगर में से किसी के लिए काम नहीं करता है

# instance will call the task 
@event.listens_for(User, "after_delete") 
def after_delete(mapper, connection, target): 
    my_task.delay(target.value) 

@celery.task() 
def my_task(image_path): 
    os.remove(image_path) 

उम्मीद है कि है कि उपयोगी है: उस मामले में, मैं सिर्फ अपने कार्य को तो यह एक रास्ता लेता बदल जाएगा। बहुत विस्तृत सेटअप के लिए धन्यवाद, यह वास्तव में डीबगिंग में मदद की।

+0

आपने इसे खींचा, ऐप संदर्भ में चलाने के लिए कार्य को फिर से परिभाषित करना मुद्दा था। मैं देखूंगा कि मौजूदा 'celery.task' के अतिरिक्त' celery.task_with_context' सजावट को परिभाषित करना संभव है या नहीं। आपको बहुत - बहुत धन्यवाद! – deBrice

+0

कोई समस्या नहीं है! खुशी हुई यह मदद की। –

+1

ओह है @ राहेलसैंडर्स, फैंसी यहां आप में चल रहा है। – ACV

0

पूछो, अजवाइन के निर्माता ने सुझाव दिया कि समाधान on github

from celery import signals 

def make_celery(app): 
    ... 

    @signals.task_prerun.connect 
    def add_task_flask_context(sender, **kwargs): 
     if not sender.request.is_eager: 
      sender.request.flask_context = app.app_context().__enter__() 

    @signals.task_postrun.connect 
    def cleanup_task_flask_context(sender, **kwargs): 
     flask_context = getattr(sender.request, 'flask_context', None) 
     if flask_context is not None: 
      flask_context.__exit__(None, None, None) 
1

जवाब deBrice ने सुझाव दिया की तरह, लेकिन दृष्टिकोण राहेल के लिए इसी तरह के प्रयोग से।

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     import flask 
     # tests will be run in unittest app context 
     if flask.current_app: 
      return TaskBase.__call__(self, *args, **kwargs) 
     else: 
      # actual workers need to enter worker app context 
      with app.app_context(): 
       return TaskBase.__call__(self, *args, **kwargs) 
संबंधित मुद्दे