10

मैं विभिन्न कार्यों पर एक सुंदर ठेठ निर्माता/उपभोक्ता मॉडल चलाता हूं।क्या यह टीपीएल डेटाफ्लो के लिए एक नौकरी है?

कार्य 1: बाइटरी फ़ाइलों से बाइट [] के बैचों को पढ़ता है और बाइट एरे के प्रत्येक संग्रह के लिए एक नया कार्य बंद कर देता है। (ऑपरेशन स्मृति प्रबंधन उद्देश्यों के लिए बैच किया गया है)।

कार्य 2-एन: वे कार्यकर्ता कार्य हैं और प्रत्येक बाइट एरे के पारित संग्रह (कार्य 1 से) पर चलते हैं और बाइट एरे को डी-सीरियलाइज करते हैं, उन्हें कुछ मानदंडों के अनुसार टाइप करते हैं, और फिर परिणामस्वरूप ऑब्जेक्ट्स का संग्रह संग्रहीत करते हैं (प्रत्येक बाइट सरणी इस तरह की वस्तु में deserializes) एक समवर्ती शब्दकोश में।

कार्य (एन + 1) मैंने एक समवर्ती शब्दकोश चुना क्योंकि इस कार्य का काम उन संग्रहों को मर्ज करना है जो समवर्ती शब्दकोश में संग्रहीत हैं, उसी क्रम में वे कार्य 1 से कैसे उत्पन्न हुए हैं। मैं प्राप्त करता हूं कि एक संग्रह आईडी पास करके (यह टास्क 1 के भीतर प्रत्येक नए संग्रह के लिए int int और incremented है) टास्क 1 से इस कार्य तक सभी तरह से नीचे। यह कार्य मूल रूप से जांचता है कि अगला अपेक्षित संग्रह आईडी पहले से ही समवर्ती शब्दकोश में संग्रहीत है और यदि हां, इसे बाहर ले जाता है, तो इसे अंतिम पंक्ति में जोड़ता है और समवर्ती शब्दकोश में अगले संग्रह के लिए चेक करता है।

अब, मैंने जो पढ़ा है और मैंने जो वीडियो देखा है, उससे मुझे लगता है कि टीपीएल डेटाफ्लो ऐसे निर्माता/उपभोक्ता मॉडल के लिए एक आदर्श उम्मीदवार हो सकता है। मैं बस एक डिज़ाइन तैयार करने में सक्षम नहीं हूं और इस प्रकार शुरू हो गया क्योंकि मैंने कभी भी टीपीएल डेटाफ्लो के साथ काम नहीं किया है। थ्रूपुट और विलंबता के मामले में यह पुस्तकालय भी कार्य तक है? मैं वर्तमान में 2.5 मिलियन बाइट सरणी संसाधित करता हूं और इस प्रकार परिणामी संग्रह में प्रति सेकंड वस्तुओं को संसाधित करता हूं। क्या टीपीएल डेटाफ्लो को सरल बनाने में मदद मिल सकती है? मैं विशेष रूप से निम्नलिखित प्रश्न के उत्तर में रूचि रखता हूं: क्या टीपीएल डेटाफ्लो कार्यकर्ता कार्यों को खत्म करते समय कार्य 1 से संग्रह बैचों के आदेश को सुरक्षित रख सकता है और कार्यकर्ता कार्यों ने अपना काम पूरा करने के बाद उन्हें फिर से विलय कर सकता है? क्या यह चीजों को अनुकूलित करता है? पूरी संरचना का प्रोफाइल करने के बाद मुझे लगता है कि कताई और बहुत सारे समवर्ती संग्रह शामिल होने के कारण काफी समय बर्बाद हो गया है।

कोई विचार, विचार?

उत्तर

12

संपादित करें: बाहर निकलता है मैं बहुत गलत था। TransformBlock उसी क्रम में लौटने वाले आइटम लौटते हैं, भले ही यह समांतरता के लिए कॉन्फ़िगर किया गया हो। इसके कारण, मेरे मूल उत्तर में कोड पूरी तरह से बेकार है और इसके बजाय TransformBlock का उपयोग किया जा सकता है।


