मैं पब्का का उपयोग खरगोशएमक्यू से डेटा संसाधित करने के लिए कर रहा हूं। जैसा कि मुझे विभिन्न प्रकार की समस्याओं में भागना प्रतीत होता था, मैंने यह देखने के लिए एक छोटा परीक्षण आवेदन लिखने का फैसला किया कि मैं डिस्कनेक्ट कैसे संभाल सकता हूं।खरगोश एमक्यू, पिका और पुन: कनेक्शन रणनीति
मैं जो निम्नलिखित है इस परीक्षण एप्लिकेशन लिखा है:
- कनेक्ट ब्रोकर को, जब तक सफल
- पुन: प्रयास जब एक कतार बना जुड़ा हुआ है।
- इस कतार का उपभोग करें और परिणाम को एक अजगर में डालें Queue.Queue (0)
- Queue.Queue (0) से आइटम प्राप्त करें और इसे ब्रोकर कतार में वापस लाएं।
- जब मैं एक मेजबान एक और मेजबान (एक वी एम के अंदर) पर RabbitMQ से कनेक्ट होने से मेरी स्क्रिप्ट चलाने तो इस स्क्रिप्ट में कोई त्रुटि के उत्पादन के बिना यादृच्छिक क्षणों पर बाहर निकालता है:
मैं क्या देखा 2 मुद्दे थे।
- जब मैं उसी स्क्रिप्ट पर अपनी स्क्रिप्ट चलाता हूं जिस पर RabbitMQ स्थापित है, यह ठीक चलता है और चल रहा है।
नेटवर्क समस्याओं के कारण यह समझाया जा सकता है, पैकेट गिराए गए हैं हालांकि मुझे लगता है कि कनेक्शन वास्तव में मजबूत नहीं है।
स्क्रिप्ट RabbitMQ सर्वर पर स्थानीय रूप से चलाता है और मैं RabbitMQ मारने तो स्क्रिप्ट त्रुटि के साथ बाहर निकल जाता है: "त्रुटि पिका SelectConnection: सॉकेट त्रुटि 3 पर: 104"
तो ऐसा लगता है कि मैं नहीं मिल सकता है पुन: कनेक्शन रणनीति के रूप में काम करना चाहिए। क्या कोई कोड देख सकता है तो देखें कि मैं क्या गलत कर रहा हूं?
धन्यवाद,
जे
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant connect. Reason: %s' % err)
time.sleep(1)
self.daemon=True
def run(self):
while True:
self.submitData(self.from_broker.get(block=True))
pass
def on_connected(self,connection):
connection.channel(self.on_channel_open)
def on_channel_open(self,new_channel):
self.channel = new_channel
self.channel.queue_declare(queue='sandbox', durable=True)
self.channel.basic_consume(self.processData, queue='sandbox')
def processData(self, ch, method, properties, body):
self.logging.info('Received data from broker')
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.from_broker.put(body)
def submitData(self,data):
self.logging.info('Submitting data to broker.')
self.channel.basic_publish(exchange='',
routing_key='sandbox',
body=data,
properties=self.properties)
if __name__ == '__main__':
format=('%(asctime)s %(levelname)s %(name)s %(message)s')
logging.basicConfig(level=logging.DEBUG, format=format)
broker=Broker()
broker.start()
try:
broker.connection.ioloop.start()
except Exception as err:
print err
बार कोड के माध्यम से जा लेने और इसे से संबंधित सभी मुद्दों की खोज के लिए धन्यवाद। मैं वर्तमान में http://barryp.org/software/py-amqplib/ का उपयोग कर रहा हूं जो आईएमओ एक अधिक बुनियादी/सरल पुस्तकालय है लेकिन मेरी आवश्यकताओं को पूरी तरह से उपयुक्त बनाता है। Gevent के साथ संयोजन में मेरे कुछ वाकई अच्छे परिणाम हैं। मैं इन दिनों पिका के साथ और परेशान नहीं हूं। –
आप प्रकाशित होने के बाद एके की प्रतीक्षा करने के लिए Channel.confirm_delivery() का उपयोग कर सकते हैं, कनेक्शन बंद होने के बाद, यह समय-समय पर होगा, आपको पता चलेगा कि संदेश दलाल को नहीं दिया गया है –