2017-11-21 80 views
5

में नहीं चलता है, मैं वहां PLYNQ विशेषज्ञों से किसी भी मदद की सराहना करता हूं! मुझे उत्तर की समीक्षा करने में समय लगेगा, मेरे पास गणित पर एक और स्थापित प्रोफ़ाइल है। एसई।संभावित कारण क्यों ParallelQuery.Aggregate समानांतर

मेरे पास ParallelQuery<List<string>> प्रकार का एक ऑब्जेक्ट है, जिसमें 44 सूचियां हैं जिन्हें मैं समानांतर में एक साथ प्रक्रिया करना चाहता हूं (एक समय में पांच कहें)। मेरे प्रक्रिया की तरह

private ProcessResult Process(List<string> input) 

प्रसंस्करण एक हस्ताक्षर एक परिणाम है, जो बूलियन मूल्यों की एक जोड़ी है, जैसा कि नीचे वापस आ जाएगी है।

private struct ProcessResult 
    { 
     public ProcessResult(bool initialised, bool successful) 
     { 
      ProcessInitialised = initialised; 
      ProcessSuccessful = successful; 
     } 

     public bool ProcessInitialised { get; } 
     public bool ProcessSuccessful { get; } 
    } 

समस्या।IEnumerable<List<string>> processMe को देखते हुए, मेरी PLYNQ क्वेरी इस विधि को लागू करने का प्रयास करती है: https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx। यह रूप में

processMe.AsParallel() 
     .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult> 
      (
       new ConcurrentStack<ProcessResult>, //aggregator seed 
       (agg, input) => 
       {       //updating the aggregate result 
        var res = Process(input); 
        agg.Push(res); 
        return agg; 
       }, 
       agg => 
       {       //obtain the result from the aggregator agg 
        ProcessResult res; // (in this case just the most recent result**) 
        agg.TryPop(out res); 
        return res; 
       } 
      ); 

लिखा है दुर्भाग्य से यह समानांतर में नहीं चलता है, केवल क्रमिक रूप से। (** ध्यान दें कि इस कार्यान्वयन नहीं है "भावना", मैं सिर्फ parallelisation अब के लिए काम करने के लिए प्राप्त करने की कोशिश कर रहा हूँ।)


मैं एक अलग कार्यान्वयन, जो समानांतर में रन किया की कोशिश की , लेकिन कोई एकत्रीकरण नहीं था। मैं परिभाषित एक एकत्रीकरण विधि (जो अनिवार्य रूप से एक बूलियन और ProcessResult के दोनों भागों पर है, अर्थात कुल ([A1, A2], [B1, B2]) ≡ [A1 & & बी 1, ए 2 & & बी 2])।

private static ProcessResult AggregateProcessResults 
     (ProcessResult aggregate, ProcessResult latest) 
    { 
     bool ini = false, suc = false; 
     if (aggregate.ProcessInitialised && latest.ProcessInitialised) 
      ini = true; 
     if (aggregate.ProcessSuccessful && latest.ProcessSuccessful) 
      suc = true; 


     return new ProcessResult(ini, suc); 
    } 

और PLYNQ क्वेरी https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true), 
    (res, input) => Process(input), 
    (agg, latest) => AggregateProcessResults(agg, latest), 
    agg   => agg 
इस्तेमाल किया

समस्या यहीं पर AggregateProcessResults कोड मारा कभी नहीं था किसी कारण-मुझे नहीं पता है, जहां परिणामों जा रहे थे हूँ के लिए, ...

पढ़ने के लिए धन्यवाद, किसी भी मदद की सराहना की :)

+0

आप अनुक्रम आप 'Select', नहीं' Aggregate' का उपयोग करना चाहिए में प्रत्येक आइटम के लिए एक नया मूल्य की गणना करना चाहते हैं। जब आप उस काम के लिए सही संचालन का उपयोग करते हैं जिसे आप करने का प्रयास कर रहे हैं तो आप पाएंगे कि सिस्टम इसे और अधिक प्रभावी ढंग से पूरा करने में सक्षम होगा। – Servy

+0

आपके संग्रह में आपके पास कितनी आइटम हैं? (केवल 44?) आपके पास कितने सीपीयू कोर हैं? चूंकि कई ट्रेडों पर क्वेरी चलाने और CPU कोर को गुणा करने के लिए जटिल तैयारी की आवश्यकता होती है। संग्रह को कई हिस्सों में विभाजित किया जाना चाहिए जितना सीपीयू कोर उपलब्ध है, कार्यों को थ्रेड पर चलाएं, और आखिरकार परिणामों को एकत्र करें। तो .NET स्मार्ट सब कुछ बहुत धीमा बनाने के लिए बहुत काम नहीं करने के लिए पर्याप्त स्मार्ट ... – Major

+0

@Major I में 22000 स्ट्रिंग हैं, जिन्हें 500s में बैच किया गया है, 44 सूची एस दे रही है। मैं एक साथ पांच प्रक्रियाओं को चलाने के लिए सीमित हूं – Szmagpie

उत्तर

2

Aggregate का अधिभार आप वास्तव में पी में नहीं चलेंगे डिजाइन द्वारा समांतर। आप बीज पास करते हैं, फिर चरण कार्य करते हैं, लेकिन चरण फ़ंक्शन (agg) पर तर्क संचयक होता है जिसे पिछले चरण से प्राप्त किया गया था। इसी कारण से, यह मूल रूप से अनुक्रमिक है (पिछले चरण का परिणाम अगले चरण में इनपुट है) और समांतर नहीं है। सुनिश्चित नहीं है कि यह अधिभार ParallelEnumerable में क्यों शामिल है, लेकिन शायद एक कारण था।

इसके बजाय, एक और अधिभार का उपयोग करें:

var result = processMe 
.AsParallel() 
.Aggregate 
(
    // seed factory. Each partition will call this to get its own seed 
    () => new ConcurrentStack<ProcessResult>(), 
    // process element and update accumulator 
    (agg, input) => 
    {           
     var res = Process(input); 
     agg.Push(res); 
     return agg; 
    }, 
    // combine accumulators from different partitions 
    (agg1, agg2) => { 
     agg1.PushRange(agg2.ToArray()); 
     return agg1; 
    }, 
    // reduce 
    agg => 
    { 
     ProcessResult res; 
     agg.TryPop(out res); 
     return res; 
    } 
); 
संबंधित मुद्दे