2016-03-02 13 views
6

में एकाधिक घटनाओं का इंतजार है मैं producer/consumer pattern का उपयोग कर डेटा लिंक परत लागू कर रहा हूं। डेटा लिंक परत के पास तार (ईथरनेट, आरएस -223 ...) पर डेटा लिंक प्रोटोकॉल को संवाद करने के लिए अपना स्वयं का धागा और राज्य मशीन है। भौतिक परत के लिए इंटरफ़ेस को System.IO.Stream के रूप में दर्शाया गया है। एक और धागा डेटा लिंक ऑब्जेक्ट से संदेश लिखता है और पढ़ता है।सी # निर्माता/उपभोक्ता

  1. एक बाइट प्राप्त होता है
  2. एक संदेश नेटवर्क धागा
  3. रखें जिंदा टाइमर से उपलब्ध है:

    डेटा लिंक वस्तु एक निष्क्रिय अवस्था है कि चार शर्तों में से एक के लिए इंतज़ार करना होगा है

  4. समाप्त हो गया है सभी संचार नेटवर्क परत

मैं एक कठिन समय figu आ रही द्वारा रद्द कर दिया गया एक पढ़ने/लिखने के धागे में संचार को विभाजित किए बिना ऐसा करने का सबसे अच्छा तरीका रिंग करें (जिससे जटिलता में काफी वृद्धि हो)। यहाँ मैं कैसे 4 में से 3 प्राप्त कर सकते हैं:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. 
stream.ReadTimeout = 10000; 
await stream.ReadAsync(buf, 0, 1, cts.Token); 

या

BlockingCollection<byte[]> SendQueue = new ...; 
... 
// Check for a message from network layer. Timeout after 10 seconds. 
// Monitor cancellation token. 
SendQueue.TryTake(out msg, 10000, cts.Token); 

मैं सभी चार स्थितियों के लिए इंतजार कर रहे धागा ब्लॉक करने के लिए, क्या करना चाहिए? सभी सिफारिशों का स्वागत है। मैं किसी भी वास्तुकला या डेटा संरचनाओं पर सेट नहीं हूँ।

संपादित करें: ******** सभी की सहायता के लिए धन्यवाद। यहां मेरा समाधान है ********

सबसे पहले मुझे नहीं लगता कि निर्माता/उपभोक्ता कतार का एक असीमित कार्यान्वयन था। तो मैंने this stackoverflow post के समान कुछ लागू किया।

मुझे उपभोक्ता धागे को रोकने और क्रमशः similar to this article को मध्यवर्ती कार्यों को रद्द करने के लिए बाहरी और आंतरिक रद्दीकरण स्रोत की आवश्यकता थी।

byte[] buf = new byte[1]; 
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) 
{ 
    CancellationToken internalToken = internalTokenSource.Token; 
    CancellationToken stopToken = stopTokenSource.Token; 
    using (CancellationTokenSource linkedCts = 
     CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)) 
    { 
     CancellationToken ct = linkedCts.Token; 
     Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct); 
     Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); 
     Task keepAliveTask = Task.Delay(m_keepAliveTime, ct); 

     // Wait for at least one task to complete 
     await Task.WhenAny(readTask, msgTask, keepAliveTask); 

     // Next cancel the other tasks 
     internalTokenSource.Cancel(); 
     try { 
      await Task.WhenAll(readTask, msgTask, keepAliveTask); 
     } catch (OperationCanceledException e) { 
      if (e.CancellationToken == stopToken) 
       throw; 
     } 

     if (msgTask.IsCompleted) 
      // Send the network layer message 
     else if (readTask.IsCompleted) 
      // Process the byte from the physical layer 
     else 
      Contract.Assert(keepAliveTask.IsCompleted); 
      // Send a keep alive message 
    } 
} 
+3

['कार्य का इंतजार करें। जब कोई (...)'] (https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx) मदद हो सकती है। –

उत्तर

2

इस मामले में, मैं केवल रद्द करने के लिए रद्द करने टोकन का प्रयोग करेंगे। एक जीवित टाइमर की तरह दोहराया गया टाइमआउट बेहतर टाइमर के रूप में दर्शाया जाता है।

