2017-01-17 9 views
6

के बीच विन्यास समय के बाद पुनरावृत्ति के कोडआरएक्स distinctUntilChanged घटनाओं

Rx.Observable.merge(
    Rx.Observable.just(1), 
    Rx.Observable.just(1).delay(1000) 
).distinctUntilChanged() 
    .subscribe(x => console.log(x)) 

हमें उम्मीद है कि 1 सिर्फ एक बार लॉग ऑन है निम्नलिखित एक पल के लिए विचार करते हैं अनुमति देते हैं। हालांकि, अगर हम एक मूल्य की पुनरावृत्ति की अनुमति देना चाहते हैं तो क्या इसका अंतिम उत्सर्जन समय पहले कॉन्फ़िगर करने योग्य था? मेरा मतलब है दोनों घटनाएं लॉग इन करें।

उदाहरण के लिए यह निम्नलिखित

Rx.Observable.merge(
    Rx.Observable.just(1), 
    Rx.Observable.just(1).delay(1000) 
).distinctUntilChanged(1000) 
    .subscribe(x => console.log(x)) 

जिसमें distinctUntilChanged() टाइमआउट किसी प्रकार स्वीकार करता है अगले तत्व पर पुनरावृत्ति की अनुमति के लिए की तरह कुछ है करने के लिए शांत हो जाएगा। हालांकि ऐसी चीज मौजूद नहीं है और मैं सोच रहा था कि अगर कोई फ़िल्टर के साथ गड़बड़ किए बिना उच्च स्तरीय ऑपरेटरों का उपयोग करके इसे प्राप्त करने के लिए कोई शानदार तरीका जानता है जिसके लिए राज्य

उत्तर

8

जब तक मुझे लगता है मैं गलत समझ रहा हूँ करने के लिए यकीन है कि इस windowTime साथ एक अपेक्षाकृत सीधी-सपाट ढंग से पूरा किया जा सकता हूँ:

Observable 
    .merge(
    Observable.of(1), 
    Observable.of(1).delay(250), // Ignored 
    Observable.of(1).delay(700), // Ignored 
    Observable.of(1).delay(2000), 
    Observable.of(1).delay(2200), //Ignored 
    Observable.of(2).delay(2300) 
) 
    // Converts the stream into a stream of streams each 1000 milliseconds long 
    .windowTime(1000) 
    // Flatten each of the streams and emit only the latest (there should only be one active 
    // at a time anyway 
    // We apply the distinctUntilChanged to the windows before flattening 
    .switchMap(source => source.distinctUntilChanged()) 
    .timeInterval() 
    .subscribe(
    value => console.log(value), 
    error => console.log('error: ' + error), 
    () => console.log('complete') 
); 

उदाहरण here (उधार @ मार्टिन की मिसाल आदानों)

+0

बहुत बढ़िया, समस्या का एक बहुत चालाक समाधान - स्वीकार्य जवाब होना चाहिए! – olsn

+1

विंडो ऑपरेटर के बारे में नहीं पता था। मैंने इस तरह कुछ ठोस तरीके से लागू किया था लेकिन यह वास्तव में इसे सरल बनाता है। धन्यवाद! हालांकि मैंने देखा कि यह दोहराए गए तत्वों को उत्सर्जित कर सकता है यदि वे खिड़की सीमा (उदा। 9 0 9 0 और 1010 एमएमएस) के पास होते हैं। मैंने इसे अंतिम विशिष्ट आइटम के सापेक्ष सीमा टाइमआउट बनाने के लिए बदल दिया: 'obs.window (obs.distinctUntilChanged()। स्विचमैप (एस => ऑब्जर्वेबल.timer (1000)))। संशोधित [उदाहरण] देखें (https://jsbin.com/soyovumeja/edit?js,console)। –

2

यह एक दिलचस्प उपयोग-मामला है। मुझे आश्चर्य है कि लगा सकते हैं कि मेरा की तुलना में एक आसान समाधान (ध्यान दें कि मैं RxJS 5 उपयोग कर रहा हूँ):

