8

का उपयोग कर एक समय में केवल एन आइटम को प्रोसेस करना यह सब एक विंडोज़ सेवा में हो रहा है।कार्य समांतर लाइब्रेरी

मेरे पास Queue<T> (वास्तव में ConcurrentQueue<T>) होल्डिंग आइटम संसाधित होने की प्रतीक्षा कर रहे हैं। लेकिन, मैं एक समय में केवल एक को संसाधित नहीं करना चाहता, मैं एक साथ एन वस्तुओं को संसाधित करना चाहता हूं, जहां n कॉन्फ़िगर करने योग्य पूर्णांक है।

कार्य समानांतर लाइब्रेरी का उपयोग करके मैं इसे करने के बारे में कैसे जा सकता हूं?

मुझे पता है कि टीपीएल समवर्ती प्रसंस्करण के लिए डेवलपर की ओर से संग्रह विभाजन करेगा, लेकिन यह सुनिश्चित नहीं है कि यह सुविधा मेरे बाद है या नहीं। मैं multithreading और टीपीएल के लिए नया हूँ।

उत्तर

4

उपयोग BlockingCollection<T>ConcurrentQueue<T> के बजाय, तो आप उपभोक्ता धागे के किसी भी संख्या शुरू करने और BlockingCollection की Take विधि का उपयोग कर सकते हैं। यदि संग्रह खाली है, तो Take विधि आइटम कॉल करने के लिए प्रतीक्षा करने वाले कॉलर थ्रेड में स्वचालित रूप से अवरुद्ध हो जाएगी, अन्यथा धागे समानांतर में सभी कतार वस्तुओं का उपभोग करेंगे। हालांकि, जैसा कि आपके प्रश्न ने टीपीएल के उपयोग का उल्लेख किया है, यह पता चला है कि Parallel.ForEach में BlockingCollection के साथ उपयोग करते समय कुछ समस्याएं हैं और अधिक जानकारी के लिए this पोस्ट देखें। इसलिए आपको अपने उपभोक्ता धागे को अपने स्वयं के निर्माण का प्रबंधन करना होगा। new Thread(/*consumer method*/) या new Task() ...

+0

अवरोधक चयन कतार के उद्देश्य को हरा देता है। जब मैं इसे चालू कर रहा हूं, तो मैं किसी अवरुद्ध संग्रह से किसी आइटम को नहीं हटा सकता। –

+2

इसके बजाय आप इसके [GetConsumingEnumerable] (http://msdn.microsoft.com/en-us/library/dd287186.aspx) का उपयोग करके कर सकते हैं। जैसे, 'foreach (_collection.GetConsumingEnumerable() में आइटम आइटम)' संग्रह खाली होने पर आइटम को जोड़ने के लिए प्रतीक्षा भी किया जाएगा। –

4

यहां एक विचार है जिसमें TaskFactory के लिए एक विस्तार विधि बनाना शामिल है।

public static class TaskFactoryExtension 
{ 
    public static Task StartNew(this TaskFactory target, Action action, int parallelism) 
    { 
     var tasks = new Task[parallelism]; 
     for (int i = 0; i < parallelism; i++) 
     { 
      tasks[i] = target.StartNew(action); 
     } 
     return target.StartNew(() => Task.WaitAll(tasks)); 
    } 
} 

फिर आपका कॉलिंग कोड निम्न जैसा दिखाई देगा।

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
var task = Task.Factory.StartNew(
() => 
    { 
    T item; 
    while (queue.TryDequeue(out item)) 
    { 
     ProcessItem(item); 
    } 
    }, n); 
task.Wait(); // Optionally wait for everything to finish. 

Parallel.ForEach का उपयोग करके एक और विचार है। इस दृष्टिकोण के साथ समस्या यह है कि समांतरता की आपकी डिग्री को सम्मानित नहीं किया जा सकता है। आप केवल अधिकतम राशि को इंगित कर रहे हैं न कि पूर्ण राशि।

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n }, 
    (item) => 
    { 
    ProcessItem(item);  
    }); 
1

मैं भी सीधे एक ConcurrentQueue उपयोग करने का एक BlockingCollection का उपयोग कर की सलाह देते हैं बजाय।

यहाँ एक उदाहरण है:

public class QueuingRequestProcessor 
{ 
    private BlockingCollection<MyRequestType> queue; 

    public void QueuingRequestProcessor(int maxConcurrent) 
    { 
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent); 

    Task[] consumers = new Task[maxConcurrent]; 

    for (int i = 0; i < maxConcurrent; i++) 
    { 
     consumers[i] = Task.Factory.StartNew(() => 
     { 
     // Will wait when queue is empty, until CompleteAdding() is called 
     foreach (var request in this.queue.GetConsumingEnumerable()) 
     { 
      Process(request); 
     } 
     }); 
    } 
    } 

    public void Add(MyRequest request) 
    { 
    this.queue.Add(request); 
    } 

    public void Stop() 
    { 
    this.queue.CompleteAdding(); 
    } 

    private void Process(MyRequestType request) 
    { 
    // Do your processing here 
    } 
} 

नोट निर्माता में maxConcurrent निर्धारित करती है कि कई अनुरोध समवर्ती कार्रवाई की जाएगी।

संबंधित मुद्दे