2014-09-30 14 views
5

मैं कई सैकड़ों लॉग फ़ाइलों को संसाधित करने के लिए एक स्क्रिप्ट लिखने के लिए आरएक्सजेएस का उपयोग करने की कोशिश कर रहा हूं, जिनमें से प्रत्येक लगभग 1 जीबी है। स्क्रिप्ट के कंकाल कोडflatMap की सहमति को सीमित कैसे करें?

तरह
Rx.Observable.from(arrayOfLogFilePath) 
.flatMap(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

काम करता है, लेकिन लगता है कि सभी लॉग फ़ाइलों के छानने कदम समवर्ती शुरू कर देंगे लग रहा है। हालांकि, फ़ाइल सिस्टम आईओ प्रदर्शन परिप्रेक्ष्य से, एक फ़ाइल के बाद एक फ़ाइल को संसाधित करना बेहतर होता है (या कम से कम एक ही समय में सभी सैकड़ों फाइलें खोलने के बजाय कुछ फ़ाइलों को समेकन सीमित करने के लिए)। इस संबंध में, मैं इसे "कार्यात्मक प्रतिक्रियाशील तरीके" में कैसे कार्यान्वित कर सकता हूं?

मैंने शेड्यूलर के बारे में सोचा था लेकिन यह पता नहीं लगा सकता कि यह कैसे मदद कर सकता है।

+0

मैं एक ही सवाल है, लेकिन Rx.NET साथ की है। क्या यह संभव है? http://stackoverflow.com/questions/37345516/limiting-concurrent-requests-using-rx-and-selectmany – SuperJMN

उत्तर

12

आप सहमति को सीमित करने के लिए .merge(maxConcurrent) का उपयोग कर सकते हैं। चूंकि .merge(maxConcurrent) एक मेटाबोर्सेबल (अवलोकन करने योग्यों के अवलोकन) को एक अवलोकन में फ़्लैट करता है, तो आपको .flatMap को .map के साथ प्रतिस्थापित करने की आवश्यकता है ताकि आउटपुट मेटाबोर्सेबल ("unflat") हो, तो आप .merge(maxConcurrent) पर कॉल करें।

Rx.Observable.from(arrayOfLogFilePath) 
.map(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

इस कोड का परीक्षण नहीं किया गया है (के बाद से मैं विकास के वातावरण आपके पास करने के लिए पहुँच नहीं है), लेकिन यह आगे बढ़ने के लिए कैसे है। आरएक्सजेएस में समवर्ती पैरामीटर वाले कई ऑपरेटर नहीं हैं, लेकिन आप लगभग .merge(maxConcurrent) के साथ जो भी चाहते हैं वह लगभग हमेशा कर सकते हैं।

+1

यह वही है जो मैं काम करने की कोशिश कर रहा हूं। मेरे पास लोड करने के लिए 500 यूआरएल की एक सूची है और एक ही समय में सभी अनुरोधों को बंद नहीं करना चाहते हैं। मैंने मानचित्र का उपयोग किया है (5) लेकिन यह काम नहीं कर रहा है ... सभी अनुरोध एक ही समय में किए जाते हैं। – Roaders

+0

@Roaders क्या आपको यह समाधान काम करने के लिए मिला? मैं वही कोशिश कर रहा हूँ। लेकिन, सभी अनुरोध एक ही समय में निकाल दिए जाते हैं। मैंने हर जगह गुमराह किया है और मुझे कुछ भी नहीं मिला है। – Diego

+0

यदि आप उदाहरण के लिए एसिंक http कॉल कर रहे हैं तो आपको इसे Rx.defer() में लपेटने की आवश्यकता है ताकि आरएक्स तय कर सके कि कॉल कब किया जाता है (और उदाहरण के लिए विफल होने पर इसे पुनः प्रयास करें) – Roaders

0

मैंने अभी आरएक्सजेएस 5 के साथ एक ही समस्या हल की है, इसलिए मुझे आशा है कि समाधान अन्य समस्याओं की मदद कर सकता है।

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more), 
 
// retry two times, push error on stream if retry fails. 
 

 
//const Rx = require('rxjs-es6/Rx'); 
 

 
// -- Global variabel just to show that it works. -- 
 
let parallelRequests = 0; 
 
// -------------------------------------------------- 
 

 
function simulateRequest(req) { 
 
    console.log("Request " + req); 
 
    // --- To log retries --- 
 
    var retry = 0; 
 
    // ---------------------- 
 

 
    // Can't retry a promise, need to restart before the promise is made. 
 
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => { 
 

 
     var random = Math.floor(Math.random() * 2000); 
 
     // -- To show that it works -- 
 
     if (retry) { 
 
      console.log("Retrying request " + req + " ,retry " + retry); 
 
     } else { 
 

 
      parallelRequests++; 
 
     } 
 
     // --------------------------- 
 
     setTimeout(() => { 
 
      if (random < 900) { 
 
       retry++; 
 
       return reject(req + " !!!FAILED!!!"); 
 
      } 
 

 
      return resolve(req); 
 
     }, random); 
 
    })).retry(2).catch(e => Rx.Observable.of(e)); 
 
} 
 

 
Rx.Observable.range(1, 10) 
 
    .flatMap(e => simulateRequest(e), null, 2) 
 
    // -- To show that it works -- 
 
    .do(() => { 
 
     console.log("ParallelRequests " + parallelRequests); 
 
     parallelRequests--; 
 
    }) 
 
    // --------------------------- 
 
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>

संबंधित मुद्दे