2015-07-11 3 views
5

मेरे पास एक कक्षा है जो घटनाओं की एक धारा में ले जाती है, और घटनाओं की एक और धारा को धक्का देती है।प्रतिक्रियाशील एक्सटेंशन (आरएक्स) के साथ, क्या "रोकें" कमांड जोड़ना संभव है?

सभी घटनाएं प्रतिक्रियाशील एक्सटेंशन (आरएक्स) का उपयोग करती हैं। घटनाओं की आने वाली धारा को बाहरी स्रोत से IObserver<T> में .OnNext का उपयोग करके धक्का दिया जाता है, और IObservable<T> और .Subscribe का उपयोग करके ईवेंट की आउटगोइंग स्ट्रीम को धक्का दिया जाता है। मैं दृश्यों के पीछे, इसे प्रबंधित करने के लिए Subject<T> का उपयोग कर रहा हूं।

मुझे आश्चर्य है कि आरएक्स में अस्थायी रूप से आउटपुट को रोकने के लिए कौन सी तकनीकें हैं। इसका मतलब यह होगा कि आने वाली घटनाएं एक आंतरिक कतार में बनेंगी, और जब वे रुक जाएंगे, तो घटनाएं फिर से बहती रहेंगी।

+0

में सोच रही थी कि अगर उत्पादन रोक दिया गया है, एक आंतरिक कतार में घटनाओं अनुप्रेषित सकता है, और जब outout रोक दिया गया है, यह कतार बाहर फ्लश कर सकते हैं। – Contango

+0

आपने अपना खुद का 'IObserver ' लागू नहीं किया है, है ना? – Enigmativity

+0

नहीं, मैंने जो कुछ किया है वह आंतरिक 'विषय ' को 'IObserver ' पर डाला गया है, इसलिए विधि 'ओएनएक्स्ट' का खुलासा किया जा सकता है। – Contango

उत्तर

1

आप Observable के साथ रुकने/रोकना अनुकरण कर सकते हैं।

एक बार जब आपका विराम ऑब्जेरबल एक 'रुका हुआ' मान उत्सर्जित करता है, तब तक बफर घटनाएं रोकें जब तक ऑब्सर्वेबल एक 'रुका हुआ' मान उत्सर्जित न करे।

यहाँ एक उदाहरण है जो (एक समय पहले मेरे अपने प्रश्न से) BufferUntil implementation by Dave Sexton और Observable logic by Timothy Shields का उपयोग करता है सुधार के लिए

 //Input events, hot observable 
     var source = Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(i => i.ToString()) 
      .Publish().RefCount(); 

     //Simulate pausing from Keyboard, not actually relevant within this answer 
     var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
      k => KeyPressed += k, k => KeyPressed -= k) 
      .Select(i => i.EventArgs.PressedKey) 
      .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause 
      .DistinctUntilChanged(); 

     //Let events through when not paused 
     var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused}) 
      .Where(i => !i.paused) //not paused 
      .Select(i => i.value) 
      .Subscribe(Console.WriteLine); 

     //When paused, buffer until not paused 
     var pausedEvents = pauseObservable.Where(i => i) 
      .Subscribe(_ => 
       source.BufferUntil(pauseObservable.Where(i => !i)) 
        .Select(i => String.Join(Environment.NewLine, i)) 
        .Subscribe(Console.WriteLine)); 

कक्ष है: शायद एक के रूप में स्रोत के लिए दो सदस्यता विलय (pausedEvents और notPausedEvents)।

2

यहां आप जो चाहते हैं उसे करने के लिए एक उचित सरल आरएक्स तरीका है। मैंने Pausable नामक एक एक्सटेंशन विधि बनाई है जो एक स्रोत देखने योग्य और boolean का दूसरा अवलोकन योग्य है जो अवलोकन योग्य को रोकता है या फिर से शुरू करता है।

public static IObservable<T> Pausable<T>(
    this IObservable<T> source, 
    IObservable<bool> pauser) 
{ 
    return Observable.Create<T>(o => 
    { 
     var paused = new SerialDisposable(); 
     var subscription = Observable.Publish(source, ps => 
     { 
      var values = new ReplaySubject<T>(); 
      Func<bool, IObservable<T>> switcher = b => 
      { 
       if (b) 
       { 
        values.Dispose(); 
        values = new ReplaySubject<T>(); 
        paused.Disposable = ps.Subscribe(values); 
        return Observable.Empty<T>(); 
       } 
       else 
       { 
        return values.Concat(ps); 
       } 
      }; 

      return pauser.StartWith(false).DistinctUntilChanged() 
       .Select(p => switcher(p)) 
       .Switch(); 
     }).Subscribe(o); 
     return new CompositeDisposable(subscription, paused); 
    }); 
} 

