9

पर चलने से कार्य को रोकना मैं कुछ async प्रतीक्षा सामग्री के साथ थोड़ा सा संघर्ष कर रहा हूं। मैं कुछ कार्यक्रमों के बीच संदेश भेजने/प्राप्त करने के लिए RabbitMQ का उपयोग कर रहा हूं।कुछ थ्रेड

कुछ पृष्ठभूमि के रूप में, RabbitMQ क्लाइंट 3 या ऐसे धागे का उपयोग करता है जो मैं देख सकता हूं: एक कनेक्शन थ्रेड और दो दिल की धड़कन धागे। जब भी टीसीपी के माध्यम से कोई संदेश प्राप्त होता है, कनेक्शन थ्रेड इसे संभालता है और एक कॉलबैक कॉल करता है जिसे मैंने इंटरफ़ेस के माध्यम से प्रदान किया है। दस्तावेज़ीकरण का कहना है कि इस कॉल के दौरान बहुत सारे काम करने से बचाना सबसे अच्छा है क्योंकि कनेक्शन के रूप में एक ही धागे पर किया जाता है और चीजों को जारी रखने की आवश्यकता होती है। वे QueueingBasicConsumer की आपूर्ति करते हैं जिसमें एक 'डेक्यू' विधि अवरुद्ध होती है जिसका उपयोग संदेश प्राप्त होने के लिए किया जाता है।

मैं चाहता था कि मेरे उपभोक्ता वास्तव में इस प्रतीक्षा समय के दौरान अपना धागा संदर्भ जारी कर सकें ताकि कोई और कुछ काम कर सके, इसलिए मैंने एसिंक/प्रतीक्षा कार्यों का उपयोग करने का निर्णय लिया।

मैं एक awaitable विपंक्ति विधि है:: मैं एक AwaitableBasicConsumer वर्ग जो निम्नलिखित फैशन में TaskCompletionSource रों का उपयोग करता लिखा

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken) 
{ 
    //we are enqueueing a TCS. This is a "read" 
    rwLock.EnterReadLock(); 

    try 
    { 
     TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>(); 

     //if we are cancelled before we finish, this will cause the tcs to become cancelled 
     cancellationToken.Register(() => 
     { 
      tcs.TrySetCanceled(); 
     }); 

     //if there is something in the undelivered queue, the task will be immediately completed 
     //otherwise, we queue the task into deliveryTCS 
     if (!TryDeliverUndelivered(tcs)) 
      deliveryTCS.Enqueue(tcs); 
     } 

     return tcs.Task; 
    } 
    finally 
    { 
     rwLock.ExitReadLock(); 
    } 
} 

कॉलबैक जो RabbitMQ ग्राहक कॉल को पूरा कार्य: यह कहा जाता है AMQP कनेक्शन धागे के संदर्भ से

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body) 
{ 
    //we want nothing added while we remove. We also block until everybody is done. 
    rwLock.EnterWriteLock(); 
    try 
    { 
     RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); 

     bool sent = false; 
     TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs; 
     while (deliveryTCS.TryDequeue(out tcs)) 
     { 
      //once we manage to actually set somebody's result, we are done with handling this 
      if (tcs.TrySetResult(e)) 
      { 
       sent = true; 
       break; 
      } 
     } 

     //if nothing was sent, we queue up what we got so that somebody can get it later. 
     /** 
     * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the 
     * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are 
     * doing our thing here. 
     */ 
     if (!sent) 
     { 
      undelivered.Enqueue(e); 
     } 
    } 
    finally 
    { 
     rwLock.ExitWriteLock(); 
    } 
} 

rwLock एक ०१२३५५१२२७४७ है। दो कतार (deliveryTCS और undelivered) ConcurrentQueues हैं।

समस्या:

एक बार ही सही में, विधि है कि इंतजार कर रहा है विपंक्ति विधि एक अपवाद फेंकता है। यह आमतौर पर एक मुद्दा नहीं होगा क्योंकि यह विधि async भी है और इसलिए यह "अपवाद" समापन स्थिति में प्रवेश करती है जो कार्य दर्ज करती है। समस्या उस परिस्थिति में आती है जहां DequeueAsync पर कॉल करने वाले कार्य को एएमक्यूपी कनेक्शन थ्रेड पर प्रतीक्षा करने के बाद फिर से शुरू किया जाता है जिसे RabbitMQ क्लाइंट बनाता है। आम तौर पर मैंने कार्यों को मुख्य धागे या चारों ओर तैरने वाले कार्यकर्ता थ्रेडों में से एक को फिर से शुरू किया है। हालांकि, जब यह एएमक्यूपी थ्रेड पर फिर से शुरू होता है और अपवाद फेंक दिया जाता है, तो सबकुछ स्टाल हो जाता है। कार्य नहीं है "अपवाद स्थिति" दर्ज करें और एएमक्यूपी कनेक्शन थ्रेड यह कह रहा है कि यह उस विधि को निष्पादित कर रहा है जिसमें अपवाद हुआ था।