तो, मैं इसे तीन रद्द करने योग्य कार्यों के रूप में मॉडल करूंगा। सबसे पहले, रद्द टोकन:

सभी संचार नेटवर्क परत द्वारा रद्द कर दिया गया

CancellationToken token = ...; 

फिर, तीन समवर्ती आपरेशनों:

एक बाइट प्राप्त होता है

var readByteTask = stream.ReadAsync(buf, 0, 1, token); 

रखें जिंदा टाइमर

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 

एक संदेश यह एक थोड़ा जटिल काम है नेटवर्क धागा

से उपलब्ध है समाप्त हो गया है। आपका वर्तमान कोड BlockingCollection<T> का उपयोग करता है, जो एसिंक-संगत नहीं है। मैं TPL Dataflow's BufferBlock<T> या my own AsyncProducerConsumerQueue<T> पर स्विच करने की अनुशंसा करता हूं, इनमें से कोई भी एसिंक-संगत निर्माता/उपभोक्ता कतार के रूप में उपयोग किया जा सकता है (जिसका अर्थ है कि निर्माता सिंक या एसिंक हो सकता है, और उपभोक्ता सिंक या एसिंक हो सकता है)।

BufferBlock<byte[]> SendQueue = new ...; 
... 
var messageTask = SendQueue.ReceiveAsync(token); 

तो फिर तुम Task.WhenAny उपयोग कर सकते हैं यह निर्धारित करने के लिए इन कार्यों के पूरा:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask); 

अब, आप उन्हें ing दूसरों के लिए completedTask और await की तुलना द्वारा परिणाम प्राप्त कर सकते हैं:

if (completedTask == readByteTask) 
{ 
    // Throw an exception if there was a read error or cancellation. 
    await readByteTask; 
    var byte = buf[0]; 
    ... 
    // Continue reading 
    readByteTask = stream.ReadAsync(buf, 0, 1, token); 
} 
else if (completedTask == keepAliveTimerTask) 
{ 
    // Throw an exception if there was a cancellation. 
    await keepAliveTimerTask; 
    ... 
    // Restart keepalive timer. 
    keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 
} 
else if (completedTask == messageTask) 
{ 
    // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) 
    byte[] message = await messageTask; 
    ... 
    // Continue reading 
    messageTask = SendQueue.ReceiveAsync(token); 
} 
+0

इच्छा है कि मैंने इसे पहले देखा था और अपने स्वयं के AsyncQueue को लागू किया था, यद्यपि आपके जैसा सामान्य नहीं था। आप प्रत्येक व्यक्तिगत कार्य का इंतजार क्यों करते हैं? क्या यह पहले ही पूरा नहीं होना चाहिए? –

+0

@BrianHeilig: परिणाम पुनर्प्राप्त करना आवश्यक है। ये कार्य पूरा हो गए हैं लेकिन उनके परिणाम नहीं देखे गए हैं। परिणाम डेटा हो सकते हैं, उदाहरण के लिए, 'messageTask', लेकिन परिणाम अपवाद भी हो सकते हैं। 'keepAliveTimerTask' में अपवाद हो सकता है यदि इसे रद्द कर दिया गया है, और' readByteTask' में अपवाद हो सकता है यदि यह रद्द हो गया है या यदि कुछ I/O पढ़ने में त्रुटि है। उपस्थित होने पर 'इंतजार' करेंगे (पुनः-) उन अपवादों को उठाएं। –

1

एक पढ़ने वाली पत्तियों को रद्द करना आपको यह जानने का कोई तरीका नहीं है कि डेटा पढ़ा गया था या नहीं। रद्द करना और पढ़ना एक-दूसरे के संबंध में परमाणु नहीं है। यह दृष्टिकोण केवल तभी काम करता है जब आप रद्दीकरण के बाद स्ट्रीम बंद करते हैं।

कतार दृष्टिकोण बेहतर है। आप एक लिंक CancellationTokenSource बना सकते हैं जो जब चाहें रद्द हो जाता है। cts.Token पास करने के बजाय आप एक टोकन पास करते हैं जिसे आप नियंत्रित करते हैं।

