2009-09-30 21 views
8
 
(fileNameToCharStream "bigfile" 
|>> fuse [length; 
      splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length; 
      splitBy (fun x -> x = '\n') keepEmpty |>> length; 
     ]) 
    (*fuse "fuses" the three functions to run concurrently*) 
|> run 2 (*forces to run in parallel on two threads*) 
|> (fun [num_chars; num_words; num_lines] -> 
     printfn "%d %d %d" 
      num_chars num_words, num_lines)) 

मैं इस कोड को निम्न तरीके से काम करना चाहता हूं: मूल धारा को बीच में दो में विभाजित करना; तो प्रत्येक छमाही के लिए एक अलग गणना चलाते हैं कि 3 चीजों की गणना करता है: लंबाई (यानी वर्णों की संख्या), शब्दों की संख्या, रेखाओं की संख्या। हालांकि, अगर में कोई समस्या नहीं है तो मैं गलती से एक शब्द को विभाजित करता हूं। यह की देखभाल की जानी चाहिए। फ़ाइल केवल एक बार पढ़ा जाना चाहिए।समांतर पाइपलाइनिंग

मुझे निर्दिष्ट कार्यों और ऑपरेटर को कैसे प्रोग्राम करना चाहिए | >>? क्या यह संभव है?

+0

हो सकता है कि अमेरिका अभी तक उठ नहीं है, लेकिन लंबित, आप कीवर्ड 'async' एक पाने के लिए देखने के लिए चाहते हो सकता है क्या संभव है के बारे में बेहतर विचार। – Benjol

+0

आप किस हस्ताक्षर की कल्पना करते हैं, फ्यूज, रन, और | >> होगा? उदाहरण के लिए, तीन तत्वों की आपकी सूची 3-टुपल में कहां बदल जाती है? – Gabriel

+0

ठीक है, मेरा मतलब है: |> (मजेदार [num_chars; num_words; num_lines] -> –

उत्तर

8

ऐसा लगता है कि आप काफी कुछ मांग रहे हैं। स्ट्रिंग मैनिपुलेशन को समझने के लिए मैं इसे आपके पास छोड़ दूंगा, लेकिन मैं आपको दिखाऊंगा कि एक ऑपरेटर को कैसे परिभाषित किया जाए जो समानांतर में संचालन की श्रृंखला निष्पादित करता है।

चरण 1: लिखें fuse समारोह

आपका फ्यूज समारोह अनेक कार्यों का उपयोग करते हुए एक भी इनपुट मैप करने के लिए प्रकट होता है, जो काफी आसान के रूप में लिखने के लिए इस प्रकार है:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list 
let fuse functionList input = [ for f in functionList -> f input] 

नोट के सभी कि आपके मानचित्रण कार्यों को एक ही प्रकार की आवश्यकता है।

चरण 2: के रूप में इस मानक समानांतर नक्शा समारोह लिखा जा सकता है समानांतर

में कार्य निष्पादित करने के लिए ऑपरेटर को परिभाषित करें:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array 
let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

मेरी जानकारी के लिए, Async.Parallel समानांतर में async संचालन निष्पादित करेगा, जहां किसी भी समय निष्पादित समांतर कार्यों की संख्या मशीन पर कोर की संख्या के बराबर होती है (अगर मैं गलत हूं तो कोई मुझे सही कर सकता है)। तो एक दोहरी कोर मशीन पर, जब हमारे फ़ंक्शन को कॉल किया जाता है, तो हमारे मशीन पर चलने वाले 2 थ्रेड चलने चाहिए। यह एक अच्छी बात है, क्योंकि हम प्रति कोर एक से अधिक थ्रेड चलाकर किसी भी गति से अपेक्षा नहीं करते हैं (वास्तव में अतिरिक्त संदर्भ स्विचिंग चीजों को धीमा कर सकती है)।

हम pmap और fuse के मामले में एक ऑपरेटर |>> परिभाषित कर सकते हैं:

//val (|>>) : seq<'a> -> seq<('a -> 'b)> -> 'b list array 
let (|>>) input functionList = pmap (fuse functionList) input 

तो |>> ऑपरेटर आदानों की एक गुच्छा ले जाता है और नक्शे उन्हें अलग outputs के बहुत सारे के प्रयोग से। अब तक, अगर हम यह सब एक साथ रखा है, हम निम्नलिखित (FSI में) मिलता है:

> let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];; 

val countOccurrences : 'a -> seq<'a> -> int 
val length : string -> int 
val testData : string [] = 
    [|"Juliet is awesome"; "Someone should give her a medal"|] 
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|] 

testOutput दो तत्वों, जो दोनों के समानांतर में अभिकलन शामिल होते हैं।

चरण 3: सकल एक भी उत्पादन

ठीक है में तत्वों, इसलिए अब हम हमारे सरणी में प्रत्येक तत्व का प्रतिनिधित्व करती आंशिक परिणाम है, और हम एक भी कुल में हमारे आंशिक परिणामों मर्ज करना चाहते हैं। मुझे लगता है कि सरणी में प्रत्येक तत्व को एक ही फ़ंक्शन विलय किया जाना चाहिए, क्योंकि इनपुट में प्रत्येक तत्व में एक ही डेटाटाइप होता है।

यहाँ एक बहुत बदसूरत समारोह मैं इस काम के लिए लिखा है:

> let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);; 

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list 

> reduceMany (+) testOutput;; 
val it : int list = [48; 1; 4] 

reduceMany एन-लंबाई दृश्यों के अनुक्रम लेता है, और यह एक आउटपुट के रूप में एक n लंबाई सरणी देता है। आप एक बेहतर तरीका यह समारोह लिखने के बारे में सोच सकते हैं, तो मेरी अतिथि :) हो

ऊपर उत्पादन को डिकोड करने के लिए:

  • 48 = मेरी दो इनपुट तार की लंबाई की राशि। ध्यान दें कि मूल स्ट्रिंग 49 वर्ण थी, लेकिन इसे "|" पर विभाजित करना एक char प्रति "|" खा लिया।
  • 1 = मेरे इनपुट में 'जे' के सभी उदाहरणों का योग
  • 4 = 'ओ' के सभी उदाहरणों का योग।

चरण 4: रखो सब कुछ एक साथ

let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

let fuse functionList input = [ for f in functionList -> f input] 

let (|>>) input functionList = pmap (fuse functionList) input 

let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]) 

let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'] 
    |> reduceMany (+)