मेरा मुख्य यहाँ भ्रम की स्थिति है क्यों यह काम नहीं करता:

public async Task RunAsync() 
{ 
    using (var channel = this.Connection.CreateModel()) 
    { 
     ... 
     AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel); 
     var result = consumer.DequeueAsync(this.CancellationToken); 

     //wait until we find something to eat 
     await result; 

     throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls 
     ... 
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state 
} 

पढ़ना मैं क्या लिखा है, मैं देख रहा हूँ:

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards 

ConsumerTaskState state = new ConsumerTaskState() 
{ 
    Connection = connection, 
    CancellationToken = cancellationToken 
}; 

//if there is a problem, we execute our faulted method 
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called 
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted); 

यहाँ RunAsync विधि, परीक्षण के लिए स्थापित है कि मैंने अपनी समस्या को बहुत अच्छी तरह से समझाया नहीं है। अगर स्पष्टीकरण की आवश्यकता है, तो बस पूछें।

  • निकालें सभी Async/इंतजार कोड और बस सीधे धागे और ब्लॉक का उपयोग करें:

    मेरे समाधान है कि मैं के साथ आए हैं इस प्रकार हैं। प्रदर्शन में कमी आएगी, लेकिन कम से कम यह कभी-कभी

  • किसी भी तरह से एएमक्यूपी धागे को कार्यों को फिर से शुरू करने के लिए उपयोग करने से मुक्त नहीं करेगा।मुझे लगता है कि वे सो रहे थे या कुछ और फिर डिफ़ॉल्ट TaskScheduler ने उनका उपयोग करने का फैसला किया। अगर मुझे कार्य शेड्यूलर को बताने का कोई तरीका मिल सकता है कि वे धागे सीमा से बाहर हैं, तो यह बहुत अच्छा होगा।

क्या किसी के पास यह क्यों हो रहा है कि यह क्यों हो रहा है या इसे हल करने के लिए कोई सुझाव है? अभी मैं एसिंक कोड को हटा रहा हूं ताकि कार्यक्रम विश्वसनीय हो, लेकिन मैं वास्तव में यह समझना चाहता हूं कि यहां क्या हो रहा है।

+0

* मैं चाहता था कि मेरे उपभोक्ता वास्तव में इस प्रतीक्षा समय के दौरान अपने धागे संदर्भ को जारी कर सकें *। क्या आप कह रहे हैं कि प्रतीक्षा धागे ने अन्य धागे को निष्पादित करने से अवरुद्ध कर दिया है? वह लगता है । । । अजीब। –

+0

खरगोश एमक्यू क्लाइंट "क्यूइंग बासिक कॉन्स्यूमर" प्रदान करता है जो कॉलर को तब तक अवरुद्ध करता है जब तक कि उसे कुछ प्राप्त न हो जाए। अगर मैं कार्यों के बजाय कच्चे धागे का उपयोग कर रहा था, तो वह धागा सो जाएगा और कुछ भी नहीं कर रहा है। मैं इस अवरोध से बचना चाहता था ताकि अंतर्निहित धागा उस समय (थ्रूपुट बढ़ाएं) के दौरान एक और कार्य कर सके, इसलिए मैंने अपना इंतजार करने योग्य उपभोक्ता लिखा जो वास्तव में कॉलिंग थ्रेड को अवरुद्ध करने के बजाय कार्यों का उपयोग करता है। –

+1

मैं उस हिस्से को समझ गया। जो मुझे समझ में नहीं आता वह समस्या है जिसे आप हल करने की कोशिश कर रहे हैं।यदि 'क्यूइंग बासिक कॉन्स्यूमर' सक्षम रूप से लिखा गया है, तो यह एक व्यस्त प्रतीक्षा है और अवरुद्ध थ्रेड प्रतीक्षा करते समय सीपीयू चक्र का उपभोग नहीं करेगा और इसलिए किए जा रहे किसी भी अन्य काम को प्रभावित नहीं करेगा। बस उन धागे को RabbitMQ को समर्पित करें और अन्य नौकरियों के लिए अन्य धागे का उपयोग करें। –

उत्तर

5

मैं पहली बार अनुशंसा करता हूं कि आप मेरा async intro पढ़ लें, जो सटीक शर्तों में बताता है कि await एक संदर्भ कैप्चर करेगा और निष्पादन को फिर से शुरू करने के लिए इसका उपयोग करेगा। संक्षेप में, यह वर्तमान SynchronizationContext (या वर्तमान TaskScheduler पर SynchronizationContext.Currentnull) कैप्चर करेगा।

अन्य महत्वपूर्ण विवरण यह है कि async निरंतरता TaskContinuationOptions.ExecuteSynchronously के साथ निर्धारित है (जैसा कि @ एसविक ने टिप्पणी में बताया)। मेरे पास blog post about this है लेकिन AFAIK इसे आधिकारिक रूप से कहीं भी दस्तावेज नहीं किया गया है। यह विवरण एक async निर्माता/उपभोक्ता कतार लिखना मुश्किल बनाता है।