let timedDistinctUntil = Observable.defer(() => { 
    let innerObs = null; 
    let innerSubject = null; 
    let delaySub = null; 

    function tearDown() { 
     delaySub.unsubscribe(); 
     innerSubject.complete(); 
    } 

    return Observable 
     .merge(
      Observable.of(1), 
      Observable.of(1).delay(250), // ignored 
      Observable.of(1).delay(700), // ignored 
      Observable.of(1).delay(2000), 
      Observable.of(1).delay(2200), // ignored 
      Observable.of(2).delay(2300) 
     ) 
     .do(undefined, undefined,() => tearDown()) 
     .map(value => { 
      if (innerObs) { 
       innerSubject.next(value); 
       return null; 
      } 

      innerSubject = new BehaviorSubject(value); 

      delaySub = Observable.of(null).delay(1000).subscribe(() => { 
       innerObs = null; 
      }); 

      innerObs = innerSubject.distinctUntilChanged(); 
      return innerObs; 
     }) 
     // filter out all skipped Observable emissions 
     .filter(observable => observable) 
     .switch(); 
}); 

timedDistinctUntil 
    .timestamp() 
    .subscribe(
     value => console.log(value), 
     error => console.log('error: ' + error), 
     () => console.log('complete') 
    ); 

लाइव डेमो देखें: https://jsbin.com/sivuxo/5/edit?js,console

पूरे तर्क Observable.defer() स्थिर विधि में लपेटा जाता है, क्योंकि यह कुछ अतिरिक्त आवश्यकता है चर।

एक जोड़े को बताते हैं कि कैसे यह सब काम करता है:

  1. merge() आइटम का स्रोत है।

  2. मैं do() का उपयोग करता हूं ताकि स्रोत पूरा होने पर ठीक से पकड़ सकें ताकि मैं आंतरिक टाइमर को बंद कर सकूं और उचित पूर्ण अधिसूचना भेज सकूं।

  3. map() ऑपरेटर वह जगह है जहां सबसे दिलचस्प चीजें होती हैं। मैं प्राप्त मान को फिर से मानता हूं और फिर null लौटाता हूं यदि पहले से ही एक वैध पर्यवेक्षक है (यह 1000ms पहले = innerObs != null से कम बनाया गया था)। फिर मैं अंततः एक नया विषय तैयार करता हूं जहां मैं सभी वस्तुओं को फिर से लिखने जा रहा हूं और के साथ इस BehaviorSubject को वापस कर दूंगा। अंत में मैं innerObs = null सेट करने के लिए 1s देरी शेड्यूल करता हूं जिसका अर्थ है कि जब कोई अन्य मूल्य आता है तो यह नए .distinctUntilChanged() के साथ एक नया पर्यवेक्षण वापस कर देगा।

  4. फिर filter() मुझे सभी null मूल्यों को अनदेखा करने देगा। इसका मतलब है कि यह एक बार एक से अधिक बार एक नया पर्यवेक्षक उत्सर्जित नहीं करेगा।

  5. अब मैं तथाकथित उच्च आदेश observables (observables उत्सर्जक observables के साथ काम करने की जरूरत है। इस कारण से मैं switch() ऑपरेटर कि हमेशा नवीनतम प्रत्यक्ष स्रोत द्वारा उत्सर्जित करने के लिए केवल सब्सक्राइब का उपयोग करें। हमारे मामले में हम observables केवल अधिकतम फेंकना एक बार एक बार (ऊपर दिए गए filter() के लिए धन्यवाद) और यह आंतरिक स्वयं पर्यवेक्षक कई मूल्यों को उत्सर्जित कर सकता है और उन सभी को distinctUntilChanged() के माध्यम से पारित किया जा रहा है, इसलिए डुप्लीकेट को अनदेखा कर दिया जाता है।

इस डेमो के लिए उत्पादन निम्नलिखित उत्पादन तरह दिखेगा:

Timestamp { value: 1, timestamp: 1484670434528 } 
Timestamp { value: 1, timestamp: 1484670436475 } 
Timestamp { value: 2, timestamp: 1484670436577 } 
complete 

आप देख सकते हैं मूल्य 1 सीसीए 2s देरी के साथ दो बार उत्सर्जित होता है। हालांकि 2distinctUntilChanged() पर 100ms धन्यवाद के बाद किसी भी समस्या के बिना पास किया गया।

मैं जानता हूँ कि यह आसान नहीं है, लेकिन मुझे आशा है कि यह समझ में आता है आप :)

+0

अच्छा जवाब देखें - मैं इस USECASE रूप में अच्छी तरह पसंद किया - एक कस्टम ऑपरेटर बनाने के लिए भी लायक हो सकता है - मैं कल्पना कर सकता से अधिक लोगों को इस मुद्दे के लिए जा रहे हैं। – olsn

संबंधित मुद्दे