मूल जवाब:

जहाँ तक मुझे पता के रूप में नेट में केवल एक ही समानांतरवाद निर्माण का समर्थन करता है क्रम में संसाधित आइटम लौटने वे में आया: AsOrdered() साथ PLINQ। लेकिन मुझे ऐसा लगता है कि PLINQ ठीक से फिट नहीं है जो आप चाहते हैं।

दूसरी तरफ, टीपीएल डेटाफ्लो, ठीक है, मुझे लगता है, लेकिन इसमें एक ऐसा ब्लॉक नहीं है जो समांतरता का समर्थन करेगा और एक ही समय में वस्तुओं को वापस लौटाएगा (TransformBlock उन दोनों का समर्थन करता है, लेकिन नहीं उसी समय)। सौभाग्य से, डेटाफ्लो ब्लॉक को कंपोज़ेबिलिटी के साथ दिमाग में डिजाइन किया गया था, इसलिए हम अपना खुद का ब्लॉक बना सकते हैं जो ऐसा करता है।

लेकिन सबसे पहले, हमें यह समझना होगा कि परिणाम कैसे क्रमबद्ध करें। एक समवर्ती शब्दकोश का उपयोग करना, जैसा कि आपने सुझाव दिया है, कुछ सिंक्रनाइज़ेशन तंत्र के साथ, निश्चित रूप से काम करेगा। लेकिन मुझे लगता है कि एक आसान समाधान है: Task एस की कतार का उपयोग करें। आउटपुट कार्य में, आप Task को अस्वीकार करते हैं, इसे पूरा करने के लिए प्रतीक्षा करें (असीमित रूप से) और जब ऐसा होता है, तो आप इसका परिणाम भेजते हैं।कतार खाली होने पर हमें अभी भी कुछ सिंक्रनाइज़ेशन की आवश्यकता है, लेकिन अगर हम चाकू का उपयोग करने के लिए कौन सी कतार चुनते हैं तो हम इसे मुफ्त में प्राप्त कर सकते हैं।

तो, सामान्य विचार इस तरह है: हम जो कुछ लिख रहे हैं वह कुछ इनपुट और कुछ आउटपुट के साथ IPropagatorBlock होगा। कस्टम IPropagatorBlock बनाने का सबसे आसान तरीका एक ब्लॉक बनाना है जो इनपुट को संसाधित करता है, एक अन्य ब्लॉक जो परिणाम उत्पन्न करता है और उन्हें DataflowBlock.Encapsulate() का उपयोग करके एक के रूप में व्यवहार करता है।

इनपुट ब्लॉक को आने वाले आइटम को सही क्रम में संसाधित करना होगा, इसलिए वहां कोई समांतरता नहीं है। यह एक नया Task (वास्तव में, TaskCompletionSource) बनाएगा, ताकि हम बाद में Task का परिणाम सेट कर सकें), इसे कतार में जोड़ें और फिर सही Task के परिणाम को सेट करने के लिए किसी भी तरीके से प्रसंस्करण के लिए आइटम भेजें । चूंकि हमें इस ब्लॉक को किसी भी चीज़ से जोड़ने की आवश्यकता नहीं है, इसलिए हम ActionBlock का उपयोग कर सकते हैं।

आउटपुट ब्लॉक को कतार से Task एस लेना होगा, अतुल्यकालिक रूप से उनके लिए प्रतीक्षा करें, और फिर उन्हें साथ भेजें। लेकिन चूंकि सभी ब्लॉकों में उनमें एक कतार लगाई गई है, और प्रतिनिधियों को ले जाने वाले ब्लॉक में अंतर्निहित प्रतीक्षा की गई है, यह बहुत आसान होगा: new TransformBlock<Task<TOutput>, TOutput>(t => t)। यह ब्लॉक कतार के रूप में और आउटपुट ब्लॉक दोनों के रूप में काम करेगा। इस वजह से, हमें किसी भी सिंक्रनाइज़ेशन से निपटने की ज़रूरत नहीं है।

