2011-10-24 8 views
21

मैं कुछ संदेश प्रसारित करने redis 'pubsub उपयोग करना चाहते हैं, लेकिन listen का उपयोग कर अवरुद्ध हो, नीचे दिए गए कोड की तरह नहीं करना चाहती:गैर-अवरुद्ध रेडिस पबब संभव है?

import redis 
rc = redis.Redis() 

ps = rc.pubsub() 
ps.subscribe(['foo', 'bar']) 

rc.publish('foo', 'hello world') 

for item in ps.listen(): 
    if item['type'] == 'message': 
     print item['channel'] 
     print item['data'] 

पिछले for अनुभाग रोकेंगे। मैं सिर्फ यह जांचना चाहता हूं कि किसी दिए गए चैनल में डेटा है या नहीं, मैं इसे कैसे पूरा कर सकता हूं? क्या check विधि की तरह है?

+1

क्या कोई कारण है कि आप सुनकर अवरुद्ध नहीं होना चाहते हैं? रेडिस कनेक्शन बहुत सस्ते हैं और आम तौर पर उनमें से कई को आम तौर पर विकसित करना आम है। –

+1

रेडिस, जेडएमक्यू, टॉरनाडो का उपयोग करके पायथन में असिंक्रोनस पबसब - https://github.com/abhinavsingh/async_pubsub –

+1

.listen() के बजाय पबब ऑब्जेक्ट की .get_message() विधि का उपयोग करें (नीचे एक उदाहरण है)। [जब यह प्रश्न पोस्ट किया गया था तो उस विधि को पायथन रेडिस ड्राइवर में समर्थित नहीं किया गया हो सकता है]। –

उत्तर

7

मुझे नहीं लगता कि यह संभव होगा। किसी चैनल में कोई "वर्तमान डेटा" नहीं होता है, आप किसी चैनल की सदस्यता लेते हैं और चैनल पर अन्य क्लाइंट द्वारा धक्का दिए जा रहे संदेशों को प्राप्त करना प्रारंभ करते हैं, इसलिए यह एक अवरुद्ध API है। इसके अलावा यदि आप पब/उप के लिए Redis Commands documentation देखते हैं तो यह अधिक स्पष्ट हो जाएगा।

+0

मुझे लगता है कि यह उत्तर दूसरे के साथ संयुक्त है बहुत सुंदर है। वह इसे धागे में डाल सकता है। अगर वह एक चैनल की गतिविधि के दौरान तत्काल कार्रवाई नहीं करना चाहता था तो वह इसे शायद एक ताना में स्टोर कर सकता था और अपनी खुद की जांच विधि है जो लॉक म्यूटेक्स – jdi

1

किसी भी अवरोधन कोड तक पहुंचने के लिए आपको एक और प्रकार का प्रतिमान कोड करना होगा। सभी परिवर्तनों को सुनने और मुख्य धागे को दूसरी चीजों को करने के लिए छोड़ने के लिए एक नया धागा का उपयोग करना मुश्किल नहीं है।

इसके अलावा, आपको मुख्य थ्रेड और रेडिस सब्सक्राइबर थ्रेड के बीच डेटा को बदलने के लिए कुछ तंत्र की आवश्यकता होगी।

6

यह ब्लॉकिंग श्रोता को थ्रेड करने के लिए एक कामकाजी उदाहरण है।

import sys 
import cmd 
import redis 
import threading 


def monitor(): 
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0) 

    channel = sys.argv[1] 
    p = r.pubsub() 
    p.subscribe(channel) 

    print 'monitoring channel', channel 
    for m in p.listen(): 
     print m['data'] 


class my_cmd(cmd.Cmd): 
    """Simple command processor example.""" 

    def do_start(self, line): 
     my_thread.start() 

    def do_EOF(self, line): 
     return True 


if __name__ == '__main__': 
    if len(sys.argv) == 1: 
     print "missing argument! please provide the channel name." 
    else: 
     my_thread = threading.Thread(target=monitor) 
     my_thread.setDaemon(True) 

     my_cmd().cmdloop() 
+0

