2012-02-29 17 views
12

मैं पब्का का उपयोग खरगोशएमक्यू से डेटा संसाधित करने के लिए कर रहा हूं। जैसा कि मुझे विभिन्न प्रकार की समस्याओं में भागना प्रतीत होता था, मैंने यह देखने के लिए एक छोटा परीक्षण आवेदन लिखने का फैसला किया कि मैं डिस्कनेक्ट कैसे संभाल सकता हूं।खरगोश एमक्यू, पिका और पुन: कनेक्शन रणनीति

मैं जो निम्नलिखित है इस परीक्षण एप्लिकेशन लिखा है:

  1. कनेक्ट ब्रोकर को, जब तक सफल
  2. पुन: प्रयास जब एक कतार बना जुड़ा हुआ है।
  3. इस कतार का उपभोग करें और परिणाम को एक अजगर में डालें Queue.Queue (0)
  4. Queue.Queue (0) से आइटम प्राप्त करें और इसे ब्रोकर कतार में वापस लाएं।

    1. जब मैं एक मेजबान एक और मेजबान (एक वी एम के अंदर) पर RabbitMQ से कनेक्ट होने से मेरी स्क्रिप्ट चलाने तो इस स्क्रिप्ट में कोई त्रुटि के उत्पादन के बिना यादृच्छिक क्षणों पर बाहर निकालता है:

    मैं क्या देखा 2 मुद्दे थे।

  5. जब मैं उसी स्क्रिप्ट पर अपनी स्क्रिप्ट चलाता हूं जिस पर RabbitMQ स्थापित है, यह ठीक चलता है और चल रहा है।

नेटवर्क समस्याओं के कारण यह समझाया जा सकता है, पैकेट गिराए गए हैं हालांकि मुझे लगता है कि कनेक्शन वास्तव में मजबूत नहीं है।

स्क्रिप्ट RabbitMQ सर्वर पर स्थानीय रूप से चलाता है और मैं RabbitMQ मारने तो स्क्रिप्ट त्रुटि के साथ बाहर निकल जाता है: "त्रुटि पिका SelectConnection: सॉकेट त्रुटि 3 पर: 104"

तो ऐसा लगता है कि मैं नहीं मिल सकता है पुन: कनेक्शन रणनीति के रूप में काम करना चाहिए। क्या कोई कोड देख सकता है तो देखें कि मैं क्या गलत कर रहा हूं?

धन्यवाद,

जे

#!/bin/python 
import logging 
import threading 
import Queue 
import pika 
from pika.reconnection_strategies import SimpleReconnectionStrategy 
from pika.adapters import SelectConnection 
import time 
from threading import Lock 

class Broker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.logging = logging.getLogger(__name__) 
     self.to_broker = Queue.Queue(0) 
     self.from_broker = Queue.Queue(0) 
     self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True) 
     self.srs = SimpleReconnectionStrategy() 
     self.properties = pika.BasicProperties(delivery_mode=2) 

     self.connection = None 
     while True: 
      try: 
       self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs) 
       break 
      except Exception as err: 
       self.logging.warning('Cant connect. Reason: %s' % err) 
       time.sleep(1) 

     self.daemon=True 
    def run(self): 
     while True: 
      self.submitData(self.from_broker.get(block=True)) 
     pass 
    def on_connected(self,connection): 
     connection.channel(self.on_channel_open) 
    def on_channel_open(self,new_channel): 
     self.channel = new_channel 
     self.channel.queue_declare(queue='sandbox', durable=True) 
     self.channel.basic_consume(self.processData, queue='sandbox')  
    def processData(self, ch, method, properties, body): 
     self.logging.info('Received data from broker') 
     self.channel.basic_ack(delivery_tag=method.delivery_tag) 
     self.from_broker.put(body) 
    def submitData(self,data): 
     self.logging.info('Submitting data to broker.') 
     self.channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=self.properties) 
if __name__ == '__main__': 
    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 
    broker=Broker() 
    broker.start() 
    try: 
     broker.connection.ioloop.start() 
    except Exception as err: 
     print err 

उत्तर

17

अपनी स्क्रिप्ट के साथ मुख्य समस्या यह है कि यह दोनों अपने मुख्य थ्रेड (जहां ioloop चल रहा है) से एक चैनल के साथ बातचीत कर रहा है और "ब्रोकर" थ्रेड (लूप में submitData पर कॉल करें)। यह not safe है।

इसके अलावा, SimpleReconnectionStrategy कुछ भी उपयोगी नहीं लगता है। यदि कनेक्शन बाधित हो जाता है तो यह पुन: कनेक्ट नहीं होता है। मेरा मानना ​​है कि यह पिका में एक बग है: https://github.com/pika/pika/issues/120

मैंने इसे बनाने के लिए अपने कोड को दोबारा करने का प्रयास किया क्योंकि मुझे लगता है कि आप इसे चाहते थे, लेकिन एक और समस्या में भाग गया। पिका में डिलीवरी विफलता का पता लगाने का कोई तरीका नहीं दिखता है, जिसका मतलब है कि कनेक्शन गिरने पर डेटा खो जा सकता है। यह ऐसी स्पष्ट आवश्यकता की तरह लगता है! basic_publish पता लगाने में कोई रास्ता नहीं हो सकता है? मैंने लेन-देन और add_on_return_callback (जिसमें से सभी घबराहट और अत्यधिक जटिल लगते थे) सहित सभी प्रकार की चीजों की कोशिश की, लेकिन कुछ भी नहीं आया। यदि वास्तव में कोई रास्ता नहीं है तो पिका केवल ऐसी परिस्थितियों में उपयोगी प्रतीत होती है जो खरगोश एमक्यू को भेजे गए डेटा के नुकसान को सहन कर सकती हैं, या उन कार्यक्रमों में जिन्हें केवल खरगोश एमक्यू से उपभोग करने की आवश्यकता है।

