का उपयोग कर RabbitMQ में सिंक्रोनस और अवरुद्ध खपत मैं ब्लॉकिंग के साथ एक कतार (RabbitMQ) को सिंक्रनाइज़ करना चाहता हूं।पिका
नोट: नीचे पूरा कोड चलाने के लिए तैयार है।
सिस्टम सेट अप रबिटएमक्यू का उपयोग क्यूइंग सिस्टम के रूप में कर रहा है, लेकिन हमारे मॉड्यूल में से एक में असीमित खपत की आवश्यकता नहीं है।
मैं एक BlockingConnection के शीर्ष पर basic_get का उपयोग कर की कोशिश की है, जो (रिटर्न (None, None, None)
तुरंत) ब्लॉक नहीं करता:
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
मैं भी consume generator उपयोग करने के लिए कोशिश की है, के साथ "कनेक्शन बंद" विफल रहता है उपभोग करने के लंबे समय के बाद।
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
वहाँ के रूप में मैं अजगर में एक Queue.Queue
होगा पिका क्लाइंट का उपयोग RabbitMQ उपयोग करने के लिए कोई तरीका है? या कुछ भी समान है?
इस समय मेरा विकल्प व्यस्त-प्रतीक्षा (मूल_get का उपयोग करके) है - लेकिन यदि संभव हो, तो मैं व्यस्त प्रणाली का उपयोग करने के लिए मौजूदा सिस्टम का उपयोग करता हूं।
पूर्ण कोड:
#!/usr/bin/env python
import pika
import time
TEST_QUEUE = 'test'
def get_connection():
# define connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=YOUR_IP,
port=YOUR_PORT,
credentials=pika.PlainCredentials(
username=YOUR_USER,
password=YOUR_PASSWORD,
)
)
)
return connection
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
print "blocking_get_1"
blocking_get_1()
print "blocking_get_2"
blocking_get_2()
get_connection().channel().queue_delete(TEST_QUEUE)
मुझे लगता है कि इसे दिल की धड़कन ('उपभोग' संभवतः उन्हें अवरुद्ध नहीं कर रहा है) भेजने के साथ भी करना है: http://stackoverflow.com/questions/14572020/handling-long-running-tasks-in- pika-rabbitmq –
मैंने इस पर अपना ध्यान पोस्ट किया, लेकिन अगर मैं आपके प्रश्न को गलत समझा तो मुझे बताएं। :) – eandersson