2012-02-10 10 views
5

में टीपीएल का उपयोग करके निरंतर प्रसंस्करण "फ्लो" कैसे बना सकता हूं, मुझे यकीन नहीं है कि निम्नलिखित संभव है, लेकिन मैं पैरालेल में कई कार्रवाइयों को एक थ्रॉटल तरीके से आमंत्रित करना चाहता हूं, लेकिन टाइमर या लूप/नींद चक्रों का उपयोग करने के लिए वापस लौटने के बिना निरंतर प्रसंस्करण का प्रवाह रखें।मैं सी # 4

अब तक मुझे यह काम मिल गया है कि यह कुछ स्रोतों से इनपुट का एक बड़ा बैच लोड करता है ... और फिर उन्हें नियंत्रित तरीके से पैरालेल में & लूपों को नीचे की तरह प्रक्रिया करता है।

static void Main(string[] args) 
{ 
    while(true) //Simulate a Timer Elapsing... 
    { 
     IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; 
     //Simulate querying database queue tables for next batch of entries 

     RunAllActions(inputs, 3); //Max 3 at a time. 
    } 
} 

static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency) 
{ 
    var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency}; 

    Parallel.ForEach<int>(inputs, options, DoWork); 
    //Blocks here until all inputs are processed. 
    Console.WriteLine("Batch of Work Done!!!"); 
} 

static void DoWork(int input) 
{ 
    Console.WriteLine("Starting Task {0}", input); 
    System.Threading.Thread.Sleep(3000); 
    Console.WriteLine("Finishing Task {0}", input); 
} 

मैं जानना चाहता है जो खोजना चाहते हैं, वहाँ है कि मैं यह हमेशा चालू रखने के लिए ... तो यह है कि मैं "टाइमर elapsing" & "डेटाबेस मतदान" की जगह ले सकता इस्तेमाल कर सकते हैं TPL में एक निर्माण है एक संदेश क्यूई प्राप्त घटना के साथ।

निम्नलिखित का एक मोटा संस्करण है जिसे मैं हासिल करना चाहता हूं ... अन्यथा मैं इसके बारे में जा सकता हूं, लेकिन मैं जानना चाहता हूं कि इस प्रकार का पैटर्न टीपीएल में बनाया गया है।

internal class Engine 
{ 
    private MessageQueue mq; 
    private Queue<int> myInternalApplicationQueue; 

    public Engine() 
    { 
     //Message Queue to get new task inputs from 
     mq = new MessageQueue(); 
     mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted); 

     // internal Queue to put them in. 
     myInternalApplicationQueue = new Queue<int>(); 
    } 

    void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e) 
    { 
     //On MQ Receive, pop the input in a queue in my app 
     int input = (int) e.Message.Body; 

     myInternalApplicationQueue.Enqueue(input); 
    } 

    public void StartWorking() 
    { 
     //Once this gets called, it doesn't stop... it just keeps processing/watching that queue 
     //processing the tasks as fast as it's allowed while the app is running. 
     var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 }; 
     Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork); 
     //  ^^^^^^^^^^^^^^^^^^ <----- THIS GUY 
    } 

} 

उत्तर

5

आप आपरेशन के इस प्रकार है, जो प्रभावी रूप से एक निर्माता/उपभोक्ता परिदृश्य है संभाल करने के लिए BlockingCollection<T> उपयोग कर सकते हैं।

असल में, आप BlockingCollection<T> सेट अप करेंगे और इसे अपने "निर्माता" के रूप में उपयोग करेंगे। इसके बाद आपके पास उपभोक्ता कार्य (जो प्रायः लंबे समय तक चलने वाले कार्यों के रूप में सेट होते हैं) के तीन (या किसी भी संख्या) होते हैं जो तत्वों को संसाधित करते हैं (blockingCollection.GetConsumingEnumerable() को मानक फ़ोरैच लूप में कॉल करके)।

फिर आप संग्रह में आवश्यक वस्तुओं को जोड़ते हैं, और वे लगातार संसाधित होंगे। जब आप पूरी तरह से पूरा कर लेंगे, तो आप BlockingCollection<T>.CompleteAdding पर कॉल करेंगे, जो फोरैच लूप को पूरा करने का कारण बनेंगे, और पूरी चीज बंद हो जाएगी।

