पर चलने से कार्य को रोकना मैं कुछ async प्रतीक्षा सामग्री के साथ थोड़ा सा संघर्ष कर रहा हूं। मैं कुछ कार्यक्रमों के बीच संदेश भेजने/प्राप्त करने के लिए RabbitMQ का उपयोग कर रहा हूं।कुछ थ्रेड
कुछ पृष्ठभूमि के रूप में, RabbitMQ क्लाइंट 3 या ऐसे धागे का उपयोग करता है जो मैं देख सकता हूं: एक कनेक्शन थ्रेड और दो दिल की धड़कन धागे। जब भी टीसीपी के माध्यम से कोई संदेश प्राप्त होता है, कनेक्शन थ्रेड इसे संभालता है और एक कॉलबैक कॉल करता है जिसे मैंने इंटरफ़ेस के माध्यम से प्रदान किया है। दस्तावेज़ीकरण का कहना है कि इस कॉल के दौरान बहुत सारे काम करने से बचाना सबसे अच्छा है क्योंकि कनेक्शन के रूप में एक ही धागे पर किया जाता है और चीजों को जारी रखने की आवश्यकता होती है। वे QueueingBasicConsumer
की आपूर्ति करते हैं जिसमें एक 'डेक्यू' विधि अवरुद्ध होती है जिसका उपयोग संदेश प्राप्त होने के लिए किया जाता है।
मैं चाहता था कि मेरे उपभोक्ता वास्तव में इस प्रतीक्षा समय के दौरान अपना धागा संदर्भ जारी कर सकें ताकि कोई और कुछ काम कर सके, इसलिए मैंने एसिंक/प्रतीक्षा कार्यों का उपयोग करने का निर्णय लिया।
मैं एक awaitable विपंक्ति विधि है:: मैं एक AwaitableBasicConsumer
वर्ग जो निम्नलिखित फैशन में TaskCompletionSource
रों का उपयोग करता लिखा
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
//we are enqueueing a TCS. This is a "read"
rwLock.EnterReadLock();
try
{
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
//if we are cancelled before we finish, this will cause the tcs to become cancelled
cancellationToken.Register(() =>
{
tcs.TrySetCanceled();
});
//if there is something in the undelivered queue, the task will be immediately completed
//otherwise, we queue the task into deliveryTCS
if (!TryDeliverUndelivered(tcs))
deliveryTCS.Enqueue(tcs);
}
return tcs.Task;
}
finally
{
rwLock.ExitReadLock();
}
}
कॉलबैक जो RabbitMQ ग्राहक कॉल को पूरा कार्य: यह कहा जाता है AMQP कनेक्शन धागे के संदर्भ से
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
//we want nothing added while we remove. We also block until everybody is done.
rwLock.EnterWriteLock();
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
bool sent = false;
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
while (deliveryTCS.TryDequeue(out tcs))
{
//once we manage to actually set somebody's result, we are done with handling this
if (tcs.TrySetResult(e))
{
sent = true;
break;
}
}
//if nothing was sent, we queue up what we got so that somebody can get it later.
/**
* Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
* next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
* doing our thing here.
*/
if (!sent)
{
undelivered.Enqueue(e);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
rwLock
एक ०१२३५५१२२७४७ है। दो कतार (deliveryTCS
और undelivered
) ConcurrentQueues हैं।
समस्या:
एक बार ही सही में, विधि है कि इंतजार कर रहा है विपंक्ति विधि एक अपवाद फेंकता है। यह आमतौर पर एक मुद्दा नहीं होगा क्योंकि यह विधि async
भी है और इसलिए यह "अपवाद" समापन स्थिति में प्रवेश करती है जो कार्य दर्ज करती है। समस्या उस परिस्थिति में आती है जहां DequeueAsync
पर कॉल करने वाले कार्य को एएमक्यूपी कनेक्शन थ्रेड पर प्रतीक्षा करने के बाद फिर से शुरू किया जाता है जिसे RabbitMQ क्लाइंट बनाता है। आम तौर पर मैंने कार्यों को मुख्य धागे या चारों ओर तैरने वाले कार्यकर्ता थ्रेडों में से एक को फिर से शुरू किया है। हालांकि, जब यह एएमक्यूपी थ्रेड पर फिर से शुरू होता है और अपवाद फेंक दिया जाता है, तो सबकुछ स्टाल हो जाता है। कार्य नहीं है "अपवाद स्थिति" दर्ज करें और एएमक्यूपी कनेक्शन थ्रेड यह कह रहा है कि यह उस विधि को निष्पादित कर रहा है जिसमें अपवाद हुआ था।
मेरा मुख्य यहाँ भ्रम की स्थिति है क्यों यह काम नहीं करता:
public async Task RunAsync()
{
using (var channel = this.Connection.CreateModel())
{
...
AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
var result = consumer.DequeueAsync(this.CancellationToken);
//wait until we find something to eat
await result;
throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
...
} //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}
पढ़ना मैं क्या लिखा है, मैं देख रहा हूँ:
var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards
ConsumerTaskState state = new ConsumerTaskState()
{
Connection = connection,
CancellationToken = cancellationToken
};
//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);
यहाँ RunAsync
विधि, परीक्षण के लिए स्थापित है कि मैंने अपनी समस्या को बहुत अच्छी तरह से समझाया नहीं है। अगर स्पष्टीकरण की आवश्यकता है, तो बस पूछें।
- निकालें सभी Async/इंतजार कोड और बस सीधे धागे और ब्लॉक का उपयोग करें:
मेरे समाधान है कि मैं के साथ आए हैं इस प्रकार हैं। प्रदर्शन में कमी आएगी, लेकिन कम से कम यह कभी-कभी
- किसी भी तरह से एएमक्यूपी धागे को कार्यों को फिर से शुरू करने के लिए उपयोग करने से मुक्त नहीं करेगा।मुझे लगता है कि वे सो रहे थे या कुछ और फिर डिफ़ॉल्ट
TaskScheduler
ने उनका उपयोग करने का फैसला किया। अगर मुझे कार्य शेड्यूलर को बताने का कोई तरीका मिल सकता है कि वे धागे सीमा से बाहर हैं, तो यह बहुत अच्छा होगा।
क्या किसी के पास यह क्यों हो रहा है कि यह क्यों हो रहा है या इसे हल करने के लिए कोई सुझाव है? अभी मैं एसिंक कोड को हटा रहा हूं ताकि कार्यक्रम विश्वसनीय हो, लेकिन मैं वास्तव में यह समझना चाहता हूं कि यहां क्या हो रहा है।
* मैं चाहता था कि मेरे उपभोक्ता वास्तव में इस प्रतीक्षा समय के दौरान अपने धागे संदर्भ को जारी कर सकें *। क्या आप कह रहे हैं कि प्रतीक्षा धागे ने अन्य धागे को निष्पादित करने से अवरुद्ध कर दिया है? वह लगता है । । । अजीब। –
खरगोश एमक्यू क्लाइंट "क्यूइंग बासिक कॉन्स्यूमर" प्रदान करता है जो कॉलर को तब तक अवरुद्ध करता है जब तक कि उसे कुछ प्राप्त न हो जाए। अगर मैं कार्यों के बजाय कच्चे धागे का उपयोग कर रहा था, तो वह धागा सो जाएगा और कुछ भी नहीं कर रहा है। मैं इस अवरोध से बचना चाहता था ताकि अंतर्निहित धागा उस समय (थ्रूपुट बढ़ाएं) के दौरान एक और कार्य कर सके, इसलिए मैंने अपना इंतजार करने योग्य उपभोक्ता लिखा जो वास्तव में कॉलिंग थ्रेड को अवरुद्ध करने के बजाय कार्यों का उपयोग करता है। –
मैं उस हिस्से को समझ गया। जो मुझे समझ में नहीं आता वह समस्या है जिसे आप हल करने की कोशिश कर रहे हैं।यदि 'क्यूइंग बासिक कॉन्स्यूमर' सक्षम रूप से लिखा गया है, तो यह एक व्यस्त प्रतीक्षा है और अवरुद्ध थ्रेड प्रतीक्षा करते समय सीपीयू चक्र का उपभोग नहीं करेगा और इसलिए किए जा रहे किसी भी अन्य काम को प्रभावित नहीं करेगा। बस उन धागे को RabbitMQ को समर्पित करें और अन्य नौकरियों के लिए अन्य धागे का उपयोग करें। –