संपादित करें: बाहर निकलता है मैं बहुत गलत था। TransformBlock
उसी क्रम में लौटने वाले आइटम लौटते हैं, भले ही यह समांतरता के लिए कॉन्फ़िगर किया गया हो। इसके कारण, मेरे मूल उत्तर में कोड पूरी तरह से बेकार है और इसके बजाय TransformBlock
का उपयोग किया जा सकता है।
मूल जवाब:
जहाँ तक मुझे पता के रूप में नेट में केवल एक ही समानांतरवाद निर्माण का समर्थन करता है क्रम में संसाधित आइटम लौटने वे में आया: AsOrdered()
साथ PLINQ। लेकिन मुझे ऐसा लगता है कि PLINQ ठीक से फिट नहीं है जो आप चाहते हैं।
दूसरी तरफ, टीपीएल डेटाफ्लो, ठीक है, मुझे लगता है, लेकिन इसमें एक ऐसा ब्लॉक नहीं है जो समांतरता का समर्थन करेगा और एक ही समय में वस्तुओं को वापस लौटाएगा (TransformBlock
उन दोनों का समर्थन करता है, लेकिन नहीं उसी समय)। सौभाग्य से, डेटाफ्लो ब्लॉक को कंपोज़ेबिलिटी के साथ दिमाग में डिजाइन किया गया था, इसलिए हम अपना खुद का ब्लॉक बना सकते हैं जो ऐसा करता है।
लेकिन सबसे पहले, हमें यह समझना होगा कि परिणाम कैसे क्रमबद्ध करें। एक समवर्ती शब्दकोश का उपयोग करना, जैसा कि आपने सुझाव दिया है, कुछ सिंक्रनाइज़ेशन तंत्र के साथ, निश्चित रूप से काम करेगा। लेकिन मुझे लगता है कि एक आसान समाधान है: Task
एस की कतार का उपयोग करें। आउटपुट कार्य में, आप Task
को अस्वीकार करते हैं, इसे पूरा करने के लिए प्रतीक्षा करें (असीमित रूप से) और जब ऐसा होता है, तो आप इसका परिणाम भेजते हैं।कतार खाली होने पर हमें अभी भी कुछ सिंक्रनाइज़ेशन की आवश्यकता है, लेकिन अगर हम चाकू का उपयोग करने के लिए कौन सी कतार चुनते हैं तो हम इसे मुफ्त में प्राप्त कर सकते हैं।
तो, सामान्य विचार इस तरह है: हम जो कुछ लिख रहे हैं वह कुछ इनपुट और कुछ आउटपुट के साथ IPropagatorBlock
होगा। कस्टम IPropagatorBlock
बनाने का सबसे आसान तरीका एक ब्लॉक बनाना है जो इनपुट को संसाधित करता है, एक अन्य ब्लॉक जो परिणाम उत्पन्न करता है और उन्हें DataflowBlock.Encapsulate()
का उपयोग करके एक के रूप में व्यवहार करता है।
इनपुट ब्लॉक को आने वाले आइटम को सही क्रम में संसाधित करना होगा, इसलिए वहां कोई समांतरता नहीं है। यह एक नया Task
(वास्तव में, TaskCompletionSource
) बनाएगा, ताकि हम बाद में Task
का परिणाम सेट कर सकें), इसे कतार में जोड़ें और फिर सही Task
के परिणाम को सेट करने के लिए किसी भी तरीके से प्रसंस्करण के लिए आइटम भेजें । चूंकि हमें इस ब्लॉक को किसी भी चीज़ से जोड़ने की आवश्यकता नहीं है, इसलिए हम ActionBlock
का उपयोग कर सकते हैं।
आउटपुट ब्लॉक को कतार से Task
एस लेना होगा, अतुल्यकालिक रूप से उनके लिए प्रतीक्षा करें, और फिर उन्हें साथ भेजें। लेकिन चूंकि सभी ब्लॉकों में उनमें एक कतार लगाई गई है, और प्रतिनिधियों को ले जाने वाले ब्लॉक में अंतर्निहित प्रतीक्षा की गई है, यह बहुत आसान होगा: new TransformBlock<Task<TOutput>, TOutput>(t => t)
। यह ब्लॉक कतार के रूप में और आउटपुट ब्लॉक दोनों के रूप में काम करेगा। इस वजह से, हमें किसी भी सिंक्रनाइज़ेशन से निपटने की ज़रूरत नहीं है।
पहेली का अंतिम भाग वास्तव में समानांतर में वस्तुओं को संसाधित कर रहा है। इसके लिए, हम MaxDegreeOfParallelism
सेट के साथ इस समय ActionBlock
का उपयोग कर सकते हैं। यह इनपुट ले जाएगा, इसे संसाधित करेगा, और कतार में सही Task
का परिणाम सेट करेगा।
एकत्र किया गया, यह ऐसा दिखाई दे सकता: इतना बात करने के बाद
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
, कि कोड की काफी एक छोटी राशि है, मुझे लगता है।
ऐसा लगता है कि आप प्रदर्शन के बारे में बहुत परवाह करते हैं, इसलिए आपको इस कोड को ठीक करने की आवश्यकता हो सकती है। उदाहरण के लिए, ओवरस्क्रिप्शन से बचने के लिए ब्लॉक जैसे Environment.ProcessorCount
पर MaxDegreeOfParallelism
सेट करने का अर्थ हो सकता है। साथ ही, यदि विलंबता आपके लिए थ्रूपुट से अधिक महत्वपूर्ण है, तो उसी ब्लॉक के MaxMessagesPerTask
को 1 (या एक और छोटी संख्या) पर सेट करना समझ सकता है ताकि जब किसी आइटम की प्रसंस्करण समाप्त हो जाए, तो इसे तुरंत आउटपुट पर भेज दिया जाता है।
इसके अलावा, यदि आप आने वाली वस्तुओं को थ्रॉटल करना चाहते हैं, तो आप enqueuer
के BoundedCapacity
सेट कर सकते हैं।
वाह उपहारों का एक गुच्छा जो मैं सबसे पहले पचाना और कोशिश करना चाहता हूं। उन लोगों के लिए बहुत बहुत धन्यवाद, यह कम से कम एक अपवित्र योग्य है ;-) मुझे उन विचारों के साथ खेलने दो और मैं वापस आ गया। क्विकिंग कार्य बहुत समझ में आता है और मुझे आश्चर्य है कि मुझे इससे पहले क्यों नहीं मिला। –
ठीक है, मैं आपके पोस्ट के माध्यम से कुछ समय बिताता हूं और टीपीएल डेटाफ्लो पर पढ़ता हूं, यहां आपके प्रस्तावित समाधान को पूरी तरह से समझने के लिए कुछ प्रश्न हैं: (1) आप एक कस्टम आईप्रॉपैगेटरब्लॉक और आईडीटाफ्लोब्लॉक.इनकैपलेट() को ट्रांसफॉर्मब्लॉक क्यों पहले से मौजूद हैं? (2) मैं यह देखने में असफल रहा कि आप वास्तव में ब्लॉक को जोड़ने की योजना कैसे बनाते हैं। आप TransformBlocks के पहले ActionBlocks के पहले बात करते हैं। जो मैंने पढ़ा है, क्या एक्शनब्लॉक पूरे आर्किटेक्चर का "अंत बिंदु" नहीं होगा? –
1. यह दूसरे पैराग्राफ में समझाया गया है: 'ट्रांसफॉर्मब्लॉक' समानांतर में वस्तुओं को संसाधित करने में सक्षम नहीं है और उन्हें एक ही समय में वापस कर सकता है। यह या तो उनमें से एक कर सकता है, लेकिन दोनों नहीं। – svick