2013-04-01 5 views
6

मैं कोर्नू को टर्नडो-सॉक्स और डीजेगो एप्लिकेशन सर्वर के बीच एमक्यू एडाप्टर के रूप में उपयोग करने का एक तरीका ढूंढ रहा हूं। मैंने कुछ ऐसा किया:गैर-अवरुद्ध तरीके से कोम्बू

class BrokerClient(ConsumerMixin): 
    clients = [] 

    def __init__(self): 
     self.connection = BrokerConnection(settings.BROKER_URL) 
     self.io_loop = ioloop.IOLoop.instance() 
     self.queue = sockjs_queue 
     self._handle_loop() 

    @staticmethod 
    def instance(): 
     if not hasattr(BrokerClient, '_instance'): 
      BrokerClient._instance = BrokerClient() 
     return BrokerClient._instance 

    def add_client(self, client): 
     self.clients.append(client) 

    def remove_client(self, client): 
     self.clients.remove(client) 

    def _handle_loop(self): 
     try: 
      if self.restart_limit.can_consume(1): 
       for _ in self.consume(limit=5): 
        pass 
     except self.connection.connection_errors: 
      print ('Connection to broker lost. ' 
      'Trying to re-establish the connection...') 
     self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop) 

    def get_consumers(self, Consumer, channel): 
     return [Consumer([self.queue, ], callbacks=[self.process_task])] 

    def process_task(self, body, message): 
     for client in self.clients: 
      if hasattr(body, 'users') and client.user.pk in body.users: 
       client.send(body) 
     message.ack() 

लेकिन टर्ननाडो _handle_loop निष्पादन (अपेक्षित के रूप में) पर अवरुद्ध हो गया।

क्या इसे रोकने के लिए कोई तरीका है?

मुझे टोरनाडो के लिए पिका लाइब्रेरी एडाप्टर के बारे में पता है, लेकिन मैं कोम्बू का उपयोग करना चाहता हूं क्योंकि यह पहले से ही प्रोजेक्ट में उपयोग किया गया है और इसमें लचीला परिवहन है।

अद्यतन:

जनरेटर कार्य करने के लिए

बदला गया _handle_loop

def drain_events(self, callback): 
    with self.Consumer() as (connection, channel, consumers): 
     with self.extra_context(connection, channel): 
      try: 
       connection.drain_events(timeout=1) 
      except: 
       pass 
    callback(None) 


@tornado.gen.engine 
def _handle_loop(self): 
    response = yield tornado.gen.Task(self.drain_events) 
    self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop) 

उत्तर

3

अंत में मैं RabbitMQ बैकएंड के लिए सही समाधान पाया:

class BrokerClient(object): 
    clients = [] 

    @staticmethod 
    def instance(): 
     if not hasattr(BrokerClient, '_instance'): 
      BrokerClient._instance = BrokerClient() 
     return BrokerClient._instance 

    def __init__(self): 
     self.connection = BrokerConnection(settings.BROKER_URL) 
     self.consumer = Consumer(self.connection.channel(), [queue, ], callbacks=[self.process_task]) 
     self.consumer.consume() 
     io_loop = tornado.ioloop.IOLoop.instance() 
     for sock, handler in self.connection.eventmap.items(): 
      def drain_nowait(fd, events): 
       handler() 
      io_loop.add_handler(sock.fileno(), drain_nowait, l.READ | l.ERROR) 

    def process_task(self, body, message): 
     #something 
     message.ack() 

    def add_client(self, client): 
     self.clients.append(client) 

    def remove_client(self, client): 
     self.clients.remove(client) 

अन्य बैकेंड के लिए, आप समाधान में तैनात उपयोग कर सकते हैं प्रश्न

नोट: libra के साथ काम नहीं करता है bbitmq

+0

नोट: इस लेखन के समय, नवीनतम py-amqp और kombu आधिकारिक रूप से एसिंक्रोनस रीड का समर्थन नहीं करते हैं। [इसे देखें] (https://github.com/celery/py-amqp/issues/25)। हालांकि, एसिंक खपत के लिए [ब्रेडक्रंब] (https://github.com/celery/kombu/blob/master/examples/experimental/async_consume.py) हैं – Realistic

1

मुझे कोम्बू/खरगोश एमक्यू और ज़ीरोएमक्यू के बीच एक गैर-अवरुद्ध तरीके से स्विच करने के लिए एक समान आवश्यकता थी। समाधान सॉकेट पुस्तकालयों को बंदर पैच करने के लिए गीवेंट का उपयोग करना था ताकि कोम्बु भी गैर-अवरुद्ध हो जाए। मेरे "मुख्य" थ्रेड में कोम्बू drain_events कॉलबैक चल रहा था, और एक अन्य गीवेंट थ्रेड में मेरे पास एक लूप था जो ज़ीरोएमक्यू सॉकेट से संदेश प्राप्त कर रहा था। अच्छा काम किए।

यह librabbitmq के साथ भी काम नहीं करता है क्योंकि यह सी में अपनी सॉकेट सामग्री करता है जो गीवेंट द्वारा प्रभावित नहीं होता है।

संबंधित मुद्दे