2016-08-20 16 views
5

मुझे लगता है कि यह हासिल करने के लिए कुछ आसान होना चाहिए लेकिन मुझे परेशानी हो रही है (अवधारणात्मक रूप से, मुझे लगता है) यह पता लगाने के लिए कि इसका सामना कैसे किया जाए।आरएक्सजेएस का उपयोग कर बैचिंग?

मेरे पास एक एपीआई है जो JSON ऑब्जेक्ट्स की एक सरणी देता है। मुझे इन ऑब्जेक्ट्स के माध्यम से कदम उठाने की ज़रूरत है, और, प्रत्येक ऑब्जेक्ट के लिए, एक और AJAX कॉल करें। मुद्दा यह है कि प्रत्येक AJAX कॉल को संभालने वाली प्रणाली एक समय में केवल दो सक्रिय कॉलों को संभाल सकती है (क्योंकि यह एक सीपीयू-गहन कार्य है जो डेस्कटॉप एप्लिकेशन में हुक करता है)।

मैं सोच रहा था कि मैं आरएक्सजेएस (या तो संस्करण 5 या 4 का उपयोग कर) का उपयोग करके इसे कैसे प्राप्त कर सकता हूं?

संपादित करें: इसके अतिरिक्त, एक साथ चलने वाले चरणों की एक श्रृंखला होना संभव है।

Rx.Observable.fromPromise(start()) 
    .concatMap(arr => Rx.Observable.from(arr)) 
    .concatMap(x => downloadFile(x)) 
    .concatMap((entry) => processFile(entry)) 
    .concatMap((entry) => convertFile(entry)) 
    .concatMap((entry) => UploadFile(entry)) 
    .subscribe(
     data => console.log('data', new Date().getTime(), data), 
     error => logger.warn('err', error), 
     complete => logger.info('complete') 
    ); 

हालांकि कि काम करने के लिए प्रतीत नहीं होता: अर्थात

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

मैं की तरह कुछ कर रही कोशिश की है। DownloadFile, उदाहरण के लिए प्रक्रिया के लिए इंतजार नहीं करता हैफाइल, कन्वर्टफाइल और अपलोड सभी को पूरा करने के लिए फ़ाइल, बल्कि, अगले एक को पूरा होने के बाद अगली बार फिर से चलाया जाएगा।

+0

http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/subscribe.html – cport1

+0

@ cport1 - लिंक्ड उदाहरण एसिंक ऑपरेशन के किसी भी रूप की प्रतीक्षा किए बिना एक के बाद एक चलाता है। मैं जो पूछ रहा हूं उससे अलग है। – NRaf

उत्तर

4

यहाँ, 2 दृष्टिकोण हैं आप के अंदर सभी वादों को हल करने की जरूरत है एकल concatMap विधि, इस

Rx.Observable.fromPromise(getJSONOfAjaxRequests()) 
    .flatMap(function(x) { return x;}) 
    .concatMap(function(item) { 
    return downloadFile(item) 
     .then(processFile) 
     .then(convertFile); 
    }) 
    .subscribe(function(data) { 
    console.log(data); 
    }); 

यहां काम करने वाले प्लंकर को देखें: https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview इस तरह, नया AJAX कॉल केवल तभी समाप्त हो जाएगा जब पिछला समाप्त हो जाएगा।

एक अन्य दृष्टिकोण यह है कि फाइलें समानांतर में अनुरोध भेजने की अनुमति देती हैं लेकिन ऑपरेशन 'डाउनलोडिंग, प्रोसेसिंग, कनवर्टिंग, अपलोडिंग' अनुक्रम में होगी।इसके लिए आपको यह

Rx.Observable.fromPromise(getJSONOfAjaxRequests()) 
    .flatMap(function(x) { return x;}) 
    .merge(2) // in case maximum concurrency required is 2 
    .concatMap(function(item) { 
    return downloadFile(item); 
    }) 
    .concatMap(function(item) { 
    return processFile(item); 
    }) 
    .concatMap(function(item) { 
    return convertFile(item) 
    }) 
    .subscribe(function(data) { 
    //console.log(data); 
    }); 

से काम कर plunkr यहाँ देख प्राप्त कर सकते हैं: https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview

+0

धन्यवाद, पहला वह है जो मैंने साथ किया है, हालांकि दूसरा आदर्श होगा, अगर मैं समांतर संचालन की मात्रा सीमित कर सकता हूं (यानी एक समय में अधिकतम 2)। – NRaf

+0

वैसे, यह पहला समाधान केवल rxjs5 के साथ काम करता है। संस्करण 4 का उपयोग करने वाला व्यवहार उन्हें समानांतर में चलाता है। – NRaf

+0

अजीब! सैद्धांतिक रूप से व्यवहार rxjs4 और rxjs5 पर समान रहना चाहिए। – FarazShuja

1

इस तरह कुछ कैसे? आप from का उपयोग सरणी को काटने वाले टुकड़ों में तोड़ने के लिए कर सकते हैं और concatMap का उपयोग करके उन्हें एक-एक करके संसाधित कर सकते हैं।

function getArr() { 
    return Rx.Observable.of([1, 2, 3, 4, 5, 6, 7, 8]); 
} 


function processElement(element) { 
    return Rx.Observable.of(element) 
     .delay(500); 
} 


getArr() 
    .concatMap(arr => { 
     return Rx.Observable.from(arr); 
    }) 
    .concatMap(element => { 
     return processElement(element); 
    }) 
    .subscribe(res => { 
     console.log(res); 
    }); 
+0

यह आशाजनक लग रहा है, लेकिन क्या इसे विस्तारित करना संभव होगा? यानी अगर मैं एक और कदम भी जोड़ना चाहता था? – NRaf

+0

मेरा मानना ​​है कि आप इसे बढ़ा सकते हैं, हां। आप concatMaps या जो भी अन्य ऑपरेटर की आवश्यकता है उसे जोड़ना जारी रख सकते हैं। यहां बहुत अच्छी जानकारी है http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html – jcolemang

2

आप maxConcurrency अधिभार (Rxjs v4) के साथ merge ऑपरेटर इस्तेमाल कर सकते हैं, तो कुछ की तरह:

आधिकारिक प्रलेखन: यदि आप वास्तव में इस

Downloading File: 1 
Processing File: 1 
Converting File: 1 
Uploading File: 1 
Downloading File: 2 
Processing File: 2 
... 

तरह अनुरोधों के अनुक्रम चाहते

संबंधित मुद्दे