2010-06-13 7 views
5

के साथ IObservable/IObserver का उपयोग कैसे करें मुझे एहसास हुआ कि जब मैं एकाधिक धागे का उपयोग करके एक समवर्ती कतार में आइटम्स को संसाधित करने का प्रयास कर रहा हूं, जबकि एकाधिक धागे इसमें आइटम डाल सकते हैं, आदर्श समाधान प्रतिक्रियाशील एक्सटेंशन का उपयोग करना होगा समवर्ती डेटा संरचनाओं के साथ।ConcurrentQueue या ConcurrentStack

While using ConcurrentQueue, trying to dequeue while looping through in parallel

तो मैं उत्सुक हूँ अगर वहाँ एक LINQ (या PLINQ) क्वेरी कि लगातार के रूप में आइटम इसमें लगाया जाता है dequeueing किया जाएगा करने के लिए किसी भी तरह से है:

मेरे मूल प्रश्न पर है।

मैं इसे इस तरह से काम करने की कोशिश कर रहा हूं जहां मेरे पास कतार में धक्का देने वाले उत्पादकों की संख्या और सीमित संख्या में धागे की प्रक्रिया हो सकती है, इसलिए मैं डेटाबेस को अधिभारित नहीं करता हूं।

यदि मैं आरएक्स फ्रेमवर्क का उपयोग कर सकता हूं तो मुझे उम्मीद है कि मैं इसे शुरू कर सकता हूं, और 100 आइटम के भीतर 100 आइटम रखे जाते हैं, तो 20 थ्रेड जो PLINQ क्वेरी का हिस्सा हैं, केवल कतार के माध्यम से प्रक्रिया करेंगे।

तीन प्रौद्योगिकियों मैं एक साथ काम करने की कोशिश कर रहा हूँ रहे हैं:

  1. आरएक्स फ्रेमवर्क (रिएक्टिव LINQ)
  2. pling
  3. System.Collections.Concurrent संरचनाओं
+0

क्या आप विस्तार से आरएक्स की मदद करने के लिए कैसे उम्मीद कर सकते हैं? –

+0

@ रिचर्ड Szalay - जैसा कि मैंने अंत के अंत में उल्लेख किया है, मेरा विचार यह है कि मुझे यह देखने के लिए मतदान नहीं करना है कि कतार में कुछ भी है या नहीं, मैं वहां प्रतिक्रिया कर सकता हूं जब वहां कुछ रखा जाता है, तो यदि बड़ी संख्या में आइटम अचानक धक्का दिया जाता है मैं प्रसंस्करण कर कई धागे हो सकता है। मैं मतदान से बचने की कोशिश कर रहा हूं, जो मैं अभी कर रहा हूं। –

उत्तर

3

मैं डॉन ' टी यह नहीं जानता कि आरएक्स के साथ इसे कैसे पूरा किया जाए, लेकिन मैं केवल BlockingCollection<T> और producer-consumer pattern का उपयोग करने की अनुशंसा करता हूं। आपका मुख्य धागा संग्रह में आइटम जोड़ता है, जो डिफ़ॉल्ट रूप से ConcurrentQueue<T> का उपयोग करता है। फिर आपके पास अलग-अलग Task है जो आप Parallel::ForEach का उपयोग BlockingCollection<T> पर संग्रह से कई आइटमों को संसाधित करने के लिए करते हैं, जो सिस्टम के साथ समेकित रूप से समझ में आता है। अब, आप शायद अधिक कुशल होने के लिए समानांतर एक्सटेंशन लाइब्रेरी की GetConsumingPartitioner विधि का उपयोग करना चाहेंगे क्योंकि डिफ़ॉल्ट विभाजनकर्ता इस मामले में आप चाहते हैं उससे अधिक ओवरहेड बनाएगा। आप this blog post से इसके बारे में अधिक पढ़ सकते हैं।

जब मुख्य थ्रेड समाप्त हो गया है तुम पर BlockingCollection<T> और Task::Wait पर CompleteAdding फोन Task आप संग्रह में सभी आइटम को संसाधित करना समाप्त करने के लिए सभी उपभोक्ताओं के लिए प्रतीक्षा करने के लिए काता।

+0

'ब्लॉकिंग कोलेक्शन' का उपयोग करने के लिए मुख्य पकड़ यह है कि उपभोग करने वाले थ्रेड ब्लॉक। जब एक प्रक्रिया करने के लिए कुछ था तो एक अवलोकन पैटर्न केवल थ्रेड ले जाएगा। –

6

ड्रू सही है, मुझे लगता है कि यह ConcurrentQueue है, भले ही यह नौकरी के लिए सही लगता है वास्तव में अंतर्निहित डेटा संरचना है जो अवरोधक चयन का उपयोग करता है। मेरे सामने बहुत पीछे लगता है। इस पुस्तक के अध्याय 7 को देखें * http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 और यह बताएगा कि ब्लॉकिंग कोलेक्शन का उपयोग कैसे करें और इसमें कई उत्पादक और एकाधिक उपभोक्ता "कतार" लेते हैं। आप "GetConsumingEnumerable()" विधि को देखना चाहेंगे और संभवतः बस उस पर कॉल करें। ToObservable()।

* शेष पुस्तक काफी औसत है।

संपादित करें:

यहां एक नमूना कार्यक्रम मुझे लगता है कि आप क्या चाहते हैं करता है?

class Program 
{ 
    private static ManualResetEvent _mre = new ManualResetEvent(false); 
    static void Main(string[] args) 
    { 
     var theQueue = new BlockingCollection<string>(); 
     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000)); 


     LoadQueue(theQueue, "Producer A"); 
     LoadQueue(theQueue, "Producer B"); 
     LoadQueue(theQueue, "Producer C"); 

     _mre.Set(); 

     Console.WriteLine("Processing now...."); 

     Console.ReadLine(); 
    } 

    private static void ProcessNewValue(string value, string consumerName, int delay) 
    { 
     Thread.SpinWait(delay); 
     Console.WriteLine("{1} consuming {0}", value, consumerName); 
    } 

    private static void LoadQueue(BlockingCollection<string> target, string prefix) 
    { 
     var thread = new Thread(() => 
            { 
             _mre.WaitOne(); 
             for (int i = 0; i < 100; i++) 
             { 
              target.Add(string.Format("{0} {1}", prefix, i)); 
             } 
            }); 
     thread.Start(); 
    } 
} 
+0

यह वास्तव में .... सरल आदमी है ... एक ब्लॉकिंग कोलेक्शन के साथ आरएक्स को जोड़ रहा है। वाह .. आप इस चीज़ के साथ एक पाइपलाइन भी कर सकते हैं: https://msdn.microsoft.com/en-us/library/ff963548.aspx – Oooogi

संबंधित मुद्दे