2012-11-22 22 views
20

दोनों ट्रांसफॉर्मब्लॉक पूर्ण होने पर कोड को पूरा करने के कोड को फिर से कैसे लिख सकता है? मैंने सोचा कि पूरा होने का मतलब है कि इसे पूर्ण चिह्नित किया गया है और "आउट कतार" खाली है?टीपीएल डेटाफ्लो, केवल पूरा होने की गारंटी जब सभी स्रोत डेटा ब्लॉक पूरा हो गए

public Test() 
    { 
     broadCastBlock = new BroadcastBlock<int>(i => 
      { 
       return i; 
      }); 

     transformBlock1 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

     transformBlock2 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

     processorBlock = new ActionBlock<string>(i => 
      { 
       Console.WriteLine(i); 
      }); 

     //Linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    } 

    public void Start() 
    { 
     const int numElements = 100; 

     for (int i = 1; i <= numElements; i++) 
     { 
      broadCastBlock.SendAsync(i); 
     } 

     //mark completion 
     broadCastBlock.Complete(); 

     processorBlock.Completion.Wait(); 

     Console.WriteLine("Finished"); 
     Console.ReadLine(); 
    } 
} 

मैंने कोड को संपादित किया, प्रत्येक ट्रांसफॉर्म ब्लॉक के लिए इनपुट बफर गिनती जोड़ दी। स्पष्ट रूप से सभी 100 आइटम प्रत्येक ट्रांसफॉर्म ब्लॉक में स्ट्रीम किए जाते हैं। लेकिन जैसे ही ट्रांसफॉर्मब्लॉक्स में से एक प्रोसेसरब्लॉक खत्म करता है, कोई और आइटम स्वीकार नहीं करता है और इसके बजाय अपूर्ण ट्रांसफॉर्मब्लॉक का इनपुट बफर केवल इनपुट बफर को फ्लश करता है।

+0

सावधान रहें कि ट्रांसफ़ॉर्म ब्लॉक को प्रसारण ब्लॉक से सभी संदेश प्राप्त नहीं हो सकते हैं। उन्हें केवल _latest_ संदेश प्राप्त होता है। यदि प्रसारण ब्लॉक को ट्रांसफॉर्म ब्लॉक से तेज़ी से संदेशों की पेशकश की जाती है तो उन्हें प्राप्त कर सकते हैं, ट्रांसफॉर्म ब्लॉक संदेशों को याद करेगा। साथ ही, आपको 'SendAsync (i)' पर 'प्रतीक्षा' करना चाहिए यदि आप संदेश आदेश आदि सुनिश्चित करना चाहते हैं – urbanhusky

उत्तर

24

मुद्दा एकदम वही है जिसे casperOne में कहा कि उनकी जवाब। एक बार पहला ट्रांसफॉर्म ब्लॉक पूरा हो जाने पर, प्रोसेसर ब्लॉक "फिनिशिंग मोड" में जाता है: यह शेष आइटम को अपनी इनपुट कतार में संसाधित करेगा, लेकिन यह किसी भी नए आइटम को स्वीकार नहीं करेगा।

वहाँ दो में अपने प्रोसेसर ब्लॉक बंटवारे हालांकि तुलना में एक सरल ठीक है: PropagateCompletion निर्धारित नहीं करते हैं, लेकिन इसके बजाय मैन्युअल प्रोसेसर ब्लॉक के पूरा सेट जब दोनों ब्लॉकों पूरा परिणत:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion) 
    .ContinueWith(_ => processorBlock.Complete()); 
+0

वही जो मैं खोज रहा था। उस कार्य को अवगत नहीं था। जब कोई एक प्रतीक्षा योग्य कार्य, मेरी लापरवाही देता है। –

+0

मुझे बहुत कुछ चाहिए, शायद यह बहुत देर हो चुकी है, लेकिन क्या आप इस बारे में एक अपडेट पोस्ट कर सकते हैं कि मुझे कार्य जोड़ने के लिए कहां जाना है। जब सभी निर्माण करेंगे? –

+0

@AttilaHajdrik शायद आपके डेटाफ़्लो सेटअप कोड के अंत में, आपके 'लिंकटो' के पास। – svick

22

यहां मुद्दा यह है कि आप PropagateCompletion property हर बार जब आप LinkTo method फोन अपने परिवर्तन ब्लॉकों में ब्लॉक और प्रतीक्षा समय में अलग से जोड़ने के लिए सेट कर रहे हैं है।

