यहां आप जो चाहते हैं उसे करने के लिए एक उचित सरल आरएक्स तरीका है। मैंने Pausable
नामक एक एक्सटेंशन विधि बनाई है जो एक स्रोत देखने योग्य और boolean
का दूसरा अवलोकन योग्य है जो अवलोकन योग्य को रोकता है या फिर से शुरू करता है।
public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};
return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}
यह इस तरह इस्तेमाल किया जा सकता:
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
अब, केवल एक चीज मैं काफी बाहर काम नहीं कर सकता है आप अपने से क्या मतलब है "घटनाओं में से भेजे धारा एक IObserver<T>
है।" स्ट्रीम IObservable<T>
हैं। पर्यवेक्षक धारा नहीं हैं। ऐसा लगता है कि आप यहां कुछ नहीं कर रहे हैं। क्या आप अपने प्रश्न में जोड़ सकते हैं और कृपया आगे बता सकते हैं?
public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
var queue = source.Buffer(pauser.Where(toPause => !toPause),
_ => pauser.Where(toPause => toPause))
.SelectMany(l => l.ToObservable());
return source.Window(pauser.Where(toPause => toPause).StartWith(true),
_ => pauser.Where(toPause => !toPause))
.Switch()
.Merge(queue);
}
विंडो सदस्यता और हर बार की सच्ची pauser धारा से प्राप्त होता है पर खोला जाता है:
में सोच रही थी कि अगर उत्पादन रोक दिया गया है, एक आंतरिक कतार में घटनाओं अनुप्रेषित सकता है, और जब outout रोक दिया गया है, यह कतार बाहर फ्लश कर सकते हैं। – Contango
आपने अपना खुद का 'IObserver' लागू नहीं किया है, है ना? –
Enigmativity
नहीं, मैंने जो कुछ किया है वह आंतरिक 'विषय' को 'IObserver ' पर डाला गया है, इसलिए विधि 'ओएनएक्स्ट' का खुलासा किया जा सकता है। –
Contango