2013-02-11 18 views
9

में समानांतर कार्यों से कैसे उत्पन्न करें मैं समानांतर कार्य/प्रतीक्षा के साथ .NET iterator का उपयोग करना चाहता हूं? कुछ ऐसा:.NET 4.5

IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source) 
{ 
    Parallel.ForEach(
     source, 
     s=> 
     { 
      // Ordering is NOT important 
      // items can be yielded as soon as they are done     
      yield return ExecuteOrDownloadSomething(s); 
     } 
} 

दुर्भाग्य से .NET इसे मूल रूप से संभाल नहीं सकता है। @svick द्वारा अब तक का सबसे अच्छा जवाब - AsParallel() का उपयोग करें।

बोनस: कोई भी सरल async/प्रतीक्षा कोड जो एकाधिक प्रकाशकों और एक ग्राहक को लागू करता है? ग्राहक उपज करेगा, और पब संसाधित करेंगे। (केवल मूल पुस्तकालय)

उत्तर

11

यह PLINQ के लिए एक नौकरी की तरह लगता है:

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s)); 

इस धागे की एक सीमित संख्या का उपयोग कर, के रूप में पूरा होते ही यह प्रत्येक परिणाम लौटने समानांतर में प्रतिनिधि निष्पादित करेंगे।

ExecuteOrDownloadSomething() विधि आईओ बाध्य किया जाता है तो (उदाहरण के लिए वास्तव में कुछ डाउनलोड करता है) और आप async का उपयोग कर धागे बर्बाद करने के लिए है, तो नहीं करना चाहता - await मतलब हो सकता है, लेकिन यह और अधिक जटिल हो जाएगा।

यदि आप async का पूरी तरह से लाभ लेना चाहते हैं, तो आपको IEnumerable वापस नहीं करना चाहिए, क्योंकि यह सिंक्रोनस है (यानी यदि कोई आइटम उपलब्ध नहीं है तो यह ब्लॉक करता है)। क्या आप की जरूरत अतुल्यकालिक संग्रह के कुछ प्रकार है, और आप का उपयोग कर सकते ISourceBlock (विशेष रूप से, TransformBlock) TPL Dataflow से उस के लिए: स्रोत है

ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source) 
{ 
    var block = new TransformBlock<TSrc, TDest>(
     async s => await ExecuteOrDownloadSomethingAsync(s), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    foreach (var item in source) 
     block.Post(item); 

    block.Complete(); 

    return block; 
} 

हैं "धीमी" (यानी आप से परिणाम प्रसंस्करण शुरू करना चाहते हैं Foo()source फिर से पूरा होने से पहले), आप foreach और Complete() को अलग Task पर कॉल करना चाहते हैं। source को ISourceBlock<TSrc> में भी बेहतर समाधान करना होगा।

+0

द्वारा अधिग्रहण किया गया है, लेकिन क्या आप इस उदाहरण का उदाहरण दे सकते हैं कि इसे एसिंक/प्रतीक्षा के साथ कैसे हल किया जा सकता है? धन्यवाद! – Yurik

+0

@ युरिक क्या आप समझा सकते हैं कि आप ऐसा क्यों चाहते हैं? – svick

+0

अधिकतर क्योंकि मुझे लगता है कि यह मुझे एक समस्या के लिए नए प्रतीक्षा वाक्यविन्यास को समझने में मदद करेगा जो "async 101" नहीं है, बल्कि वास्तविक दुनिया परिदृश्य है। – Yurik

0

एमएस रोबोटिक्स टीम द्वारा बनाई गई एसिंक्रोनस लाइब्रेरी में, उनके पास समवर्ती प्राथमिकताएं थीं जो एसिंक्रोनस कोड उत्पन्न करने के लिए एक इटरेटर का उपयोग करने की अनुमति देती थीं।

लाइब्रेरी (सीसीआर) मुफ़्त है (यह मुफ्त में उपयोग नहीं किया गया था)। एक अच्छा परिचयात्मक लेख यहां पाया जा सकता: Concurrent affairs

शायद आप नेट काम पुस्तकालय के साथ-साथ इस पुस्तकालय का उपयोग कर सकते हैं, या यह आपको प्रेरित करेंगे 'अपनी खुद की रोल' के लिए

+0

आप समझा सकते हैं कि कैसे वास्तव में आप सीसीआर यहाँ का प्रयोग करेंगे? – svick

+0

मैंने जो लेख उद्धृत किया है, उससे मैं इसे बेहतर समझा सकता हूं।यदि आप इसे देखते हैं और आकृति की जांच करते हैं: 'चित्रा 6 सीरियलएसिंक डीमो' में इसका एक कोड उदाहरण है जो ओपी पूछता है: एसिंक ऑपरेशन का उपयोग करने के लिए एक .Net iterator का उपयोग कर। मैं मानता हूं कि मुझे लगता है कि यह इटेटरेटर सिंटैक्स, हालांकि इसके समय के लिए चालाक है, अब ज्यादातर एसिंक/प्रतीक्षा वाक्यविन्यास – Toad

1

तो ऐसा लगता है कि आप वास्तव में करना चाहते हैं, जब वे पूरा करते हैं तो कार्यों के अनुक्रम को क्रमबद्ध करना है। तो यहाँ हम प्रत्येक इनपुट कार्य के लिए एक TaskCompletionSource बनाते हैं, तो प्रत्येक कार्य के माध्यम से जाने के लिए और एक निरंतरता है जो एक BlockingCollection से अगले पूरा होने के स्रोत पकड़ लेता है और यह परिणाम है सेट सेट

public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks) 
{ 
    var input = tasks.ToList(); 

    var output = input.Select(task => new TaskCompletionSource<T>()); 
    var collection = new BlockingCollection<TaskCompletionSource<T>>(); 
    foreach (var tcs in output) 
     collection.Add(tcs); 

    foreach (var task in input) 
    { 
     task.ContinueWith(t => 
     { 
      var tcs = collection.Take(); 
      switch (task.Status) 
      { 
       case TaskStatus.Canceled: 
        tcs.TrySetCanceled(); 
        break; 
       case TaskStatus.Faulted: 
        tcs.TrySetException(task.Exception.InnerExceptions); 
        break; 
       case TaskStatus.RanToCompletion: 
        tcs.TrySetResult(task.Result); 
        break; 
      } 
     } 
     , CancellationToken.None 
     , TaskContinuationOptions.ExecuteSynchronously 
     , TaskScheduler.Default); 
    } 

    return output.Select(tcs => tcs.Task); 
} 

