मुझे लगता है कि निम्नलिखित है observables बनाना चाहते हैं:,,RXJava - एक pausable नमूदार (बफर और उदाहरण के लिए खिड़की के साथ) बनाने
- बफर सभी आइटम, जबकि वे
- रोक दिए गए हैं तुरंत आइटम फेंकना वे कर रहे हैं, जबकि
- रोकी नहीं रोकें/फिर से शुरू ट्रिगर से आना चाहिए एक और नमूदार
- यह observables कि मुख्य थ्रेड पर नहीं चला द्वारा प्रयोग की जाने बचाने किया जाना चाहिए और यह मुख्य थ्रेड से रोका/फिर से शुरू स्थिति बदलने बचाने होना चाहिए
मैं BehaviorSubject<Boolean>
का उपयोग ट्रिगर के रूप में करना चाहता हूं और इस ट्रिगर को गतिविधि की onResume
और ईवेंट पर जोड़ना चाहता हूं। (कोड उदाहरण जोड़ा जाता है)
प्रश्न
मैं सेटअप कुछ है, लेकिन के रूप में इरादा यह काम नहीं कर रहा है। वाल्व उत्सर्जक नहीं है, जब तक वाल्व सब कुछ काम कर रहा है (हो सकता है -
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
वर्तमान में समस्या है, कि संस्करण 1 ठीक काम करना चाहिए, लेकिन कभी कभी, घटनाओं सिर्फ उत्सर्जित नहीं कर रहे हैं: मैं इसे निम्नलिखित की तरह का उपयोग एक थ्रेडिंग समस्या ...)! समाधान 2 बहुत आसान है और काम करने लगता है, लेकिन मुझे यकीन नहीं है कि यह वास्तव में बेहतर है, मुझे ऐसा नहीं लगता है। मुझे वास्तव में यकीन नहीं है कि समाधान कभी-कभी विफल क्यों हो रहा है, इसलिए मुझे यकीन नहीं है कि समाधान 2 (वर्तमान में मेरे लिए अज्ञात) समस्या हल करता है ...
क्या कोई मुझे बता सकता है कि समस्या क्या हो सकती है या यदि सरल समाधान विश्वसनीय रूप से काम करना चाहिए? या मुझे एक विश्वसनीय समाधान दिखाओ?
कोड
RxValue
https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08
RXPauser कार्यों
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}
private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while emission is paused
Observable<T> sharedSource = source.publish().refCount();
Observable<T> queue = sharedSource
.buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
.flatMap(l -> Observable.from(l))
.doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));
// this observable emits all items that are emitted while emission is not paused
Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
.switchMap(tObservable -> tObservable)
.doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));
// combine both observables
return queue.mergeWith(window)
.doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
गतिविधि
public class BaseActivity extends AppCompatActivity {
private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);
public BaseActivity(Bundle savedInstanceState)
{
super(args);
final Class<?> clazz = this.getClass();
pauser
.doOnUnsubscribe(() -> {
L.d(clazz, "Pauser unsubscribed!");
})
.subscribe(aBoolean -> {
L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
});
}
public PublishSubject<Boolean> getPauser()
{
return pauser;
}
@Override
protected void onResume()
{
super.onResume();
pauser.onNext(true);
}
@Override
protected void onPause()
{
pauser.onNext(false);
super.onPause();
}
}
इस सवाल का जवाब करने की कोशिश कर लोगों को कर रहे हैं, अब तक, एक महत्वपूर्ण आवश्यकता लापता बहुत स्पष्ट सवाल में किया जाता है कि: _ " विराम/फिर से शुरू ट्रिगर एक और देखने योग्य "_ से आना चाहिए। वे एक निश्चित समय सारिणी नहीं चाहते हैं। –