2010-09-19 11 views
5

मैं हास्केल शुरुआती हूं और सोचा कि यह अच्छा अभ्यास होगा। मैं एक काम जहाँ मैं, एक धागा एक में फ़ाइल को पढ़ने B_i धागे में फ़ाइल लाइनों संभालने की ज़रूरत है, और फिर उत्पादन धागा सी में परिणामफ़ाइलों को पढ़ने के दौरान स्मृति उपयोग सीमित करना

मैं इतनी दूर पहले से ही लागू कर दिया है, लेकिन आवश्यकताओं में से एक है यह है कि हम भरोसा नहीं कर सकते कि पूरी फ़ाइल स्मृति में फिट बैठती है। मैं उम्मीद कर रहा था कि आलसी आईओ और कचरा कलेक्टर मेरे लिए यह करेगा, लेकिन स्मृति उपयोग बढ़ता और बढ़ता रहता है।

पाठक धागा (ए) readFile के साथ फ़ाइल को पढ़ता है जिसे तब को लाइन संख्याओं के साथ ज़िपित किया जाता है और बस में लपेटा जाता है। ये ज़िपित लाइनें से Control.Concurrent.Chan पर लिखी जाती हैं। प्रत्येक उपभोक्ता थ्रेड बी का अपना चैनल होता है।

प्रत्येक उपभोक्ता डेटा के समय अपने स्वयं के चैनल को पढ़ता है और यदि रेगेक्स मैचों, तो यह संभवतः (सूचियों से बने) के भीतर लिपटे अपने संबंधित आउटपुट चैनल पर आउटपुट किया जाता है।

प्रिंटर प्रत्येक बी थ्रेड के आउटपुट चैनल की जांच करता है। यदि में से कोई भी परिणाम (रेखा) कुछ भी नहीं है, तो रेखा मुद्रित होती है। चूंकि इस बिंदु पर पुरानी लाइनों का कोई संदर्भ नहीं होना चाहिए, मैंने सोचा कि कचरा कलेक्टर इन पंक्तियों को जारी करने में सक्षम होगा, लेकिन हां, मैं में गलत हूं।

.lhs फ़ाइल यहाँ है: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

तो सवाल है, मैं कैसे स्मृति के उपयोग की सीमा है, या कचरा कलेक्टर लाइनों दूर करने की अनुमति है।

अनुरोध के अनुसार स्निपेट्स। उम्मीद है कि इंडेंट भी बुरी तरह से नष्ट नहीं है :)

data Global = Global {done :: MVar Bool, consumers :: Consumers} 
type Done = Bool 
type Linenum = Int 
type Line = (Linenum, Maybe String) 
type Output = MVar [Line] 
type Input = Chan Line 
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output))) 
type State a = ReaderT Global IO a 


producer :: [Input] -> FilePath -> State() 
producer c p = do 
    liftIO $ Main.log "Starting producer" 
    d <- asks done 
    f <- liftIO $ readFile p 
    mapM_ (\l -> mapM_ 
    (liftIO . flip writeChan l) c) 
    $ zip [1..] $ map Just $ lines f 
    liftIO $ modifyMVar_ d (return . not) 

printer :: State() 
printer = do 
    liftIO $ Main.log "Starting printer" 
    c <- (fmap (map (snd . snd) . M.elems) 
    (asks consumers >>= liftIO . readMVar)) 
    uniq' c 
    where head' :: Output -> IO Line 
    head' ch = fmap head (readMVar ch) 

    tail' = mapM_ (liftIO . flip modifyMVar_ 
     (return . tail)) 

    cont ch = tail' ch >> uniq' ch 

    printMsg ch = readMVar (head ch) >>= 
     liftIO . putStrLn . fromJust . snd . head 

    cempty :: [Output] -> IO Bool 
    cempty ch = fmap (any id) 
     (mapM (fmap ((==) 0 . length) . readMVar) ch) 

    {- Return false unless none are Nothing -} 
    uniq :: [Output] -> IO Bool 
    uniq ch = fmap (any id . map (isNothing . snd)) 
     (mapM (liftIO . head') ch) 

    uniq' :: [Output] -> State() 
    uniq' ch = do 
     d <- consumersDone 
     e <- liftIO $ cempty ch 
     if not e 
     then do 
      u <- liftIO $ uniq ch 
      if u then cont ch else do 
     liftIO $ printMsg ch 
     cont ch 
      else unless d $ uniq' ch 

उत्तर

6

समवर्ती प्रोग्रामिंग कोई परिभाषित निष्पादन आदेश जब तक आप एक अपने आप को mvars और पसंद के साथ लागू प्रदान करता है। तो इसकी संभावना है कि निर्माता थ्रेड चैन में सभी लाइनों को चिपकाने से पहले किसी भी उपभोक्ता को पढ़ता है और उन्हें पास करता है। एक अन्य आर्किटेक्चर जो आवश्यकताओं को फिट करना चाहिए, केवल थ्रेड ए आलसी रीडफाइल को कॉल करें और परिणाम को एक एमवीआर में चिपकाएं। फिर प्रत्येक उपभोक्ता धागा mvar लेता है, एक रेखा पढ़ता है, फिर रेखा को संभालने के लिए आगे बढ़ने से पहले mvar को प्रतिस्थापित करता है। फिर भी, अगर आउटपुट थ्रेड जारी नहीं रह सकता है, तो चैन पर संग्रहीत मिलान लाइनों की संख्या मनमाने ढंग से निर्माण कर सकती है।

आपके पास एक पुश आर्किटेक्चर है। वास्तव में इसे स्थिर स्थान पर काम करने के लिए, मांग की मांग के संदर्भ में सोचें। एक तंत्र ढूंढें जैसे आउटपुट थ्रेड प्रोसेसिंग थ्रेड को सिग्नल करता है कि उन्हें कुछ करना चाहिए, और ऐसा कि प्रोसेसिंग थ्रेड रीडर थ्रेड को सिग्नल करते हैं कि उन्हें कुछ करना चाहिए।

ऐसा करने का एक और तरीका है इसके बजाय सीमित आकार के चैन होने के लिए - इसलिए पाठक थ्रेड ब्लॉक करता है जब प्रोसेसर धागे पकड़े नहीं जाते हैं, और जब आउटपुट थ्रेड पकड़ा नहीं जाता है तो प्रोसेसर धागे अवरुद्ध होते हैं।

पूरी तरह से, समस्या वास्तव में मुझे टिम ब्र के वाइडफाइंडर बेंचमार्क की याद दिलाती है, हालांकि आवश्यकताएं कुछ अलग हैं। किसी भी मामले में, इसने मल्टीकोर grep को लागू करने के सर्वोत्तम तरीके पर व्यापक चर्चा की। बड़ी पंचलाइन यह थी कि समस्या आईओ बाध्य है, और आप mmapped फ़ाइलों पर एकाधिक पाठक धागे चाहते हैं।

यहाँ और अधिक के लिए देखें तुलना में आप कभी भी पता करने के लिए चाहता हूँ: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

+4

BoundedChan वास्तव में उपयोग की इस प्रकार के लिए hackage पर है। –

+0

धन्यवाद टॉम और sciv। मैं इसे लागू करने का प्रयास करूंगा और अगर यह काम करता है तो उत्तर के रूप में चिह्नित होगा – Masse

संबंधित मुद्दे