: यह बहुत जटिल नहीं है। पहला कार्य पूरा होने वाले पहले टीसीएस को पूरा करता है, दूसरा कार्य पूरा हो जाता है जो दूसरी टीसीएस लौटाता है, और इसी तरह।

अब आप अपने कोड काफी सरल हो जाता है:

var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item)) 
    .Order(); 
foreach(var task in tasks) 
{ 
    var result = task.Result;//or you could `await` each result 
    //.... 
} 
+0

धन्यवाद, लेकिन मुझे एक विधि से उपज के रूप में प्रसंस्कृत वस्तुओं की एक धारा प्राप्त करने के लिए क्या चाहिए। आपने जो पेशकश की है वह मूल रूप से समांतर का एक पुनर्लेख है। फोरेच()। – Yurik

+0

@Yurik यदि आपको सभी आइटमों को पूरा करने की प्रतीक्षा करने की आवश्यकता नहीं है, तो आप 'कब सभी'/'WaitAll' को हटा सकते हैं, लेकिन इसके अलावा मैं यह देखने में असफल रहा कि 'चयन' आपको जो चाहिए वह नहीं करता और खुद की। आपके पास आइटम का अनुक्रम है, और आप इसे कार्यों के अनुक्रम में बदलना चाहते हैं, प्रत्येक आइटम के लिए एक। 'चुनें (आइटम => LongRunningOperation (आइटम))' आपकी आवश्यकताओं को पूरा नहीं करता है क्योंकि यह कार्य का अनुक्रम देता है? – Servy

+0

उस स्थिति में वस्तुओं का क्रम मूल के समान होगा, जो अक्षम हो सकता है। मुझे वस्तुओं की उपज के आदेश से कोई फर्क नहीं पड़ता। – Yurik

संबंधित मुद्दे