मैं डेटाफ्लो पाइपलाइन में पोस्ट की गई वस्तुओं की संख्या को सीमित करना चाहता हूं। वस्तुओं की संख्या उत्पादन पर्यावरण पर निर्भर करती है। ये ऑब्जेक्ट्स बड़ी मात्रा में मेमोरी (छवियों) का उपभोग करते हैं, इसलिए जब मैं पाइपलाइन के आखिरी ब्लॉक ने अपना काम किया है तो मैं उन्हें पोस्ट करना चाहता हूं।टीपीएल डेटाफ्लो: एक संपूर्ण पाइपलाइन कैसे थ्रॉटल करें?
मैंने निर्माता को थ्रॉटल करने और पाइपलाइन के अंतिम ब्लॉक में इसे रिलीज़ करने के लिए SemaphoreSlim
का उपयोग करने का प्रयास किया। यह काम करता है, लेकिन यदि प्रक्रिया के दौरान कोई अपवाद उठाया जाता है, तो प्रोग्राम हमेशा के लिए इंतजार कर रहा है और अपवाद को अवरुद्ध नहीं किया जाता है।
यहां एक नमूना है जो हमारे कोड की तरह दिखता है। मैं यह कैसे कर सकता हूं?
static void Main(string[] args)
{
SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);
var downloadString = new TransformBlock<string, string>(uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return new WebClient().DownloadString(uri);
});
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
char[] tokens = text.ToArray();
for (int i = 0; i < tokens.Length; i++)
{
if (!char.IsLetter(tokens[i]))
tokens[i] = ' ';
}
text = new string(tokens);
return text.Split(new char[] { ' ' },
StringSplitOptions.RemoveEmptyEntries);
});
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
throw new InvalidOperationException("ouch !"); // explicit for test
return words.Where(word => word.Length > 3).OrderBy(word => word)
.Distinct().ToArray();
});
var findPalindromes = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Finding palindromes...");
var palindromes = new ConcurrentQueue<string>();
Parallel.ForEach(words, word =>
{
string reverse = new string(word.Reverse().ToArray());
if (Array.BinarySearch<string>(words, reverse) >= 0 &&
word != reverse)
{
palindromes.Enqueue(word);
}
});
return palindromes.ToArray();
});
var printPalindrome = new ActionBlock<string[]>(palindromes =>
{
try
{
foreach (string palindrome in palindromes)
{
Console.WriteLine("Found palindrome {0}/{1}",
palindrome, new string(palindrome.Reverse().ToArray()));
}
}
finally
{
semaphore.Release();
}
});
downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findPalindromes);
findPalindromes.LinkTo(printPalindrome);
downloadString.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)createWordList).Fault(t.Exception);
else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)filterWordList).Fault(t.Exception);
else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws
else findPalindromes.Complete();
});
findPalindromes.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched
else printPalindrome.Complete();
});
try
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
downloadString.Post("http://www.google.com");
semaphore.Wait(); // waits here when an exception throws
}
downloadString.Complete();
printPalindrome.Completion.Wait();
}
catch (AggregateException agg)
{
Console.WriteLine("An error has occured : " + agg);
}
Console.WriteLine("Done");
Console.ReadKey();
}
धन्यवाद, यह हिस्सा बहुत अच्छा काम करता है। हालांकि, लूप दो फर्स्ट ब्लॉक में वस्तुओं का उत्पादन जारी रखता है जिन्हें दोषपूर्ण नहीं माना जाता है। अगर मैं कोड के इस भागों modifiy: 'findPalindromes.Completion.ContinueWith (टी => { अगर (t.IsFaulted) { ((IDataflowBlock) printPalindrome) .Fault (t.Exception); ((IDataflowBlock) डाउनलोडस्ट्रिंग) .फॉल्ट (टी। एक्सेप्शन); // पहले ब्लॉक को चिह्नित करें } अन्य प्रिंटपॉलिड्रोम। पूर्ण(); }); यह काम करता है। लेकिन मुझे यकीन नहीं है कि ऐसा करने का यह बेहतर तरीका है। – n3bula
इस मार्ग पर नहीं जा रहा है बस कोड को तुल्यकालिक रूप से चलाएं क्योंकि यह केवल मुख्य धागे को प्रतीक्षा करने के लिए कह रहा है? – moarboilerplate
@ n3bula आप केवल यह जांच सकते हैं कि पूरा करने का कार्य पूरा हो गया है या नहीं। मेरे अपडेट को देखो। – i3arnon