2015-03-03 8 views
5

मैं डेटाफ्लो पाइपलाइन में पोस्ट की गई वस्तुओं की संख्या को सीमित करना चाहता हूं। वस्तुओं की संख्या उत्पादन पर्यावरण पर निर्भर करती है। ये ऑब्जेक्ट्स बड़ी मात्रा में मेमोरी (छवियों) का उपभोग करते हैं, इसलिए जब मैं पाइपलाइन के आखिरी ब्लॉक ने अपना काम किया है तो मैं उन्हें पोस्ट करना चाहता हूं।टीपीएल डेटाफ्लो: एक संपूर्ण पाइपलाइन कैसे थ्रॉटल करें?

मैंने निर्माता को थ्रॉटल करने और पाइपलाइन के अंतिम ब्लॉक में इसे रिलीज़ करने के लिए SemaphoreSlim का उपयोग करने का प्रयास किया। यह काम करता है, लेकिन यदि प्रक्रिया के दौरान कोई अपवाद उठाया जाता है, तो प्रोग्राम हमेशा के लिए इंतजार कर रहा है और अपवाद को अवरुद्ध नहीं किया जाता है।

यहां एक नमूना है जो हमारे कोड की तरह दिखता है। मैं यह कैसे कर सकता हूं?

static void Main(string[] args) 
{ 
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2); 

    var downloadString = new TransformBlock<string, string>(uri => 
    { 
     Console.WriteLine("Downloading '{0}'...", uri); 
     return new WebClient().DownloadString(uri); 
    }); 

    var createWordList = new TransformBlock<string, string[]>(text => 
    { 
     Console.WriteLine("Creating word list..."); 

     char[] tokens = text.ToArray(); 
     for (int i = 0; i < tokens.Length; i++) 
     { 
      if (!char.IsLetter(tokens[i])) 
       tokens[i] = ' '; 
     } 
     text = new string(tokens); 

     return text.Split(new char[] { ' ' }, 
      StringSplitOptions.RemoveEmptyEntries); 
    }); 

    var filterWordList = new TransformBlock<string[], string[]>(words => 
    { 
     Console.WriteLine("Filtering word list..."); 
     throw new InvalidOperationException("ouch !"); // explicit for test 
     return words.Where(word => word.Length > 3).OrderBy(word => word) 
      .Distinct().ToArray(); 
    }); 

    var findPalindromes = new TransformBlock<string[], string[]>(words => 
    { 
     Console.WriteLine("Finding palindromes..."); 

     var palindromes = new ConcurrentQueue<string>(); 

     Parallel.ForEach(words, word => 
     { 
      string reverse = new string(word.Reverse().ToArray()); 

      if (Array.BinarySearch<string>(words, reverse) >= 0 && 
       word != reverse) 
      { 
       palindromes.Enqueue(word); 
      } 
     }); 

     return palindromes.ToArray(); 
    }); 

    var printPalindrome = new ActionBlock<string[]>(palindromes => 
    { 
     try 
     { 
      foreach (string palindrome in palindromes) 
      { 
       Console.WriteLine("Found palindrome {0}/{1}", 
        palindrome, new string(palindrome.Reverse().ToArray())); 
      } 
     } 
     finally 
     { 
      semaphore.Release(); 
     } 
    }); 

    downloadString.LinkTo(createWordList); 
    createWordList.LinkTo(filterWordList); 
    filterWordList.LinkTo(findPalindromes); 
    findPalindromes.LinkTo(printPalindrome); 


    downloadString.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)createWordList).Fault(t.Exception); 
     else createWordList.Complete(); 
    }); 
    createWordList.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)filterWordList).Fault(t.Exception); 
     else filterWordList.Complete(); 
    }); 
    filterWordList.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws 
     else findPalindromes.Complete(); 
    }); 
    findPalindromes.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched 
     else printPalindrome.Complete(); 
    }); 

    try 
    { 
     for (int i = 0; i < 10; i++) 
     { 
      Console.WriteLine(i); 

      downloadString.Post("http://www.google.com"); 
      semaphore.Wait(); // waits here when an exception throws 
     } 

     downloadString.Complete(); 

     printPalindrome.Completion.Wait(); 
    } 
    catch (AggregateException agg) 
    { 
     Console.WriteLine("An error has occured : " + agg); 
    } 
    Console.WriteLine("Done"); 
    Console.ReadKey(); 
} 

उत्तर

2

आपको बस सेमेफोर और समापन कार्य दोनों पर एक साथ इंतजार करना चाहिए। इस तरह यदि ब्लॉक समय-समय पर समाप्त होता है (या तो अपवाद या रद्दीकरण से) तो अपवाद को फिर से चलाया जाएगा और यदि नहीं, तो आप अधिक से अधिक पोस्ट करने के लिए कमरे तक अपने सेमफोर पर इंतजार करेंगे।

आपको लगता है कि Task.WhenAny और SemaphoreSlim.WaitAsync के साथ क्या कर सकते हैं:

for (int i = 0; i < 10; i++) 
{ 
    Console.WriteLine(i); 
    downloadString.Post("http://www.google.com"); 

    if (printPalindrome.Completion.IsCompleted) 
    { 
     break; 
    } 

    Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait(); 
} 

नोट: Task.Wait का उपयोग कर इस मामले में केवल उचित है के रूप में यह Main है। आमतौर पर यह async विधि होना चाहिए और आपको awaitTask.WhenAny से वापस लौटा जाना चाहिए।

+0

धन्यवाद, यह हिस्सा बहुत अच्छा काम करता है। हालांकि, लूप दो फर्स्ट ब्लॉक में वस्तुओं का उत्पादन जारी रखता है जिन्हें दोषपूर्ण नहीं माना जाता है। अगर मैं कोड के इस भागों modifiy: 'findPalindromes.Completion.ContinueWith (टी => { अगर (t.IsFaulted) { ((IDataflowBlock) printPalindrome) .Fault (t.Exception); ((IDataflowBlock) डाउनलोडस्ट्रिंग) .फॉल्ट (टी। एक्सेप्शन); // पहले ब्लॉक को चिह्नित करें } अन्य प्रिंटपॉलिड्रोम। पूर्ण(); }); यह काम करता है। लेकिन मुझे यकीन नहीं है कि ऐसा करने का यह बेहतर तरीका है। – n3bula

+0

इस मार्ग पर नहीं जा रहा है बस कोड को तुल्यकालिक रूप से चलाएं क्योंकि यह केवल मुख्य धागे को प्रतीक्षा करने के लिए कह रहा है? – moarboilerplate

+0

@ n3bula आप केवल यह जांच सकते हैं कि पूरा करने का कार्य पूरा हो गया है या नहीं। मेरे अपडेट को देखो। – i3arnon

0

इस प्रकार मैंने थ्रॉटलिंग को संभाला या केवल किसी भी समय स्रोत ब्लॉक में 10 आइटमों को अनुमति दी। आप इसे संशोधित कर सकते हैं 1. सुनिश्चित करें कि आप पाइपलाइन में किसी अन्य ब्लॉक को भी थ्रॉटल करते हैं, अन्यथा, आप स्रोत ब्लॉक को 1 और अगले ब्लॉक के साथ बहुत अधिक प्राप्त कर सकते हैं।

var sourceBlock = new BufferBlock<string>(
    new ExecutionDataflowBlockOptions() { 
     SingleProducerConstrained = true, 
     BoundedCapacity = 10 }); 

तो निर्माता इस करता है:

sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken); 

आप async उपयोग कर रहे हैं/इंतजार, बस SendAsync कॉल का इंतजार है।

संबंधित मुद्दे