IDataflowBlock interface (जोर मेरा) पर Complete method के लिए दस्तावेज़ से

: IDataflowBlock को

सिग्नल कि यह स्वीकार नहीं करना चाहिए और न ही किसी भी अधिक संदेशों उत्पादन और न ही किसी भी अधिक के लिए स्थगित कर गए संदेशों खपत करते हैं।

क्योंकि आप TransformBlock<TInput, TOutput> उदाहरणों में से प्रत्येक में अपने प्रतीक्षा समय बाहर लड़खड़ाते हैं, transformBlock2 (20 एमएस के लिए इंतज़ार कर) transformBlock1 से पहले समाप्त हो गया है (50 एमएस के लिए इंतज़ार कर)। transformBlock2 पहले पूर्ण करता है, और उसके बाद संकेत को processorBlock पर भेजता है जो तब कहता है "मैं कुछ और स्वीकार नहीं कर रहा हूं" (और transformBlock1 ने अभी तक अपने सभी संदेश नहीं बनाए हैं)।

ध्यान दें कि transformBlock1transformBlock1 से पहले बिल्कुल गारंटीकृत नहीं है; यह संभव है कि थ्रेड पूल (मान लें कि आप डिफ़ॉल्ट शेड्यूलर का उपयोग कर रहे हैं) कार्यों को एक अलग क्रम में संसाधित करेंगे (लेकिन संभावना से अधिक नहीं होगा, क्योंकि 20 एमएस आइटम किए जाने के बाद यह कतारों से काम चुरा लेगा)।

आपका पाइपलाइन इस तरह दिखता है:

  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      \   /
      processorBlock 

आदेश में इस से बचने के लिए, यदि आप चाहते हैं एक पाइप लाइन है कि इस तरह दिखता है के लिए:

  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      |    | 
processorBlock1 processorBlock2 

कौन सा सिर्फ दो अलग-अलग बनाकर किया जाता है ActionBlock<TInput> उदाहरण, जैसे:

// The action, can be a method, makes it easier to share. 
Action<string> a = i => Console.WriteLine(i); 

// Create the processor blocks. 
processorBlock1 = new ActionBlock<string>(a); 
processorBlock2 = new ActionBlock<string>(a); 


// Linking 
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 

आपको फिर दोनों पर प्रतीक्षा करने की आवश्यकता है प्रोसेसर के बजाय ब्लॉक सिर्फ एक:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait(); 

एक बहुत महत्वपूर्ण नोट यहाँ; ActionBlock<TInput> बनाते समय, डिफ़ॉल्ट पर MaxDegreeOfParallelism propertyExecutionDataflowBlockOptions उदाहरण पर सेट किया गया है।

इसका मतलब है कि Action<T> delegate पर कॉल जो आप ActionBlock<TInput> पर पास करते हैं, थ्रेड-सुरक्षित हैं, केवल एक ही समय में निष्पादित होगा।

क्योंकि आप अब एक ही Action<T> प्रतिनिधि की ओर इशारा करते दोActionBlock<TInput> मामले हैं तो आप की गारंटी नहीं है धागे की सुरक्षा।

यदि आपकी विधि थ्रेड-सुरक्षित है, तो आपको कुछ भी करने की ज़रूरत नहीं है (जो आपको MaxDegreeOfParallelism प्रॉपर्टी को DataflowBlockOptions.Unbounded पर सेट करने की अनुमति देगा, क्योंकि ब्लॉक करने का कोई कारण नहीं है)।

यदि यह थ्रेड-सुरक्षित नहीं है, और आपको इसकी गारंटी देने की आवश्यकता है, तो आपको lock statement जैसे पारंपरिक सिंक्रनाइज़ेशन प्राइमेटिव्स का सहारा लेना होगा।

इस मामले में, आप इसे इतना पसंद करते हैं चाहते हैं (हालांकि यह स्पष्ट रूप से की जरूरत नहीं है, के रूप में Console class पर WriteLine method धागा सुरक्षित है):

// The lock. 
var l = new object(); 

// The action, can be a method, makes it easier to share. 
Action<string> a = i => { 
    // Ensure one call at a time. 
    lock (l) Console.WriteLine(i); 
}; 

// And so on... 
+0

लंबे उत्तर के लिए धन्यवाद, लेकिन मैंने svick का जवाब चुना क्योंकि यह सीधे टीपीएल डेटाफ्लो पर लागू होता है और यह एक संक्षिप्त और सरल समाधान प्रदान करता है । –