पहेली का अंतिम भाग वास्तव में समानांतर में वस्तुओं को संसाधित कर रहा है। इसके लिए, हम MaxDegreeOfParallelism सेट के साथ इस समय ActionBlock का उपयोग कर सकते हैं। यह इनपुट ले जाएगा, इसे संसाधित करेगा, और कतार में सही Task का परिणाम सेट करेगा।

एकत्र किया गया, यह ऐसा दिखाई दे सकता: इतना बात करने के बाद

public static IPropagatorBlock<TInput, TOutput> 
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform) 
{ 
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t); 

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
     tuple => tuple.Item2(transform(tuple.Item1)), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    var enqueuer = new ActionBlock<TInput>(
     async item => 
     { 
      var tcs = new TaskCompletionSource<TOutput>(); 
      await processor.SendAsync(
       new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult)); 
      await queue.SendAsync(tcs.Task); 
     }); 

    enqueuer.Completion.ContinueWith(
     _ => 
     { 
      queue.Complete(); 
      processor.Complete(); 
     }); 

    return DataflowBlock.Encapsulate(enqueuer, queue); 
} 

, कि कोड की काफी एक छोटी राशि है, मुझे लगता है।

ऐसा लगता है कि आप प्रदर्शन के बारे में बहुत परवाह करते हैं, इसलिए आपको इस कोड को ठीक करने की आवश्यकता हो सकती है। उदाहरण के लिए, ओवरस्क्रिप्शन से बचने के लिए ब्लॉक जैसे Environment.ProcessorCount पर MaxDegreeOfParallelism सेट करने का अर्थ हो सकता है। साथ ही, यदि विलंबता आपके लिए थ्रूपुट से अधिक महत्वपूर्ण है, तो उसी ब्लॉक के MaxMessagesPerTask को 1 (या एक और छोटी संख्या) पर सेट करना समझ सकता है ताकि जब किसी आइटम की प्रसंस्करण समाप्त हो जाए, तो इसे तुरंत आउटपुट पर भेज दिया जाता है।

इसके अलावा, यदि आप आने वाली वस्तुओं को थ्रॉटल करना चाहते हैं, तो आप enqueuer के BoundedCapacity सेट कर सकते हैं।

+0

वाह उपहारों का एक गुच्छा जो मैं सबसे पहले पचाना और कोशिश करना चाहता हूं। उन लोगों के लिए बहुत बहुत धन्यवाद, यह कम से कम एक अपवित्र योग्य है ;-) मुझे उन विचारों के साथ खेलने दो और मैं वापस आ गया। क्विकिंग कार्य बहुत समझ में आता है और मुझे आश्चर्य है कि मुझे इससे पहले क्यों नहीं मिला। –

+0

ठीक है, मैं आपके पोस्ट के माध्यम से कुछ समय बिताता हूं और टीपीएल डेटाफ्लो पर पढ़ता हूं, यहां आपके प्रस्तावित समाधान को पूरी तरह से समझने के लिए कुछ प्रश्न हैं: (1) आप एक कस्टम आईप्रॉपैगेटरब्लॉक और आईडीटाफ्लोब्लॉक.इनकैपलेट() को ट्रांसफॉर्मब्लॉक क्यों पहले से मौजूद हैं? (2) मैं यह देखने में असफल रहा कि आप वास्तव में ब्लॉक को जोड़ने की योजना कैसे बनाते हैं। आप TransformBlocks के पहले ActionBlocks के पहले बात करते हैं। जो मैंने पढ़ा है, क्या एक्शनब्लॉक पूरे आर्किटेक्चर का "अंत बिंदु" नहीं होगा? –

+1

1. यह दूसरे पैराग्राफ में समझाया गया है: 'ट्रांसफॉर्मब्लॉक' समानांतर में वस्तुओं को संसाधित करने में सक्षम नहीं है और उन्हें एक ही समय में वापस कर सकता है। यह या तो उनमें से एक कर सकता है, लेकिन दोनों नहीं। – svick

संबंधित मुद्दे