2016-03-02 7 views
6

मैं एक साधारण लघु सर्वेक्षण के लिए आरएक्सजेएस का उपयोग करने की कोशिश कर रहा हूं। सर्वर पर path स्थान पर प्रत्येक delay सेकंड में एक बार अनुरोध करने की आवश्यकता होती है, एक बार दो शर्तों में से एक बार समाप्त हो जाता है: या तो कॉलबैक isComplete(data) सत्य लौटाता है या इसने maxTries से अधिक सर्वर को आजमाया है। यहाँ बुनियादी कोड है:आरएक्सजेएस 5.0 "ऐसा करते समय" तंत्र की तरह

newShortPoll(path, maxTries, delay, isComplete) { 
    return Observable.interval(delay) 
    .take(maxTries) 
    .flatMap((tryNumber) => http.get(path)) 
    .doWhile((data) => !isComplete(data)); 
    } 

हालांकि, doWhile RxJS 5.0 में मौजूद नहीं है, तो ऐसी स्थिति है जिसमें यह केवल सर्वर ले() कॉल करने के लिए maxTries काम करता है, धन्यवाद कोशिश कर सकते हैं, लेकिन isComplete हालत नहीं है काम। मैं इसे कैसे बना सकता हूं ताकि अवलोकन योग्य() मानों तक पूरा हो जाए, पूर्ण पूर्ण सत्य लौटाएगा, जिस बिंदु पर यह अगले() मूल्य और पूर्ण() होगा।

मुझे ध्यान रखना चाहिए कि takeWhile() मेरे लिए यहां काम नहीं करता है। यह अंतिम मूल्य वापस नहीं करता है, जो वास्तव में सबसे महत्वपूर्ण है, क्योंकि जब हम जानते हैं कि यह किया गया है।

धन्यवाद!

+0

संभवतः का डुप्लिकेट: http://stackoverflow.com/questions/36007911/rxjs-poll-until-interval-done-or-correct-data-received –

+0

यह एक नहीं है इस अर्थ में डुप्लिकेट करें कि प्रश्न 'doWhile' के प्रतिस्थापन के लिए पूछ रहा है। –

उत्तर

0

हम एक दूसरा अवलोकन करने योग्य बनाने के लिए एक उपयोगिता फ़ंक्शन बना सकते हैं जो आंतरिक ऑब्जर्वेबल उत्सर्जित हर आइटम को उत्सर्जित करता है; तथापि, हम एक बार हमारे स्थिति उत्पन्न होने पर onCompleted समारोह कॉल करेगा: यहां

function takeUntilInclusive(inner$, predicate) { 
    return Rx.Observable.create(observer => { 
     var subscription = inner$.subscribe(item => { 
      observer.onNext(item); 

      if (predicate(item)) { 
       observer.onCompleted(); 
      } 
     }, observer.onError, observer.onCompleted); 


     return() => { 
      subscription.dispose(); 
     } 
    }); 
} 

और हमारी नई उपयोगिता पद्धति का उपयोग करके एक त्वरित टुकड़ा है:

const inner$ = Rx.Observable.range(0, 4); 
const data$ = takeUntilInclusive(inner$, (x) => x > 2); 
data$.subscribe(x => console.log(x)); 

// >> 0 
// >> 1 
// >> 2 
// >> 3 

इस उत्तर पर आधारित है: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

+0

विचित्र रूप से, यह समाधान लगभग आधा समय काम करता प्रतीत होता है। कभी-कभी डेटा का अंतिम बैच इसे ग्राहकों को बनाता है, कभी-कभी यह नहीं करता है। कोई विचार क्यों? स्विचमैप पर स्विच करने से इसे ठीक नहीं किया गया। –

0

आप retry और first ऑपरेटरों का उपयोग कर इसे प्राप्त कर सकते हैं।

// helper observable that can return incomplete/complete data or fail. 
 
var server = Rx.Observable.create(function (observer) { 
 
    var x = Math.random(); 
 

 
    if(x < 0.1) { 
 
    observer.next(true); 
 
    } else if (x < 0.5) { 
 
    observer.error("error"); 
 
    } else { 
 
    observer.next(false); 
 
    } 
 
    observer.complete(); 
 

 
    return function() { 
 
    }; 
 
}); 
 
    
 
function isComplete(data) { 
 
    return data; 
 
} 
 
    
 
var delay = 1000; 
 
Rx.Observable.interval(delay) 
 
    .switchMap(() => { 
 
    return server 
 
     .do((data) => { 
 
     console.log('Server returned ' + data); 
 
     },() => { 
 
     console.log('Server threw'); 
 
     }) 
 
     .retry(3); 
 
    }) 
 
    .first((data) => isComplete(data)) 
 
    .subscribe(() => { 
 
    console.log('Got completed value'); 
 
    },() => { 
 
    console.log('Got error'); 
 
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

संबंधित मुद्दे