RxJs

2014-12-16 31 views
7

के साथ एक अवलोकन धारा में पृष्ठबद्ध अनुरोधों को चालू करना मेरे पास एक सेवा है जो पृष्ठों में डेटा लौटाती है। एक पृष्ठ की प्रतिक्रिया में अगले पृष्ठ के लिए पूछताछ के बारे में विवरण शामिल हैं।RxJs

मेरा दृष्टिकोण प्रतिक्रिया डेटा वापस करना है और फिर अधिक पृष्ठों को उपलब्ध होने पर तत्काल अनुक्रमित अनुक्रम में एक स्थगित कॉल को सम्मिलित करना है।

function getPageFromServer(index) { 
    // return dummy data for testcase 
    return {nextpage:index+1, data:[1,2,3]}; 
} 

function getPagedItems(index) { 
    return Observable.return(getPageFromServer(index)) 
    .flatMap(function(response) { 
     if (response.nextpage !== null) { 
     return Observable.fromArray(response.data) 
      .concat(Observable.defer(function() {return getPagedItems(response.nextpage);})); 
     } 

     return Observable.fromArray(response.data); 
    }); 
} 

getPagedItems(0).subscribe(
    function(item) { 
    console.log(new Date(), item); 
    }, 
    function(error) { 
    console.log(error); 
    } 
) 

यह गलत दृष्टिकोण होना चाहिए, क्योंकि 2 सेकंड के भीतर आपको मिलेगा:

 throw e; 
      ^
RangeError: Maximum call stack size exceeded 
    at CompositeDisposablePrototype.dispose (/Users/me/node_modules/rx/dist/rx.all.js:654:51) 

सही दृष्टिकोण पर अंक लगाना करने के लिए क्या है?

उत्तर

4

संपादित करें आह! मैं जिस समस्या का सामना कर रहा हूं उसे देखता हूं। पूंछ कॉल ऑप्टिमाइज़ेशन का थोड़ा सा आपको ठीक करना चाहिए:

function mockGetPageAjaxCall(index) { 
    // return dummy data for testcase 
    return Promise.resolve({nextpage:index+1, data:[1,2,3]}); 
} 

function getPageFromServer(index) { 
    return Observable.create(function(obs) { 
    mockGetPageAjaxCall(index).then(function(page) { 
     obs.onNext(page); 
    }).catch(function(err) { 
     obs.onError(err) 
    }).finally(function() { 
     obs.onCompleted(); 
    }); 
    }); 
} 

function getPagedItems(index) { 
    return Observable.create(function(obs) { 
     // create a delegate to do the work 
     var disposable = new SerialDisposable(); 
     var recur = function(index) { 
      disposable.setDisposable(getPageFromServer(index).retry().subscribe(function(page) { 
       obs.onNext(page.items); 
       if(page.nextpage === null) { 
        obs.onCompleted(); 
       } 

       // call the delegate recursively 
       recur(page.nextpage); 
      })); 
     }; 

     // call the delegate to start it 
     recur(0); 

     return disposable; 
    }); 
} 

getPagedItems(0).subscribe(
    function(item) { 
    console.log(new Date(), item); 
    }, 
    function(error) { 
    console.log(error); 
    } 
) 
+0

लायक जब मैं यह परीक्षण (सबसे बाहर निकलने के पथ पर 10 पृष्ठों के बिना) है, यह 1380 आइटम रिटर्न और उसके बाद बंद हो जाता है। यह कैसे रुक सकता है? – RichB

+0

मैंने इस कोड के साथ एक JSFiddle बनाया है। http://jsfiddle.net/1wpcmuke/ यह अभी भी अधिकतम कॉल स्टैक आकार से अधिक हो गया है। – RichB

+0

आप अभी भी अधिकतम कॉल स्टैक से अधिक हो रहे हैं। लगभग 430 पेज लौटे। मुझे लगता है कि रिकर्सन यहां सबसे अच्छा समाधान नहीं हो सकता है। –

7

ओपी कोड को देखते हुए, यह वास्तव में सही तरीका है। वास्तविक परिस्थितियों को अनुकरण करने के लिए बस अपनी नकली सेवा को असीमित बनाने की आवश्यकता है। यहां एक समाधान है जो स्टैक थकावट से बचाता है (मैंने getPageFromServer भी वास्तव में कॉलर को लपेटने की आवश्यकता के बजाय एक ठंडे देखने योग्य लौटा दिया है)।

ध्यान दें कि यदि आप वास्तव में वास्तविक सेवा में आपके सेवा अनुरोधों को सिंक्रनाइज़ करने की अपेक्षा करते हैं और इस प्रकार आपको यह सुनिश्चित करने की आवश्यकता है कि आपका कोड स्टैक को समाप्त नहीं करता है, तो बस getPagedItems()currentThread scheduler का आह्वान करें। currentThread पुनरावर्ती कॉल (और थकावट थकावट) को रोकने के लिए ट्राम्पोलिन का उपयोग करके शेड्यूलर शेड्यूल कार्य करता है। getPagedItems

function getPageFromServer(index) { 
    // return dummy data asynchronously for testcase 
    // use timeout scheduler to force the result to be asynchronous like 
    // it would be for a real service request 
    return Rx.Observable.return({nextpage: index + 1, data: [1,2,3]}, Rx.Scheduler.timeout); 

    // for real request, if you are using jQuery, just use rxjs-jquery and return: 
    //return Rx.Observable.defer(function() { return $.ajaxAsObservable(...); }); 
} 

function getPagedItems(index) { 
    var result = getPageFromServer(index) 
     .retry(3) // retry the call if it fails 
     .flatMap(function (response) { 
      var result = Rx.Observable.fromArray(response.data); 
      if (response.nextpage !== null) { 
       result = result.concat(getPagedItems(response.nextpage)); 
      } 
      return result; 
     }); 

    // If you think you will really satisfy requests synchronously, then instead 
    // of using the Rx.Scheduler.timeout in getPageFromServer(), you can 
    // use the currentThreadScheduler here to prevent the stack exhaustion... 

    // return result.observeOn(Rx.Scheduler.currentThread) 
    return result; 
} 
+0

मैंने इसके लिए एक jsbin किया: https://jsbin.com/zaqoqedula/edit?js,console उत्तर के लिए धन्यवाद, बहुत उपयोगी। – Roaders

+0

इस जेएसबीएन पर आउटपुट मेरे लिए अनुसरण करना आसान है: https://jsbin.com/neniko/edit?js,console – bloodyKnuckles

2

यहाँ के अंत में लाइन पर टिप्पणी देखें किसी भी प्रत्यावर्तन के बिना अधिक संक्षिप्त & IMHO साफ जवाब है। यह किसी भी जनरेटर को एक अवलोकन में बदलने के लिए ogen (~ 46 लोक) का उपयोग कर रहा है।

इसमें एक कस्टम निर्मित अगला फ़ंक्शन है जो आपके फ़ंक्शन को किसी भी समय उत्पन्न करने पर डेटा उत्सर्जित करेगा।

nb: original article पढ़ने

function getPagedItems({offset=0, limit=4}) { 
    paginatedQueryGenerator = function*(someParams offset, limit) { 
     let hasMore = true 
     while(hasMore) { 
      const results = yield YOUR_PROMISE_BASED_REQUEST(someParams, limit, offset) 
      hasMore = results && results.nextpage !== null 
      offset += limit 
     } 
    } 

    return ogen(paginatedQueryGenerator)(someParams, offset, limit) 
} 
संबंधित मुद्दे