2016-09-25 8 views
16

मुझे लगता है कि निम्नलिखित है observables बनाना चाहते हैं:,,RXJava - एक pausable नमूदार (बफर और उदाहरण के लिए खिड़की के साथ) बनाने

  • बफर सभी आइटम, जबकि वे
  • रोक दिए गए हैं तुरंत आइटम फेंकना वे कर रहे हैं, जबकि
  • रोकी नहीं रोकें/फिर से शुरू ट्रिगर से आना चाहिए एक और नमूदार
  • यह observables कि मुख्य थ्रेड पर नहीं चला द्वारा प्रयोग की जाने बचाने किया जाना चाहिए और यह मुख्य थ्रेड से रोका/फिर से शुरू स्थिति बदलने बचाने होना चाहिए

मैं BehaviorSubject<Boolean> का उपयोग ट्रिगर के रूप में करना चाहता हूं और इस ट्रिगर को गतिविधि की onResume और ईवेंट पर जोड़ना चाहता हूं। (कोड उदाहरण जोड़ा जाता है)

प्रश्न

मैं सेटअप कुछ है, लेकिन के रूप में इरादा यह काम नहीं कर रहा है। वाल्व उत्सर्जक नहीं है, जब तक वाल्व सब कुछ काम कर रहा है (हो सकता है -

Observable o = ...; 
// Variant 1 
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) 
// Variant 2 
// o = o.compose(RXPauser.applyPauser(getPauser())); 
o 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(); 

वर्तमान में समस्या है, कि संस्करण 1 ठीक काम करना चाहिए, लेकिन कभी कभी, घटनाओं सिर्फ उत्सर्जित नहीं कर रहे हैं: मैं इसे निम्नलिखित की तरह का उपयोग एक थ्रेडिंग समस्या ...)! समाधान 2 बहुत आसान है और काम करने लगता है, लेकिन मुझे यकीन नहीं है कि यह वास्तव में बेहतर है, मुझे ऐसा नहीं लगता है। मुझे वास्तव में यकीन नहीं है कि समाधान कभी-कभी विफल क्यों हो रहा है, इसलिए मुझे यकीन नहीं है कि समाधान 2 (वर्तमान में मेरे लिए अज्ञात) समस्या हल करता है ...

क्या कोई मुझे बता सकता है कि समस्या क्या हो सकती है या यदि सरल समाधान विश्वसनीय रूप से काम करना चाहिए? या मुझे एक विश्वसनीय समाधान दिखाओ?

कोड

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser कार्यों

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser) 
{ 
    return observable -> pauser(observable, pauser); 
} 

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser) 
{ 
    // this observable buffers all items that are emitted while emission is paused 
    Observable<T> sharedSource = source.publish().refCount(); 
    Observable<T> queue = sharedSource 
      .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed)) 
      .flatMap(l -> Observable.from(l)) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t)); 

    // this observable emits all items that are emitted while emission is not paused 
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed)) 
      .switchMap(tObservable -> tObservable) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t)); 

    // combine both observables 
    return queue.mergeWith(window) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t)); 
} 

गतिविधि

public class BaseActivity extends AppCompatActivity { 

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false); 

    public BaseActivity(Bundle savedInstanceState) 
    { 
     super(args); 
     final Class<?> clazz = this.getClass(); 
     pauser 
       .doOnUnsubscribe(() -> { 
        L.d(clazz, "Pauser unsubscribed!"); 
       }) 
       .subscribe(aBoolean -> { 
        L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED")); 
       }); 
    } 

    public PublishSubject<Boolean> getPauser() 
    { 
     return pauser; 
    } 

    @Override 
    protected void onResume() 
    { 
     super.onResume(); 
     pauser.onNext(true); 
    } 

    @Override 
    protected void onPause() 
    { 
     pauser.onNext(false); 
     super.onPause(); 
    } 
} 
+0

इस सवाल का जवाब करने की कोशिश कर लोगों को कर रहे हैं, अब तक, एक महत्वपूर्ण आवश्यकता लापता बहुत स्पष्ट सवाल में किया जाता है कि: _ " विराम/फिर से शुरू ट्रिगर एक और देखने योग्य "_ से आना चाहिए। वे एक निश्चित समय सारिणी नहीं चाहते हैं। –

उत्तर

3

आप वास्तव में .buffer() ऑपरेटर का उपयोग कर सकते हैं, यह नमूदार गुजर परिभाषित करने की किताब से बफरिंग, नमूना को रोकने के लिए जब: https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10) 
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) 
    .subscribe(System.out::println); 
अध्याय 5 से

, 'अनुक्रम Taming' आप अपने कस्टम ऑपरेटर में तत्वों को खिलाने के लिए PublishSubjectObservable के रूप में उपयोग कर सकते हैं। प्रत्येक बार जब आपको बफरिंग शुरू करने की आवश्यकता होती है, तो Observable.defer(() -> createBufferingValve())

2

द्वारा उदाहरण बनाएं, मैंने ईवेंट लॉगिंग के लिए समान चीज़ बनाई है।
विषय कुछ घटनाएं एकत्र करता है, और 10 सेकंड में एक बार उन्हें सर्वर पर धक्का देता है।

मुख्य विचार है, उदाहरण के लिए आपके पास कक्षा Event है।

public class Event { 

    public String jsonData; 

    public String getJsonData() { 
     return jsonData; 
    } 

    public Event setJsonData(String jsonData) { 
     this.jsonData = jsonData; 
     return this; 
    } 
} 

आप घटनाओं के लिए कतार बनाना चाहिए:

private PublishSubject<Event> eventQueue = PublishSubject.create(); 

यह BehaviorSubject हो सकता है, यह कोई बात नहीं

तो फिर तुम, तर्क बनाना चाहिए जो सर्वर के लिए घटनाओं धक्का संभाल लेंगे :

eventObservable = eventQueue.asObservable() 
      .buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds 
      .toList() 
      .doOnNext(new Action1<List<Event>>() { 
       @Override 
       public void call(List<Event> events) { 
        apiClient.pushEvents(events);  //push your event 
       } 
      }) 
      .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() { 
       @Override 
       public Observable<List<Event>> call(Throwable throwable) { 
        return null; //make sure, that on error will be never called 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(Schedulers.io()); 

तब आपको इसकी सदस्यता लेनी चाहिए, और subsc को बनाए रखना चाहिए ription, जब तक आप इसे ज़रूरत नहीं है:

eventSubscription = eventObservable.subscribe() 

होम इस मदद करता है

संबंधित मुद्दे