कारण await नहीं "वापस मूल संदर्भ का उपयोग करने जा" है (शायद) है, क्योंकि RabbitMQ धागे एक SynchronizationContext या TaskScheduler की जरूरत नहीं है - इस प्रकार, निरंतरता सीधे निष्पादित किया जाता है जब आप TrySetResult कहते हैं क्योंकि उन धागे देखो नियमित थ्रेड पूल धागे की तरह।

बीटीडब्ल्यू, आपके कोड के माध्यम से पढ़ने, मुझे संदेह है कि आपके पाठक/लेखक लॉक और समवर्ती कतारों का उपयोग गलत है। मैं पूरा कोड देखे बिना सुनिश्चित नहीं हो सकता, लेकिन यह मेरी छाप है।

मैं दृढ़ता से अनुशंसा करता हूं कि आप मौजूदा async कतार का उपयोग करें और उसके आस-पास एक उपभोक्ता बनाएं (दूसरे शब्दों में, किसी और को कड़ी मेहनत करने दें :)। BufferBlock<T>TPL Dataflow में टाइप async कतार के रूप में कार्य कर सकता है; यदि आपके प्लेटफॉर्म पर डेटाफ्लो उपलब्ध है तो यह मेरी पहली सिफारिश होगी। अन्यथा, मेरे पास AsyncProducerConsumerQueue type in my AsyncEx library है, या आप write your own (जैसा कि मैंने अपने ब्लॉग पर वर्णन किया है) हो सकता है।

यहाँ एक उदाहरण BufferBlock<T> का उपयोग कर रहा है:

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>(); 

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body) 
{ 
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); 
    _queue.Post(e); 
} 

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken) 
{ 
    return _queue.ReceiveAsync(cancellationToken); 
} 

इस उदाहरण में, मैं अपने DequeueAsync एपीआई रखने कर रहा हूँ। हालांकि, एक बार जब आप टीपीएल डेटाफ्लो का उपयोग शुरू करते हैं, तो इसे कहीं और इस्तेमाल करने पर विचार करें। जब आपको इस तरह की कतार की आवश्यकता होती है, तो आपके कोड के अन्य हिस्सों को ढूंढना आम बात है जो डेटाफ्लो दृष्टिकोण से भी लाभान्वित होंगे। उदाहरण के लिए, DequeueAsync पर कॉल करने के तरीकों का एक समूह होने के बजाय, आप अपने BufferBlock को ActionBlock से लिंक कर सकते हैं।

+0

मैंने रीडरवाइटर लॉकस्लिम का इस तरह से उपयोग किया था जिसका शायद इरादा नहीं था। मैं यह कहता था कि कई लोग डेक्यूएसिंक कर सकते थे, लेकिन एक घटना को संभालने के दौरान, किसी को भी हैंडलर को छोड़कर कतार को छूना नहीं चाहिए। जब तक मैंने 'अविश्वसनीय' कतार नहीं जोड़ा, तब तक यह जरूरी नहीं था, इसलिए मुझे डेक्यूएसिंक को कॉल करने से पहले प्राप्त संदेशों को याद नहीं किया जाएगा। क्या मैं इसे गलत इस्तेमाल कर रहा हूँ? यहां पूर्ण AwaitableBasicConsumer कोड है: https://gist.github.com/kcuzner/7242836 –

+0

ठीक है, ऐसा लगता है कि यह काम करेगा। लेकिन यह आपको एक साधारण 'लॉक' पर ज्यादा नहीं मिलता है। मुझे एक और समस्या दिखाई देती है: 'रद्दीकरण टोकन। रजिस्ट्रार' अपने प्रतिनिधि इनलाइन को निष्पादित कर सकता है, जिससे 'SetResult' विफल हो जाएगी। फिर भी एक और अस्पष्ट कोने की स्थिति है कि मैं स्टीफन टब को छोड़ दूंगा। :) –

+0

एक और अनुवर्ती प्रश्न: RabbitMQ मानक धागे में अपने धागे को केवल 'नया थ्रेड (नया थ्रेडस्टार्ट (...' के साथ बनाता है। क्या ये थ्रेड उनके पास कार्य चलाने के योग्य हैं या वे अकेले रहेंगे सो रहा है? अगर मुझे पता चलेगा कि उन थ्रेडों पर कार्य नहीं चलेंगे (जब तक कि सिंक्रनाइज़ नहीं कहा जाता है, जैसे मेरा था), तो मैं शायद 'बफरब्लॉक ' (जिसे मैं पहले से नहीं जानता था) का उपयोग कर सकता हूं और नहीं एक ही आवेदन में टीपीएल और खरगोश एमक्यू का उपयोग करने के बारे में चिंता करने की ज़रूरत है। –

संबंधित मुद्दे