एक तरफ ध्यान दें के रूप में - आप आम तौर पर एक BlockingCollection<T> से GetConsumingEnumerable() पर Parallel.ForEach का उपयोग नहीं करना चाहते हैं - कम से कम जब तक नहीं आप विभाजन अपने आप को संभाल। यह आमतौर पर कई कार्यों का उपयोग करना बेहतर होता है और अनुक्रमिक रूप से प्रत्येक पुनरावृत्त होता है। इसका कारण यह है कि Parallel.ForEach में डिफ़ॉल्ट विभाजन योजना समस्याएं पैदा करेगी (यह तब तक प्रतीक्षा करती है जब तक डेटा की "खंड" उपलब्ध न हो, तुरंत वस्तुओं को प्रोसेस करने की बजाय, और "भाग" समय के साथ बड़े और बड़े हो जाते हैं)।

+0

धन्यवाद रीड। यह कोशिश करेगा। –

2

रीड पॉइंट आउट होने के कारण, ब्लॉकिंग कोलेक्शन यहां जाने का एक अच्छा "मैनुअल" तरीका है। नकारात्मकता यह है कि आपको उपभोक्ताओं को भी प्रबंधित करना होगा।

एक अन्य दृष्टिकोण जिसे आप देखना चाहते हैं, इस तरह के परिदृश्यों के लिए आपके हाथों से बहुत अधिक समन्वय कार्य लेता है TPL Dataflow में देखना है। विशेष रूप से इस तरह के परिदृश्य में आप केवल ActionBlock<T> का उपयोग कर सकते हैं और जब संदेश कतार से आता है तो आप PostActionBlock<T> पर डेटा का नया टुकड़ा करेंगे और यह स्वचालित रूप से कवर के तहत टीपीएल कार्यकर्ता धागे का उपयोग करके इसे संसाधित करेगा। आप आसान एक ExecutionDataflowBlockOptions में गुजर जब ActionBlock<T> का निर्माण करके ActionBlock<T> के इन बारीकियों को नियंत्रित कर सकते जहाँ तक नियंत्रित करने समानांतरवाद या क्षमता के रूप में अब

ActionBlock<int> myActionBlock = new ActionBlock<int>(this.ProcessWorkItem); 

void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)  
{  
    int input = (int)e.Message.Body; 

    // Post the data to the action block 
    this.myActionBlock.Post(input); 
} 

private void ProcessWorkItem(int workItemData) 
{ 
    // ActionBlock will hand each work item to this method for processing 
} 

,,: यह अपने Engine वर्ग इस तरह एक छोटे से कुछ इस प्रकार दिखाई होगा। तो मान लीजिए कि मैं यह सुनिश्चित करना चाहता हूं कि मेरे पास चार से अधिक की समांतरता न हो और निर्माता को एक सौ से अधिक आइटम कतार में जोड़ने से रोकें।मैं बस करूँगा:

ActionBlock<int> myActionBlock = new ActionBlock<int>(
            this.ProcessWorkItem, 
            new ExecutionDataflowBlockOptions 
            { 
             MaxDegreeOfParallelism = 4, 
             BoundedCapacity = 100 
            }); 
+0

धन्यवाद ड्रू। यह वास्तव में दिलचस्प लग रहा है। दुर्भाग्य से मैं अब .NET 4 या उससे कम के लिए बंद कर दिया गया है। –

+0

जबकि टीडीएफ को .NET 4.5 में बेक किया गया है, टीडीएफ का एक अलग डाउनलोड है जो सिर्फ .NET4 TPL के शीर्ष पर बैठता है। यह मेरे उत्तर में लिंक किए गए टीपीएल डेटाफ्लो मुख्य पृष्ठ से उपलब्ध है, लेकिन यहां प्रत्यक्ष डाउनलोड लिंक है (http://download.microsoft.com/download/F/9/6/F967673D-58D6-4E3F-8CA9-11769A0A63B1/TPLDataflow .msi) और यहां रीडेमे (http://msdn.microsoft.com/en-us/devlabs/gg585583) –

+0

चीयर्स ड्रू है। हाँ मैंने इसे अपने स्वयं के टेस्ट वीएम पर डाउनलोड किया है। लेकिन मैं जो काम कर रहा हूं उसका एक मौजूदा उत्पादन माहौल में पहुंचाया जाएगा और यही मेरा मतलब है .NET 4.0 पर लॉक किया जा रहा है। एक बार फिर धन्यवाद। –

संबंधित मुद्दे