2015-07-09 12 views

मैं एसिंकियो/एओमैकप का उपयोग कर एक असीमित उपभोक्ता लिखने की कोशिश कर रहा हूं। मेरी समस्या यह है कि कॉलबैक कोरआउट (नीचे) अवरुद्ध है। मैंने चैनल को मूल_consume() करने के लिए सेट किया है, और कॉलबैक को कॉलबैक() के रूप में असाइन किया है। कॉलबैक में "asyncio.sleep" कथन ("काम" अनुकरण करने के लिए उपज) है, जो प्रकाशक से एक पूर्णांक लेता है और संदेश मुद्रित करने से पहले उस समय के लिए सोता है।एसिंक्रोनस खरगोश एमक्यू उपभोक्ता के साथ Aioamqp

अगर मैंने दो संदेशों को प्रकाशित किया, एक "10" के समय के साथ, तुरंत "1" के समय के साथ एक के बाद, मुझे उम्मीद थी कि दूसरा संदेश पहले प्रिंट करेगा, क्योंकि इसमें कम नींद का समय है। इसके बजाय, 10 सेकंड के लिए कॉलबैक ब्लॉक, पहला संदेश प्रिंट करता है, और फिर दूसरे को प्रिंट करता है।

यह या तो मूल_consume, या कॉलबैक दिखाई देता है, कहीं अवरुद्ध कर रहा है। क्या इसका एक और तरीका संभाला जा सकता है?

def callback(body, envelope, properties): 
    yield from asyncio.sleep(int(body)) 
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag)) 

def receive_log(): 
     transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password") 
     print("closed connections") 

    channel = yield from protocol.channel() 
    exchange_name = 'cloudstack-events' 
    exchange_name = 'test-async-exchange' 
    queue_name = 'async-queue-%s' % random.randint(0, 10000) 
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False) 
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10) 

    binding_keys = ['mykey'] 

    for binding_key in binding_keys: 
     print("binding", binding_key) 
     yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name, 
                 routing_key=binding_key), timeout=10) 

    print(' [*] Waiting for logs. To exit press CTRL+C') 
    yield from channel.basic_consume(queue_name, callback=callback) 

loop = asyncio.get_event_loop() 

आपके पास कितने उपभोक्ता हैं? – jonnybazookatone


सिर्फ एक उपभोक्ता। लेकिन मैं अलग-अलग टाइमआउट के साथ कई कार्यक्रम प्रकाशित कर रहा हूं, और ऐसा लगता है कि asyncio.sleep() पर अवरुद्ध है। मुझे लगता है कि जब मैं ऐसा करता हूं तो पूरी कोरआउट श्रृंखला रोक दी जाती है, इसलिए जब तक कि कोई चालू नहीं हो जाता तब तक मुझे अगली घटना नहीं मिलती है। मैं इसके बजाय क्या करने की कोशिश कर रहा हूं वह कॉलबैक के भीतर एक loop.create_task() शेड्यूल करता है, जो वास्तविक कार्य करने के लिए एक और कोरआउटिन कहता है (asyncio.sleep इस मामले में)। हो सकता है कि कॉलबैक तुरंत बाहर निकल जाए ताकि मैं अतिरिक्त संदेश प्राप्त कर सकूं। इसका परीक्षण करने के लिए जा रहे हैं और देखें कि यह काम करता है या नहीं। – blindsnowmobile



रुचि रखने वालों के लिए, मैंने ऐसा करने का एक तरीका निकाला। मुझे यकीन नहीं है कि यह सबसे अच्छा अभ्यास है, लेकिन यह मुझे पूरा करने की ज़रूरत है।

कॉलबैक के अंदर "काम" (इस मामले में, async.sleep) करने के बजाय, मैं लूप पर एक नया कार्य बनाता हूं, और do_work() चलाने के लिए एक अलग सह-दिनचर्या निर्धारित करता हूं। संभवतः यह काम कर रहा है, क्योंकि यह तुरंत लौटने के लिए कॉलबैक() को मुक्त कर रहा है।

मैंने विभिन्न नींद टाइमर के साथ खरगोश में कुछ सौ घटनाएं लोड कीं, और नीचे दिए गए कोड द्वारा मुद्रित किए जाने पर उन्हें अंतःस्थापित किया गया। तो ऐसा लगता है कि काम कर रहा है। उम्मीद है कि यह किसी की मदद करता है!

def do_work(envelope, body): 
    yield from asyncio.sleep(int(body)) 
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag)) 

def callback(body, envelope, properties): 
    loop = asyncio.get_event_loop() 
    loop.create_task(do_work(envelope, body)) 

def receive_log(): 
     transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password") 
     print("closed connections") 

    channel = yield from protocol.channel() 
    exchange_name = 'cloudstack-events' 
    exchange_name = 'test-async-exchange' 
    queue_name = 'async-queue-%s' % random.randint(0, 10000) 
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False) 
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10) 

    binding_keys = ['mykey'] 

    for binding_key in binding_keys: 
     print("binding", binding_key) 
     yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name, 
                 routing_key=binding_key), timeout=10) 

    print(' [*] Waiting for logs. To exit press CTRL+C') 
    yield from channel.basic_consume(queue_name, callback=callback) 

loop = asyncio.get_event_loop() 

'aioamqp' शायद कॉलबैक (* तर्क) से 'उपज' को आंतरिक रूप से कॉल करता है, ताकि कॉलबैक हमेशा अनुक्रमिक रूप से चलें (क्योंकि यह वांछित व्यवहार हो सकता है)। जिस तरह से आप समवर्ती कॉलबैक प्राप्त कर रहे हैं (वास्तव में इसे करने के लिए प्रतीक्षा करने के बजाय आपके कॉलबैक कार्यान्वयन के अंदर काम को शेड्यूल करके) इसे करने का सही तरीका है। – dano

संबंधित मुद्दे