2013-02-28 5 views
11

मैं यह खोजने की कोशिश कर रहा हूं कि मैं रेडिस और टॉरनाडो को असीमित रूप से कैसे उपयोग कर सकता हूं। मुझे tornado-redis मिला लेकिन मुझे कोड में yield जोड़ने की आवश्यकता है।मैं टॉरनाडो और रेडिस को असीमित रूप से कैसे उपयोग कर सकता हूं?

import redis 
import tornado.web 

class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = redis.StrictRedis(port=6279) 
     pubsub = client.pubsub() 
     pubsub.subscribe('test_channel') 

     for item in pubsub.listen(): 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 

     self.write(item['data']) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    tornado.ioloop.IOLoop.instance().start() 

मैं पहुँच / यूआरएल हो रही है और जरूरत है "नमस्ते दुनिया" मिलता है जबकि वहाँ एक अनुरोध /wait में लंबित है:

मैं निम्नलिखित कोड है। मैं इसे कैसे कर सकता हूं?

+1

Redis पब/उप, एक 'web.RequestHandler' में उपयोग नहीं किया जाना चाहिए क्योंकि यह जबकि' pubsub पर इंतजार कर ioloop को अवरुद्ध कर देगा .listen() '। एक काम कर रहे वेबसाईट उदाहरण के लिए http://tornadogists.org/532067/ पर एक नज़र डालें। –

+0

वेबस्केट एक अच्छा विकल्प है, हालांकि मेरे एप्लिकेशन को उन ब्राउज़रों में काम करने की आवश्यकता है जिनके पास websockets का समर्थन नहीं है। मैं लंबे मतदान का उपयोग कर रहा हूँ। यही कारण है कि मुझे 'async get' की आवश्यकता है। –

+0

@ हेलील्सन सैंटोस उस मामले में आपकी सबसे अच्छी शर्त सब्सक्राइब किए गए चैनल इतिहास (एक अलग थ्रेड द्वारा खिलाया गया) की स्थानीय स्थिति को बनाए रखने के लिए है, और उसके बाद तुरंत उस स्थिति को प्रतिक्रिया के लिए लिखें और 'प्राप्त करें' ऑपरेशन को पूरा करें। ग्राहक को अंतिम प्राप्त सूचकांक, या अंतिम समय, इत्यादि के कुछ रिकॉर्ड बनाए रखना चाहिए, जिससे आप विभिन्न ग्राहकों के लिए निरंतरता बनाए रखने की अनुमति दे सकते हैं। जब मुझे समय मिलता है तो मैं कुछ घंटों में एक उदाहरण के साथ एक उत्तर लिखूंगा। –

उत्तर

5

आपको मुख्य टॉरनाडो थ्रेड में रेडिस पब/उप का उपयोग नहीं करना चाहिए, क्योंकि यह आईओ लूप को अवरुद्ध करेगा। आप मुख्य धागे में वेब क्लाइंट से लंबे मतदान को संभाल सकते हैं, लेकिन आपको रेडिस को सुनने के लिए एक अलग धागा बनाना चाहिए। जब आप संदेश प्राप्त करते हैं तो मुख्य थ्रेड के साथ संवाद करने के लिए आप ioloop.add_callback() और/या threading.Queue का उपयोग कर सकते हैं।

1

ठीक है, तो यहाँ कैसे मैं समझ अनुरोध के साथ क्या करना होगा के मेरे उदाहरण है।

पहले एक सरल लड़ी pubsub श्रोता जो एक स्थानीय सूची वस्तु में नए संदेश भी संलग्न करती है:

मैं दो मुख्य घटक जोड़ा। मैंने कक्षा में सूची एक्सेसर्स भी जोड़े, ताकि आप श्रोता धागे से पढ़ सकें जैसे कि आप नियमित सूची से पढ़ रहे थे। जहां तक ​​आपके WebRequest का संबंध है, आप केवल स्थानीय सूची ऑब्जेक्ट से डेटा पढ़ रहे हैं। यह तुरंत लौटता है और वर्तमान अनुरोध को पूरा करने या भविष्य के अनुरोधों को स्वीकार करने और संसाधित होने से अवरुद्ध नहीं करता है।

class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 

दूसरा एप्लिकेशनमिक्स क्लास है। कार्यक्षमता और विशेषताओं को जोड़ने के लिए यह एक द्वितीयक ऑब्जेक्ट है जिसमें आपका वेब अनुरोध वर्ग है। इस मामले में यह जांचता है कि अनुरोधित चैनल के लिए कोई चैनल श्रोता पहले से मौजूद है या नहीं, अगर कोई नहीं मिला, तो बनाता है, और श्रोता हैंडल को WebRequest पर लौटाता है।

# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

WebRequest वर्ग अब श्रोता व्यवहार करता है जैसे कि यह एक स्थिर सूची थे (ध्यान में रखते हुए कि आप self.write एक स्ट्रिंग देने की आवश्यकता)

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 

अंत में, के बाद आवेदन बनाई गई है, मैं जोड़ा एक विशेषता

# add a dictionary containing channels to your application 
application.channels = {} 

साथ ही चल धागे के कुछ सफाई के रूप में एक खाली शब्दकोश, आप एक बार अनुप्रयोग से बाहर

# clean up the subscribed channels 
for channel in application.channels: 
    application.channels[channel].stop() 
    application.channels[channel].join() 

पूरा कोड:

import threading 
import redis 
import tornado.web 



class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 


# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/channel/(?P<channel>\S+)", ReadChannel), 
]) 


# add a dictionary containing channels to your application 
application.channels = {} 


if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    try: 
     tornado.ioloop.IOLoop.instance().start() 
    except KeyboardInterrupt: 
     pass 

    # clean up the subscribed channels 
    for channel in application.channels: 
     application.channels[channel].stop() 
     application.channels[channel].join() 
+0

आप आसानी से सूची को किसी कतार या किसी अन्य ऑब्जेक्ट से प्रतिस्थापित कर सकते हैं जो गैर-अवरुद्ध पहुंच का समर्थन करता है और पिछले अनुरोध के बाद प्राप्त संदेशों को वापस लौटाता है। हालांकि, आपको प्रत्येक क्लाइंट के लिए कतार बनाए रखना होगा, और गैर-अवरोधन का उपयोग करना सुनिश्चित करें और 'खाली 'अपवादों को सही तरीके से संभाल लें। –

2

अजगर> = 3 के लिए।3, मैं आपको aioredis का उपयोग करने की सलाह दूंगा। मैं नीचे दिए गए कोड का परीक्षण नहीं किया था, लेकिन यह ऐसा ही कुछ किया जाना चाहिए:

import redis 
import tornado.web 
from tornado.web import RequestHandler 

import aioredis 
import asyncio 
from aioredis.pubsub import Receiver 


class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop) 

     ch = redis.channels['test_channel'] 
     result = None 
     while await ch.wait_message(): 
      item = await ch.get() 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 
       result = item['data'] 

     self.write(result) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    print 'running' 
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop') 
    server = tornado.httpserver.HTTPServer(application) 
    server.bind(8888) 
    # zero means creating as many processes as there are cores. 
    server.start(0) 
    tornado.ioloop.IOLoop.instance().start() 
संबंधित मुद्दे

 संबंधित मुद्दे