2010-03-17 17 views
6

मैंने लगातार संदेश कतार के लिए txredis (redis for non blocking twisted api) का उपयोग करने में असफल प्रयास किया है, मैं एक स्केपर प्रोजेक्ट के साथ स्थापित करने की कोशिश कर रहा हूं जिस पर मैं काम कर रहा हूं। मैंने पाया कि हालांकि क्लाइंट अवरुद्ध नहीं हो रहा था, लेकिन यह हो सकता था जितना धीमा हो गया क्योंकि रिएक्टर लूप में एक घटना को हजारों चरणों में विभाजित किया जाना चाहिए था।ट्विस्टेड: क्यों एक स्थगित थ्रेड पर स्थगित कॉलबैक पास करना क्यों थ्रेड को अचानक अवरुद्ध कर देता है?

तो इसके बजाय, मैंने रेडिस-पीई (नियमित अवरुद्ध मुड़ते एपीआई) का उपयोग करने और एक स्थगित धागे में कॉल को लपेटने की कोशिश की। यह बहुत अच्छा काम करता है, हालांकि जब मैं रेडिस को कॉल करता हूं तो मैं आंतरिक अवरोध करना चाहता हूं क्योंकि मैं चीजों को गति देने के प्रयासों में कनेक्शन पूलिंग सेट करना चाहता हूं।

#!/usr/bin/env python 
from twisted.internet import reactor,threads 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 


def aBlockingRedisCall(): 
    print 'doing lookup... this may take a while' 
    time.sleep(10) 
    return 'results from redis' 

def result(res): 
    print res 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 
    d = threads.deferToThread(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.run() 

if __name__=='__main__': 
    main() 

यहाँ और वह आस्थगित सूत्र में कोड बनाता कनेक्शन पूलिंग के लिए मेरे परिवर्तन है:

नीचे वर्णन करने के लिए मेरे उपयोग के मामले में आस्थगित थ्रेड के लिए मुड़ डॉक्स से लिया कुछ नमूना कोड मेरी व्याख्या है अवरुद्ध:

#!/usr/bin/env python 
from twisted.internet import reactor,defer 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 

def aBlockingRedisCall(x): 
    if x<5: #all connections are busy, try later 
     print '%s is less than 5, get a redis client later' % x 
     x+=1 
     d = defer.Deferred() 
     d.addCallback(aBlockingRedisCall) 
     reactor.callLater(1.0,d.callback,x) 
     return d 

    else: 
     print 'got a redis client; doing lookup.. this may take a while' 
     time.sleep(10) # this is now blocking.. any ideas? 
     d = defer.Deferred() 
     d.addCallback(gotFinalResult) 
     d.callback(x) 
     return d 

def gotFinalResult(x): 
    return 'final result is %s' % x 

def result(res): 
    print res 

def aBlockingMethod(): 
    print 'going to sleep...' 
    time.sleep(10) 
    print 'woke up' 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 


    d = defer.Deferred() 
    d.addCallback(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.callInThread(d.callback, 1) 
    reactor.run() 

if __name__=='__main__': 
    main() 

तो मेरे सवाल का, किसी को पता है है क्यों मेरे परिवर्तन का कारण बनता है आस्थगित धागा अवरुद्ध किए जाने और/या किसी को भी एक बेहतर समाधान का सुझाव कर सकते हैं?

उत्तर

12

ठीक है, के रूप में twisted docs कहते हैं:

Deferreds कोड नहीं बनाते हैं जादुई जब भी आप इस तरह के रूप में sleep कोड अवरुद्ध उपयोग कर रहे हैं, ब्लॉक नहीं

, आप इसे मानने की आवश्यकता एक नए धागे के लिए।

#!/usr/bin/env python 
from twisted.internet import reactor,defer, threads 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 

def aBlockingRedisCall(x): 
    if x<5: #all connections are busy, try later 
     print '%s is less than 5, get a redis client later' % x 
     x+=1 
     d = defer.Deferred() 
     d.addCallback(aBlockingRedisCall) 
     reactor.callLater(1.0,d.callback,x) 
     return d 

    else: 
     print 'got a redis client; doing lookup.. this may take a while' 
     def getstuff(x): 
      time.sleep(3) 
      return "stuff is %s" % x 

     # getstuff is blocking, so you need to push it to a new thread 
     d = threads.deferToThread(getstuff, x) 
     d.addCallback(gotFinalResult) 
     return d 

def gotFinalResult(x): 
    return 'final result is %s' % x 

def result(res): 
    print res 

def aBlockingMethod(): 
    print 'going to sleep...' 
    time.sleep(10) 
    print 'woke up' 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 


    d = defer.Deferred() 
    d.addCallback(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.callInThread(d.callback, 1) 
    reactor.run() 

if __name__=='__main__': 
    main() 

मामले में redis एपीआई नहीं बहुत जटिल इसे और अधिक twisted.web का उपयोग कर इसे पुनः लिखने के लिए प्राकृतिक हो सकता है, सिर्फ एक बहुत धागे में ब्लॉकिंग API बुलाने की बजाय। http://github.com/deldotdr/txRedis

+0

भयानक धन्यवाद! – surtyaar

0

एक संबंधित नोट पर, आप शायद एक Redis इसी प्रकार के मुड़ के लिए बनाई गए ग्राहक, का उपयोग करके एक बहुत लाभ सकता है रेडिस 2.x के नए प्रोटोकॉल और फीचर्स। आपको निश्चित रूप से इसे आज़माएं। इसे txredisapi कहा जाता है।

लगातार संदेश कतार के लिए, मैं RestMQ की अनुशंसा करता हूं। एक रेडिस-आधारित संदेश कतार प्रणाली चक्रवात और txredisapi के शीर्ष पर बनाया गया है।

http://github.com/gleicon/restmq

चीयर्स

+1

ओपी पहली पंक्ति में कहता है कि उसने txRedis का उपयोग करने की कोशिश की। – pr1001

1

वहाँ भी है मुड़ के लिए एक अप-टू-डेट Redis ग्राहक जो पहले से ही समर्थन करता है:

संबंधित मुद्दे