2014-10-21 3 views
7

का उपयोग कर RabbitMQ में सिंक्रोनस और अवरुद्ध खपत मैं ब्लॉकिंग के साथ एक कतार (RabbitMQ) को सिंक्रनाइज़ करना चाहता हूं।पिका

नोट: नीचे पूरा कोड चलाने के लिए तैयार है।

सिस्टम सेट अप रबिटएमक्यू का उपयोग क्यूइंग सिस्टम के रूप में कर रहा है, लेकिन हमारे मॉड्यूल में से एक में असीमित खपत की आवश्यकता नहीं है।

मैं एक BlockingConnection के शीर्ष पर basic_get का उपयोग कर की कोशिश की है, जो (रिटर्न (None, None, None) तुरंत) ब्लॉक नहीं करता:

# declare queue 
get_connection().channel().queue_declare(TEST_QUEUE) 
def blocking_get_1(): 

     channel = get_connection().channel() 

     # get from an empty queue (prints immediately) 
     print channel.basic_get(TEST_QUEUE) 

मैं भी consume generator उपयोग करने के लिए कोशिश की है, के साथ "कनेक्शन बंद" विफल रहता है उपभोग करने के लंबे समय के बाद।

def blocking_get_2(): 
     channel = get_connection().channel() 
     # put messages in TEST_QUEUE 
     for i in range(4): 
       channel.basic_publish(
         '', 
         TEST_QUEUE, 
         'body %d' % i 
       ) 
     consume_generator = channel.consume(TEST_QUEUE) 
     print next(consume_generator) 
     time.sleep(14400) 
     print next(consume_generator) 

वहाँ के रूप में मैं अजगर में एक Queue.Queue होगा पिका क्लाइंट का उपयोग RabbitMQ उपयोग करने के लिए कोई तरीका है? या कुछ भी समान है?

इस समय मेरा विकल्प व्यस्त-प्रतीक्षा (मूल_get का उपयोग करके) है - लेकिन यदि संभव हो, तो मैं व्यस्त प्रणाली का उपयोग करने के लिए मौजूदा सिस्टम का उपयोग करता हूं।

पूर्ण कोड:

#!/usr/bin/env python 
import pika 
import time 

TEST_QUEUE = 'test' 
def get_connection(): 
     # define connection 
     connection = pika.BlockingConnection(
       pika.ConnectionParameters(
         host=YOUR_IP, 
         port=YOUR_PORT, 
         credentials=pika.PlainCredentials(
           username=YOUR_USER, 
           password=YOUR_PASSWORD, 
         ) 
       ) 
     ) 
     return connection 

# declare queue 
get_connection().channel().queue_declare(TEST_QUEUE) 
def blocking_get_1(): 

     channel = get_connection().channel() 

     # get from an empty queue (prints immediately) 
     print channel.basic_get(TEST_QUEUE) 

def blocking_get_2(): 
     channel = get_connection().channel() 
     # put messages in TEST_QUEUE 
     for i in range(4): 
       channel.basic_publish(
         '', 
         TEST_QUEUE, 
         'body %d' % i 
       ) 
     consume_generator = channel.consume(TEST_QUEUE) 
     print next(consume_generator) 
     time.sleep(14400) 
     print next(consume_generator) 


print "blocking_get_1" 
blocking_get_1() 

print "blocking_get_2" 
blocking_get_2() 

get_connection().channel().queue_delete(TEST_QUEUE) 
+0

मुझे लगता है कि इसे दिल की धड़कन ('उपभोग' संभवतः उन्हें अवरुद्ध नहीं कर रहा है) भेजने के साथ भी करना है: http://stackoverflow.com/questions/14572020/handling-long-running-tasks-in- pika-rabbitmq –

+1

मैंने इस पर अपना ध्यान पोस्ट किया, लेकिन अगर मैं आपके प्रश्न को गलत समझा तो मुझे बताएं। :) – eandersson

उत्तर

10

पिका के साथ एक आम समस्या यह है कि यह वर्तमान में पृष्ठभूमि में भेजे घटनाओं से निपटने के नहीं है। इसका मूल रूप से मतलब है कि कई परिदृश्यों में आपको यह सुनिश्चित करने के लिए connection.process_data_events() पर कॉल करने की आवश्यकता होगी ताकि यह दिल की धड़कन को याद न करे।

इसका यह भी अर्थ है कि यदि आप एक विस्तृत अवधि के लिए सोते हैं, तो पिका आने वाले डेटा को संभालने नहीं देगी, और अंत में मर जाएगी क्योंकि यह दिल की धड़कन का जवाब नहीं दे रहा है। यहाँ एक विकल्प दिल की धड़कन को अक्षम करना है।

मैं आमतौर पर this उदाहरण में देखी गई नई घटनाओं के लिए पृष्ठभूमि जांच में धागा करके इसे हल करता हूं।

यदि आप पूरी तरह से अवरुद्ध करना चाहते हैं तो मैं ऐसा कुछ करूंगा (अपनी लाइब्रेरी AMQP-Storm पर आधारित)।

while True: 
    result = channel.basic.get(queue='simple_queue', no_ack=False) 
    if result: 
     print("Message:", result['body']) 
     channel.basic.ack(result['method']['delivery_tag']) 
    else: 
     print("Channel Empty.") 
     sleep(1) 

यह उदाहरण पर आधारित है here

+0

मुझे दो धागे से कनेक्शन तक पहुंचने में परेशानी हो रही है। इंटर-थ्रेड संचार ओवरहेड जोड़ता है, इसलिए मैं इसके बिना इसे करने के लिए एक रास्ता इंतजार करने जा रहा हूं। मैं इसे बाद में और बाद में अपडेट कर दूंगा। –

+2

हाँ यदि आप पिका का उपयोग कर रहे हैं तो यह मुश्किल हो सकता है। यह थ्रेडिंग के लिए डिज़ाइन नहीं किया गया है, लेकिन मैंने जो उदाहरण जोड़ा है, वह बहुत सारे संदेशों को संभाल सकता है। दूसरी ओर मेरी लाइब्रेरी amqp-तूफान इसे आसान बनाना चाहिए, क्योंकि यह धागा सुरक्षित है। – eandersson

संबंधित मुद्दे