+2

यदि आप दोनों क्रिया ब्लॉक के लिए समान ['ExclusiveScheduler'] (http://msdn.microsoft.com/en-us/library/system.threading.tasks.concurrentexclusiveschedulerpair.exclusivescheduler) का उपयोग करते हैं तो आप आसानी से लॉकिंग से बच सकते हैं। – svick

7

एक के svick के अलावा उत्तर: प्रोपेगेट कॉम्प्लिशन विकल्प के साथ प्राप्त व्यवहार के अनुरूप होने के लिए, आपको पिछले ब्लॉक के मामले में अपवादों को आगे बढ़ाने की भी आवश्यकता है। निम्नलिखित की तरह एक विस्तार विधि का ख्याल रखता है साथ ही:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) { 
    if (target == null) return; 
    if (sources.Length == 0) { target.Complete(); return; } 
    Task.Factory.ContinueWhenAll(
     sources.Select(b => b.Completion).ToArray(), 
     tasks => { 
      var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList(); 
      if (exceptions.Count != 0) { 
       target.Fault(new AggregateException(exceptions)); 
      } else { 
       target.Complete(); 
      } 
     } 
    ); 
} 
0

अन्य उत्तर क्यों PropagateCompletion = सच गड़बड़ चीजों को एक ब्लॉक दो से अधिक स्रोतों की जब के बारे में काफी स्पष्ट हैं।

समस्या का एक आसान समाधान प्रदान करने के लिए, आप एक ओपन सोर्स लाइब्रेरी DataflowEx देख सकते हैं जो इस तरह की समस्या को स्मार्ट पूर्णता नियमों के साथ हल करता है। (यह TPL Dataflow आंतरिक रूप से जोड़ने का उपयोग करता है, लेकिन समर्थन करता है जटिल पूरा होने प्रचार। कार्यान्वयन WhenAll के समान लग रहा है, लेकिन यह भी गतिशील लिंक जोड़ने संभालती है। कृपया Dataflow.RegisterDependency() और TaskEx.AwaitableWhenAll() impl विस्तार के लिए जाँच करें।)

मैं थोड़ा सब कुछ काम करने के लिए अपने कोड बदल डेटाफ्लोएक्स का उपयोग करके:

public CompletionDemo1() 
{ 
    broadCaster = new BroadcastBlock<int>(
     i => 
      { 
       return i; 
      }).ToDataflow(); 

    transformBlock1 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

    transformBlock2 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock2.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

    processor = new ActionBlock<string>(
     i => 
      { 
       Console.WriteLine(i); 
      }).ToDataflow(); 

    /** rather than TPL linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    **/ 

    //Use DataflowEx linking 
    var transform1 = transformBlock1.ToDataflow(); 
    var transform2 = transformBlock2.ToDataflow(); 

    broadCaster.LinkTo(transform1); 
    broadCaster.LinkTo(transform2); 
    transform1.LinkTo(processor); 
    transform2.LinkTo(processor); 
} 

पूर्ण कोड here है।

अस्वीकरण: मैं डेटाफ्लोएक्स का लेखक हूं, जो एमआईटी लाइसेंस के तहत प्रकाशित है।

+0

यदि आप ग्रिडसम के लिए काम करते हैं तो क्या आप कृपया खुलासा कर सकते हैं? मेरे प्रश्न ने स्पष्ट रूप से उल्लेख किया है कि मुझे टीपीएल डेटाफ्लो के उत्तर की आवश्यकता है, मैं इस समस्या के लिए किसी तृतीय पक्ष समाधान का उपयोग नहीं करना चाहता था। धन्यवाद। –

+1

हां, मैं ग्रिडसम के लिए काम करता हूं। लेकिन पुस्तकालय पूरी तरह से मुक्त और खुला स्रोत है इसलिए मैंने सोचा कि यह आपकी मदद कर सकता है। कोई वाणिज्यिक सोच बिल्कुल नहीं। यदि आपको टीपीएल डेटाफ्लो की आंतरिक तंत्र के बारे में आवश्यकता है तो कृपया मेरे उत्तर को अनदेखा करें। लेकिन अगर किसी को * समाधान * की आवश्यकता है तो उत्तर का मूल्य है। धन्यवाद :) – Dodd

+0

थोड़ा और विस्तार के साथ उत्तर अद्यतन किया गया। अस्वीकरण भी जोड़ा गया। – Dodd

संबंधित मुद्दे