2017-02-27 18 views
8

मेरे पास MyReader है जो Iterator लागू करता है और Buffer एस Buffer : Send बनाता है। MyReader बहुत सारे Buffer एस बहुत तेज़ी से उत्पन्न करता है, लेकिन मेरे पास प्रत्येक Buffer (.map(|buf| ...)) पर प्रदर्शन करने के लिए एक सीपीयू-गहन नौकरी है जो मेरी बाधा है, और उसके बाद परिणाम (आदेश दिया गया) इकट्ठा करें। मैं सीपीयू तीव्र काम को समानांतर करना चाहता हूं - उम्मीद है कि एन थ्रेड्स के लिए, जो कोर की संख्या जितनी जल्दी हो सके उन्हें निष्पादित करने के लिए काम चोरी का उपयोग करेगा।जंग में एक कस्टम, सिंगल थ्रेडेड इटरेटर पर समानांतर रूप से 'मानचित्र (...) `कैसे करें?

संपादित करें: अधिक सटीक होने के लिए। मैं rdedup पर काम कर रहा हूं। MyStructChunker है जो io::Read (आमतौर पर stdio) पढ़ता है, डेटा के हिस्सों (भाग) पाता है और उन्हें उपज देता है। map() प्रत्येक खंड के लिए, map(...) के परिणामस्वरूप पाइस्ट को sha256 पाचन की गणना करने, संपीड़ित करने, एन्क्रिप्ट करने, सहेजने और वापस करने के लिए माना जाता है। सहेजे गए डेटा का डाइजेस्ट डेटा के index बनाने के लिए उपयोग किया जाता है। map(...) द्वारा संसाधित किए जाने वाले हिस्सों के बीच का आदेश कोई फर्क नहीं पड़ता है, लेकिन प्रत्येक map(...) से लौटाया गया पाचन उसी क्रम में एकत्र किए जाने की आवश्यकता है जो भाग पाए गए थे। वास्तविक चरण save फ़ाइल चरण में अभी तक एक और थ्रेड (writter thread) तक ऑफ़लोड किया गया है। actual code of PR in question

मुझे आशा है कि मैं इसके लिए rayon का उपयोग कर सकता हूं, लेकिन rayon एक पुनरावर्तक की अपेक्षा करता है जो पहले से ही लंबित है - उदाहरण के लिए। Vec<...> या ऐसा कुछ। मुझे MyReader से par_iter प्राप्त करने का कोई तरीका नहीं मिला है - मेरा पाठक प्रकृति में बहुत एकल-थ्रेडेड है।

simple_parallel है लेकिन दस्तावेज़ीकरण कहता है कि सामान्य उपयोग के लिए इसकी अनुशंसा नहीं की जाती है। और मैं यह सुनिश्चित करना चाहता हूं कि सबकुछ सिर्फ काम करेगा।

मैं सिर्फ एक स्पैम कतार कार्यान्वयन और एक कस्टम thread_pool ले सकता था, लेकिन मैं अनुकूलित और परीक्षण किए गए मौजूदा समाधान के लिए रोक रहा था।

pipeliner भी है लेकिन अभी तक आदेशित मानचित्र का समर्थन नहीं करता है।

+0

हम कुछ चीजें यहाँ भूल रहे हैं? (1) क्या परिणाम की तरह (अपने कार्य stdout पर कुछ फेंकना है होना चाहिए एक फ़ाइल में लिखने लौट कुछ) और (2) निष्पादन के मामले का आदेश है या नहीं? –

+1

यदि इटेटरेटर सामग्री का उत्पादन करने में तेज़ी से है, लेकिन उन्हें संसाधित करने में समय लगता है, तो एक उचित कामकाज तत्वों को छोटे वैक्टरों में एकत्र करना और रेयन के 'par_iter()' के साथ प्रक्रिया करना है। यह बेहतर होगा अगर रेयन ने इसे मूल रूप से समर्थन दिया, हालांकि, स्ट्रीम से आने वाली वस्तुओं की समानांतर प्रक्रिया अक्सर एक आवश्यक विशेषता है। – user4815162342

+0

@ उपयोगकर्ता 4815162342: हम्म ... दिलचस्प है, लेकिन ऐसा लगता है कि यह थ्रेड पढ़ने पर अनावश्यक देरी करेगा, जो प्रत्येक बैच को पूरा करने के लिए होगा, और मैं वास्तव में पूर्ण CPU उपयोग की तलाश में हूं। –

उत्तर

5

सामान्य रूप से, समानांतरता के अनुसार, संरक्षण आदेश एक बहुत ही कठिन आवश्यकता है।

आप इसे एक ठेठ प्रशंसक बाहर/प्रशंसक में सेटअप के साथ हाथ से बनाने की कोशिश कर सकते हैं:

  • एक भी निर्माता जो एक अनुक्रमिक होगा- आईडी में वृद्धि के साथ आदानों टैग किए जाने
  • एक धागा पूल जो इस निर्माता से की खपत और फिर अंतिम उपभोक्ता की ओर परिणाम भेजता है,
  • एक उपभोक्ता जो बफ़र्स और पुन: क्रमित परिणाम इतनी के रूप में उन्हें क्रमानुसार के इलाज के लिए।

या आप अमूर्त स्तर को बढ़ा सकता है।


विशेष रुचि यहाँ के

: Future

एक Future एक गणना है, फिर भी हुआ जो हो सकता है या नहीं किया है का परिणाम प्रतिनिधित्व करता है। Future के एक आदेश दिया सूची प्राप्त एक उपभोक्ता बस हर एक पर प्रतीक्षा करें, और बफरिंग कतार में स्वाभाविक रूप से घटित कर सकते हैं।

बोनस अंक के लिए, यदि आप एक निश्चित आकार कतार का उपयोग करते हैं, तो आप उपभोक्ता पर स्वचालित रूप से बैक-प्रेशर प्राप्त करते हैं।


और इसलिए मैं CpuPool का निर्माण करने की सिफारिश करता हूं।

सेटअप होने जा रहा है:

use std::sync::mpsc::{Receiver, Sender}; 

fn produce(sender: Sender<...>) { 
    let pool = CpuPool::new_num_cpus(); 

    for chunk in reader { 
     let future = pool.spawn_fn(|| /* do work */); 
     sender.send(future); 
    } 

    // Dropping the sender signals there's no more work to consumer 
} 

fn consume(receiver: Receiver<...>) { 
    while let Ok(future) = receiver.recv() { 
     let item = future.wait().expect("Computation Error?"); 

     /* do something with item */ 
    } 
} 

fn main() { 
    let (sender, receiver) = std::sync::mpsc::channel(); 

    std::thread::spawn(move || consume(receiver)); 

    produce(sender); 
} 
+0

मैं एमएमपीसी चैनल के माध्यम से नंबर के साथ टैग की गई थ्रेड-पूल नौकरियों के साथ गया, और दूसरे चैनल के माध्यम से परिणाम भेज रहा था। वेक्टर को परिणाम एकत्र किए जाते हैं जिन्हें नौकरी संख्या द्वारा क्रमबद्ध किया जाता है। मैं इससे पूरी तरह से संतुष्ट नहीं हूं, लेकिन यह अभी के लिए करेगा। –

संबंधित मुद्दे