यह विश्वसनीय नहीं है, लेकिन संदर्भ के लिए, यहाँ कुछ कोड है कि अपने बहु धागा समस्या का हल है:

import logging 
import pika 
import Queue 
import sys 
import threading 
import time 
from functools import partial 
from pika.adapters import SelectConnection, BlockingConnection 
from pika.exceptions import AMQPConnectionError 
from pika.reconnection_strategies import SimpleReconnectionStrategy 

log = logging.getLogger(__name__) 

DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2) 


class Broker(object): 

    def __init__(self, parameters, on_channel_open, name='broker'): 
     self.parameters = parameters 
     self.on_channel_open = on_channel_open 
     self.name = name 

    def connect(self, forever=False): 
     name = self.name 
     while True: 
      try: 
       connection = SelectConnection(
        self.parameters, self.on_connected) 
       log.debug('%s connected', name) 
      except Exception: 
       if not forever: 
        raise 
       log.warning('%s cannot connect', name, exc_info=True) 
       time.sleep(10) 
       continue 

      try: 
       connection.ioloop.start() 
      finally: 
       try: 
        connection.close() 
        connection.ioloop.start() # allow connection to close 
       except Exception: 
        pass 

      if not forever: 
       break 

    def on_connected(self, connection): 
     connection.channel(self.on_channel_open) 


def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES): 
    def on_queue_declared(frame): 
     # PROBLEM pika does not appear to have a way to detect delivery 
     # failure, which means that data could be lost if the connection 
     # drops... 
     channel.confirm_delivery(on_delivered) 
     submit_data() 

    def on_delivered(frame): 
     if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']: 
      log.info('submission confirmed %r', frame) 
      # increasing this value seems to cause a higher failure rate 
      time.sleep(0) 
      submit_data() 
     else: 
      log.warn('submission failed: %r', frame) 
      #data_queue.put(...) 

    def submit_data(): 
     log.info('waiting on data queue') 
     data = data_queue.get() 
     log.info('got data to submit') 
     channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=properties, 
        mandatory=True) 
     log.info('submitted data to broker') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


def blocking_submitter(parameters, data_queue, 
     properties=DEFAULT_PROPERTIES): 
    while True: 
     try: 
      connection = BlockingConnection(parameters) 
      channel = connection.channel() 
      channel.queue_declare(queue='sandbox', durable=True) 
     except Exception: 
      log.error('connection failure', exc_info=True) 
      time.sleep(1) 
      continue 
     while True: 
      log.info('waiting on data queue') 
      try: 
       data = data_queue.get(timeout=1) 
      except Queue.Empty: 
       try: 
        connection.process_data_events() 
       except AMQPConnectionError: 
        break 
       continue 
      log.info('got data to submit') 
      try: 
       channel.basic_publish(exchange='', 
          routing_key='sandbox', 
          body=data, 
          properties=properties, 
          mandatory=True) 
      except Exception: 
       log.error('submission failed', exc_info=True) 
       data_queue.put(data) 
       break 
      log.info('submitted data to broker') 


def setup_receiver(channel, data_queue): 
    def process_data(channel, method, properties, body): 
     log.info('received data from broker') 
     data_queue.put(body) 
     channel.basic_ack(delivery_tag=method.delivery_tag) 

    def on_queue_declared(frame): 
     channel.basic_consume(process_data, queue='sandbox') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


if __name__ == '__main__': 
    if len(sys.argv) != 2: 
     print 'usage: %s RABBITMQ_HOST' % sys.argv[0] 
     sys.exit() 

    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 

    host = sys.argv[1] 
    log.info('connecting to host: %s', host) 
    parameters = pika.ConnectionParameters(host=host, heartbeat=True) 
    data_queue = Queue.Queue(0) 
    data_queue.put('message') # prime the pump 

    # run submitter in a thread 

    setup = partial(setup_submitter, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'submitter') 
    thread = threading.Thread(target= 
     partial(broker.connect, forever=True)) 

    # uncomment these lines to use the blocking variant of the submitter 
    #thread = threading.Thread(target= 
    # partial(blocking_submitter, parameters, data_queue)) 

    thread.daemon = True 
    thread.start() 

    # run receiver in main thread 
    setup = partial(setup_receiver, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'receiver') 
    broker.connect(forever=True) 
+0

बार कोड के माध्यम से जा लेने और इसे से संबंधित सभी मुद्दों की खोज के लिए धन्यवाद। मैं वर्तमान में http://barryp.org/software/py-amqplib/ का उपयोग कर रहा हूं जो आईएमओ एक अधिक बुनियादी/सरल पुस्तकालय है लेकिन मेरी आवश्यकताओं को पूरी तरह से उपयुक्त बनाता है। Gevent के साथ संयोजन में मेरे कुछ वाकई अच्छे परिणाम हैं। मैं इन दिनों पिका के साथ और परेशान नहीं हूं। –

+1

आप प्रकाशित होने के बाद एके की प्रतीक्षा करने के लिए Channel.confirm_delivery() का उपयोग कर सकते हैं, कनेक्शन बंद होने के बाद, यह समय-समय पर होगा, आपको पता चलेगा कि संदेश दलाल को नहीं दिया गया है –

संबंधित मुद्दे