2015-07-09 12 views
7

मैं एसिंकियो/एओमैकप का उपयोग कर एक असीमित उपभोक्ता लिखने की कोशिश कर रहा हूं। मेरी समस्या यह है कि कॉलबैक कोरआउट (नीचे) अवरुद्ध है। मैंने चैनल को मूल_consume() करने के लिए सेट किया है, और कॉलबैक को कॉलबैक() के रूप में असाइन किया है। कॉलबैक में "asyncio.sleep" कथन ("काम" अनुकरण करने के लिए उपज) है, जो प्रकाशक से एक पूर्णांक लेता है और संदेश मुद्रित करने से पहले उस समय के लिए सोता है।एसिंक्रोनस खरगोश एमक्यू उपभोक्ता के साथ Aioamqp

अगर मैंने दो संदेशों को प्रकाशित किया, एक "10" के समय के साथ, तुरंत "1" के समय के साथ एक के बाद, मुझे उम्मीद थी कि दूसरा संदेश पहले प्रिंट करेगा, क्योंकि इसमें कम नींद का समय है। इसके बजाय, 10 सेकंड के लिए कॉलबैक ब्लॉक, पहला संदेश प्रिंट करता है, और फिर दूसरे को प्रिंट करता है।

यह या तो मूल_consume, या कॉलबैक दिखाई देता है, कहीं अवरुद्ध कर रहा है। क्या इसका एक और तरीका संभाला जा सकता है?

@asyncio.coroutine 
def callback(body, envelope, properties): 
    yield from asyncio.sleep(int(body)) 
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag)) 

@asyncio.coroutine 
def receive_log(): 
    try: 
     transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password") 
    except: 
     print("closed connections") 
     return 

    channel = yield from protocol.channel() 
    exchange_name = 'cloudstack-events' 
    exchange_name = 'test-async-exchange' 
    queue_name = 'async-queue-%s' % random.randint(0, 10000) 
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False) 
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10) 

    binding_keys = ['mykey'] 

    for binding_key in binding_keys: 
     print("binding", binding_key) 
     yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name, 
                 queue_name=queue_name, 
                 routing_key=binding_key), timeout=10) 

    print(' [*] Waiting for logs. To exit press CTRL+C') 
    yield from channel.basic_consume(queue_name, callback=callback) 

loop = asyncio.get_event_loop() 
loop.create_task(receive_log()) 
loop.run_forever() 
+0

आपके पास कितने उपभोक्ता हैं? – jonnybazookatone

+0

सिर्फ एक उपभोक्ता। लेकिन मैं अलग-अलग टाइमआउट के साथ कई कार्यक्रम प्रकाशित कर रहा हूं, और ऐसा लगता है कि asyncio.sleep() पर अवरुद्ध है। मुझे लगता है कि जब मैं ऐसा करता हूं तो पूरी कोरआउट श्रृंखला रोक दी जाती है, इसलिए जब तक कि कोई चालू नहीं हो जाता तब तक मुझे अगली घटना नहीं मिलती है। मैं इसके बजाय क्या करने की कोशिश कर रहा हूं वह कॉलबैक के भीतर एक loop.create_task() शेड्यूल करता है, जो वास्तविक कार्य करने के लिए एक और कोरआउटिन कहता है (asyncio.sleep इस मामले में)। हो सकता है कि कॉलबैक तुरंत बाहर निकल जाए ताकि मैं अतिरिक्त संदेश प्राप्त कर सकूं। इसका परीक्षण करने के लिए जा रहे हैं और देखें कि यह काम करता है या नहीं। – blindsnowmobile

उत्तर

4

रुचि रखने वालों के लिए, मैंने ऐसा करने का एक तरीका निकाला। मुझे यकीन नहीं है कि यह सबसे अच्छा अभ्यास है, लेकिन यह मुझे पूरा करने की ज़रूरत है।

कॉलबैक के अंदर "काम" (इस मामले में, async.sleep) करने के बजाय, मैं लूप पर एक नया कार्य बनाता हूं, और do_work() चलाने के लिए एक अलग सह-दिनचर्या निर्धारित करता हूं। संभवतः यह काम कर रहा है, क्योंकि यह तुरंत लौटने के लिए कॉलबैक() को मुक्त कर रहा है।

मैंने विभिन्न नींद टाइमर के साथ खरगोश में कुछ सौ घटनाएं लोड कीं, और नीचे दिए गए कोड द्वारा मुद्रित किए जाने पर उन्हें अंतःस्थापित किया गया। तो ऐसा लगता है कि काम कर रहा है। उम्मीद है कि यह किसी की मदद करता है!

@asyncio.coroutine 
def do_work(envelope, body): 
    yield from asyncio.sleep(int(body)) 
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag)) 

@asyncio.coroutine 
def callback(body, envelope, properties): 
    loop = asyncio.get_event_loop() 
    loop.create_task(do_work(envelope, body)) 

@asyncio.coroutine 
def receive_log(): 
    try: 
     transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password") 
    except: 
     print("closed connections") 
     return 

    channel = yield from protocol.channel() 
    exchange_name = 'cloudstack-events' 
    exchange_name = 'test-async-exchange' 
    queue_name = 'async-queue-%s' % random.randint(0, 10000) 
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False) 
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10) 

    binding_keys = ['mykey'] 

    for binding_key in binding_keys: 
     print("binding", binding_key) 
     yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name, 
                 queue_name=queue_name, 
                 routing_key=binding_key), timeout=10) 

    print(' [*] Waiting for logs. To exit press CTRL+C') 
    yield from channel.basic_consume(queue_name, callback=callback) 

loop = asyncio.get_event_loop() 
loop.create_task(receive_log()) 
loop.run_forever() 
+1

'aioamqp' शायद कॉलबैक (* तर्क) से 'उपज' को आंतरिक रूप से कॉल करता है, ताकि कॉलबैक हमेशा अनुक्रमिक रूप से चलें (क्योंकि यह वांछित व्यवहार हो सकता है)। जिस तरह से आप समवर्ती कॉलबैक प्राप्त कर रहे हैं (वास्तव में इसे करने के लिए प्रतीक्षा करने के बजाय आपके कॉलबैक कार्यान्वयन के अंदर काम को शेड्यूल करके) इसे करने का सही तरीका है। – dano

संबंधित मुद्दे