यह इस तरह इस्तेमाल किया जा सकता:

var xs = Observable.Generate(
    0, 
    x => x < 100, 
    x => x + 1, 
    x => x, 
    x => TimeSpan.FromSeconds(0.1)); 

var bs = new Subject<bool>(); 

var pxs = xs.Pausable(bs); 

pxs.Subscribe(x => { /* Do stuff */ }); 

Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 
Thread.Sleep(500); 
bs.OnNext(true); 
Thread.Sleep(5000); 
bs.OnNext(false); 

अब, केवल एक चीज मैं काफी बाहर काम नहीं कर सकता है आप अपने से क्या मतलब है "घटनाओं में से भेजे धारा एक IObserver<T> है।" स्ट्रीम IObservable<T> हैं। पर्यवेक्षक धारा नहीं हैं। ऐसा लगता है कि आप यहां कुछ नहीं कर रहे हैं। क्या आप अपने प्रश्न में जोड़ सकते हैं और कृपया आगे बता सकते हैं?

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser) 
{ 
    var queue = source.Buffer(pauser.Where(toPause => !toPause), 
           _ => pauser.Where(toPause => toPause)) 
         .SelectMany(l => l.ToObservable()); 

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
         _ => pauser.Where(toPause => !toPause)) 
       .Switch() 
       .Merge(queue); 
} 

विंडो सदस्यता और हर बार की सच्ची pauser धारा से प्राप्त होता है पर खोला जाता है:

+0

आपके उत्तर के लिए धन्यवाद। मैंने डेटाफ्लो को स्पष्ट बनाने के लिए अपना प्रश्न अपडेट कर दिया है। – Contango

+0

जैसा कि यह स्पष्ट रूप से पहले मानों से नहीं निकलता है, क्योंकि यह झूठी शाखा के माध्यम से जाता है और 'मूल्य' को जोड़ना चाहता है (जो प्रभावी रूप से 'पर्यवेक्षक' है। मुझे लगता है?)। मैंने पहली बार 'मूल्यों' को शुरू करने और दोनों शाखाओं में जांच करके इसे आकार में हैक किया है। यकीन नहीं है कि क्या कुछ और सुरुचिपूर्ण है। – Benjol

+0

भविष्य के लिए आगे चेतावनी। 'ReplaySubject' शैतान का स्पॉन है। यदि आप इसे सीमित नहीं करते हैं (बफर आकार या समय के द्वारा) यह किसी भी चीज को सब्सक्राइब करने के साथ आता है, तो वह जो कुछ भी देखता है, उस पर लटका होगा। – Benjol

4

यहाँ मेरी समाधान बफर और खिड़की ऑपरेटरों का उपयोग कर रहा है। यह बंद हो जाता है जब पॉसर 'झूठा' मान प्रदान करता है।

बफर जो करता है वह करता है, बफर मूल्य जो 'झूठी' और 'सत्य' के बीच हैं। एक बार बफर को 'सत्य' प्राप्त हो जाने पर यह उन मूल्यों के आईएलआईस्ट को आउटपुट करता है जो तुरंत एक बार में स्ट्रीम होते हैं।

DotNetFiddle लिंक: https://dotnetfiddle.net/vGU5dJ

+0

आपको शायद 'पॉजर' बनाना होगा। प्रकाशित करें (ps => {...}) 'और अपने कोड में 'ps' के साथ' पॉसर 'को प्रतिस्थापित करें अन्यथा आप' पॉसर 'में चार सदस्यता बना रहे हैं और स्रोत के आधार पर 'पॉसर' का जो विधि को विफल कर सकता है। – Enigmativity

+0

हां, मैंने अभी पुष्टि की है। आप कई सदस्यता बना रहे हैं। – Enigmativity

+0

मैंने https://dotnetfiddle.net/qWN72d – Enigmativity

संबंधित मुद्दे