के साथ निर्देश में दिखता है क्या जीआईएल अब चित्र में नहीं आया है? हम शायद मल्टीप्रोसेसिंग (http://docs.python.org/2/library/multiprocessing.html) का उपयोग कर सकते हैं?लेकिन यह तरीका – Pramod

+0

प्रक्रिया बनाने के ऊपरी हिस्से में शामिल है, जीआईएल पार्टी को बर्बाद नहीं करेगा क्योंकि आप सीपीयू जोन में काम नहीं कर रहे हैं बल्कि नेटवर्क सुनते हैं। – Andrew

0

आप गैर-अवरुद्ध रेडिस पबब ऐप बनाने के लिए गीवेंट, गीवेंट बंदर पैचिंग का उपयोग कर सकते हैं।

42

यदि आप गैर-अवरुद्ध, एसिंक्रोनस प्रोसेसिंग के बारे में सोच रहे हैं, तो आप शायद असीमित फ्रेमवर्क/सर्वर का उपयोग कर रहे हैं (या उपयोग करना चाहिए)।

  • अगर आप Tornado उपयोग कर रहे हैं, वहाँ Tornado-Redis है। यह देशी टोरनाडो जेनरेटर कॉल का उपयोग कर रहा है। इसका Websocket demo पब/उप के संयोजन में इसका उपयोग करने के तरीके पर उदाहरण प्रदान करता है।

  • यदि आप Twisted का उपयोग कर रहे हैं, तो txRedis है। वहां आपके पास pub/sub example भी है।

  • यह भी लगता है कि आप Gevent's monkey patching (gevent.monkey.patch_all()) का उपयोग कर Redis-py कोई समस्या नहीं के साथ Gevent के साथ संयुक्त उपयोग कर सकते हैं।

+2

यह सही उत्तर है जिसे चेक-चिह्नित किया जाना चाहिए। मुझे यकीन नहीं है कि क्यों लोग पहिया को फिर से शुरू करेंगे, रेडिस के लिए पहले से ही मौजूदा एसिंक-क्लाइंट है, इस तरह के क्लाइंट के अस्तित्व में एक नए थ्रेड को वास्तव में जरूरी नहीं है। – securecurve

+0

@securecurve: निष्पक्ष होने के लिए, मैंने उस चिह्न को एक वर्ष से अधिक के बाद जोड़ा है। हालांकि, दोनों txRedis और brükva (जिसमें से टोरनाडो-रेडिस फोर्कड) दोनों 3 साल पुराने हैं, इसलिए यह वास्तव में एक बहाना नहीं है। – vartec

+0

इस उत्तर के पास इस प्रश्न के साथ कुछ लेना देना नहीं है। जैसा कि स्वीकृत उत्तर में बताया गया है, रेडिस उन ग्राहकों को संदेश भेजता है जो सुन रहे हैं। इसलिए संदेशों के लिए पूछने का कोई तरीका नहीं है। – Glaslos

1

सबसे कुशल दृष्टिकोण थ्रेड-आधारित के बजाय ग्रीनलेट-आधारित होगा। एक ग्रीनलेट आधारित समरूपता ढांचे के रूप में, पायथन दुनिया में पहले से ही गीवेंट स्थापित है। इसलिए Redis-py के साथ एक gevent एकीकरण अद्भुत होगा। यही है क्या GitHub पर इस मुद्दे में चर्चा हो रही है:

https://github.com/andymccurdy/redis-py/issues/310

0

Redis 'पब/उप एक चैनल पर सदस्यता ली (सुनने) ग्राहकों को संदेश भेजता है। यदि आप नहीं सुन रहे हैं, तो आप संदेश को याद करेंगे (इसलिए अवरुद्ध कॉल)। यदि आप इसे गैर-अवरुद्ध करना चाहते हैं, तो मैं इसके बजाय एक कतार का उपयोग करने की सलाह देता हूं (रेडिस भी उस पर बहुत अच्छा है)। यदि आपको पब/सब का उपयोग करना है, तो आप एसिंक्रोनस, अवरुद्ध श्रोता बनाने के लिए सुझाए गए भूगर्भ के रूप में उपयोग कर सकते हैं, कतार में संदेशों को पुश कर सकते हैं और गैर-अवरुद्ध तरीके से उस कतार से संदेशों को संसाधित करने के लिए एक अलग उपभोक्ता का उपयोग कर सकते हैं।

9

रेडिस्पी के नए संस्करण में एसिंक्रोनस पबब के लिए समर्थन है, अधिक जानकारी के लिए https://github.com/andymccurdy/redis-py देखें।

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 
2

यहाँ है धागे के बिना एक nonblocking समाधान: यहाँ प्रलेखन से ही एक उदाहरण है

fd = ps.connection._sock.fileno(); 
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block 
if rlist: 
    for rfd in rlist: 
     if fd == rfd: 
      message = ps.get_message() 

ps.get_message() अपने आप ही पर्याप्त है, लेकिन मैं इस विधि का उपयोग इतना है कि मैं कई पर इंतजार कर सकते हैं केवल लाल कनेक्शन के बजाय एफडीएस।

5

स्वीकृत उत्तर अप्रचलित है क्योंकि रेडिस-पी आपको गैर-अवरुद्ध get_message() का उपयोग करने की सलाह देता है। लेकिन यह थ्रेड का आसानी से उपयोग करने का एक तरीका भी प्रदान करता है।

https://pypi.python.org/pypi/redis

संदेशों को पढ़ने के लिए तीन अलग-अलग रणनीतियों रहे हैं।

दृश्यों के पीछे, get_message() कनेक्शन के सॉकेट को त्वरित रूप से मतदान करने के लिए सिस्टम के 'चयन' मॉड्यूल का उपयोग करता है। यदि डेटा पढ़ने के लिए उपलब्ध है, get_message() इसे पढ़ेगा, संदेश स्वरूपित करेगा और इसे वापस कर देगा या इसे एक संदेश हैंडलर को पास कर देगा। यदि पढ़ने के लिए कोई डेटा नहीं है, तो get_message() तुरंत कोई नहीं लौटाएगा। यह आपके आवेदन के अंदर एक मौजूदा घटना पाश में एकीकृत करने के लिए तुच्छ बनाता है।

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 

रेडिस-पीई के पुराने संस्करण केवल pubsub.listen() के साथ संदेश पढ़ते हैं। सुनो() एक जेनरेटर है जो एक संदेश उपलब्ध होने तक ब्लॉक करता है। यदि आपके आवेदन को कुछ और करने की आवश्यकता नहीं है लेकिन रेडिस से प्राप्त संदेशों पर प्राप्त करें और कार्य करें, तो सुनें() चलने का एक आसान तरीका है।

for message in p.listen(): 
    # do something with the message 

तीसरा विकल्प एक अलग थ्रेड में इवेंट लूप चलाता है। pubsub.run_in_thread() एक नया धागा बनाता है और इवेंट लूप शुरू करता है। थ्रेड ऑब्जेक्ट run_in_thread() के कॉलर पर वापस कर दिया जाता है। कॉलर इवेंट लूप और थ्रेड को बंद करने के लिए thread.stop() विधि का उपयोग कर सकते हैं। दृश्यों के पीछे, यह get_message() के आस-पास एक रैपर है जो एक अलग थ्रेड में चलता है, अनिवार्य रूप से आपके लिए एक छोटा गैर-अवरुद्ध ईवेंट लूप बना रहा है। run_in_thread() एक वैकल्पिक sleep_time तर्क लेता है। यदि निर्दिष्ट किया गया है, तो ईवेंट लूप लूप के प्रत्येक पुनरावृत्ति में मान के साथ time.sleep() को कॉल करेगा।

नोट: चूंकि हम एक अलग थ्रेड में चल रहे हैं, इसलिए संदेशों को संभालने का कोई तरीका नहीं है जो स्वचालित रूप से पंजीकृत संदेश हैंडलर से प्रबंधित नहीं होते हैं। इसलिए, redis-py आपको run_in_thread() को कॉल करने से रोकता है यदि आपने पैटर्न या चैनलों की सदस्यता ली है जिनके पास संदेश हैंडलर संलग्न नहीं हैं।

p.subscribe(**{'my-channel': my_handler}) 
thread = p.run_in_thread(sleep_time=0.001) 
# the event loop is now running in the background processing messages 
# when it's time to shut it down... 
thread.stop() 

तो आप सवाल का जवाब देने के लिए, जब आप जानना चाहते हैं कि कोई संदेश आया है तो get_message की जांच करें।

संबंधित मुद्दे