तब आप उस टोकन को समय, एक और टोकन और किसी अन्य घटना के आधार पर सिग्नल कर सकते हैं। यदि आप अंतर्निहित टाइमआउट का उपयोग करते हैं तो कतार आंतरिक रूप से एक ही समय के साथ आने वाली टोकन को जोड़ने के लिए वही काम करेगी।

+0

क्षमा करें, मैं विस्तार से थोड़ा छोटा था। सीटीएस रद्दीकरण टोकनसोर्स है जो डेटा लिंक परत के स्वामित्व में है। रद्दीकरण डेटा लिंक परत द्वारा भी नियंत्रित किया जाता है; नेटवर्क थ्रेड कॉल रोकता है जो सभी संचार को रद्द करता है और धागे को पूरा करने की प्रतीक्षा करता है। लक्ष्य डेटा लिंक परत को गहराई से और जल्दी से बंद करना है। –

+1

और यह उत्तर क्यों काम नहीं करेगा? आप अपनी पसंद की किसी भी * शर्त पर 'TryTake' निरस्त कर सकते हैं। – usr

+0

मुझे लगता है कि मैं समझता हूं। आप सुझाव दे रहे हैं कि जब नेटवर्क थ्रेड रद्द हो जाता है या बाइट उपलब्ध होता है तो मैं रद्द टोकन को सिग्नल करता हूं, है ना? मुझे लगता है कि स्ट्रीम से सिंक्रनाइज़ रूप से पढ़ने के लिए मुझे एक और थ्रेड की आवश्यकता होगी, फिर पढ़े जाने पर रद्द टोकन को सिग्नल करें। मुझे यह निर्धारित करने के लिए एक चर की आवश्यकता होगी कि क्या रद्द किया गया था (पढ़ें या रोकें)। संदेश लेने के बाद मुझे पढ़ने के थ्रेड को रद्द करने की भी आवश्यकता होगी, है ना? या क्या मैं कुछ न कुछ भूल रहा हूं? –

3

मैं आपके विकल्प दो के साथ जाऊंगा, 4 स्थितियों में से किसी एक के लिए इंतजार कर रहा हूं। आप यह मानते हुए है पहले से ही awaitable तरीके के रूप में 4 कार्यों:

var task1 = WaitForByteReceivedAsync(); 
var task2 = WaitForMessageAvailableAsync(); 
var task3 = WaitForKeepAliveTimerAsync(); 
var task4 = WaitForCommunicationCancelledAsync(); 

// now gather them 
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{ 
task1, task2, task3, task4 
}; 

// Wait for any of the things to complete 
var result = await Task.WhenAny(theTasks); 

कोड ऊपर तुरंत फिर से शुरू होगा के बाद पहले कार्य को पूरा करता है, और अन्य 3 ध्यान न दें।

नोट:

the documentation for WhenAny में, यह कहते हैं:

लौटे काम हमेशा पूरा करने के लिए पहला काम करने के लिए सेट अपने परिणाम के साथ RanToCompletion राज्य में खत्म हो जाएगा। यह सही है भले ही रद्द करने या गलत तरीके से समाप्त होने वाला पहला कार्य पूरा हो।

तो भरोसा करने से पहले क्या हुआ कि अंतिम जाँच करना सुनिश्चित करें:

if(result.Result.Result == true) 
... // First Result is the task, the second is the bool that the task returns 
+1

मुझे लगता है कि आपका मतलब था जब कोई() –

+1

नहीं। जब कोई भी कार्य वापस लौटाता है तो कोई भी कार्य वापस लौटाता है। जब पूरा करने के लिए दिए गए प्रत्येक कार्य के लिए सभी इंतजार कर रहे हैं, और खुद को एक कार्य (जैसा कि WaitAll() –

+1

के विपरीत है, यह काम नहीं करता है क्योंकि बाइट पढ़ने के परिणाम को त्याग दिया जा सकता है। मुझे लगता है कि वह उस डेटा को खोने के बारे में परवाह नहीं करता है। – usr