2015-06-24 12 views
10

भीतर से फोन कर सदस्यता समाप्त मुझे आश्चर्य है कि क्या यह उस तरह onNext हैंडलर के भीतर से unsubscribe कॉल करने के लिए कानूनी है:RxJava: onNext

List<Integer> gatheredItems = new ArrayList<>(); 

Subscriber<Integer> subscriber = new Subscriber<Integer>() { 
    public void onNext(Integer item) { 
     gatheredItems.add(item); 
     if (item == 3) { 
      unsubscribe(); 
     } 
    } 
    public void onCompleted() { 
     // noop 
    } 
    public void onError(Throwable sourceError) { 
     // noop 
    } 
}; 

Observable<Integer> source = Observable.range(0,100); 

source.subscribe(subscriber); 
sleep(1000); 
System.out.println(gatheredItems); 

ऊपर कोड सही तरीके से आउटपुट कि सिर्फ चार तत्वों को इकट्ठा किया गया: [0, 1, 2, 3]। लेकिन किसी स्रोत नमूदार कैश हो जाने को परिवर्तित करता है:

Observable<Integer> source = Observable.range(0,100).cache(); 

तब सभी सौ तत्वों इकट्ठे हुए हैं। मेरे पास स्रोत पर नियंत्रण नहीं है (चाहे वह कैश किया गया हो या नहीं), तो onNext के भीतर से निश्चित रूप से सदस्यता कैसे लें?

बीटीडब्ल्यू: तो onNext गलत काम करने के भीतर सदस्यता रद्द कर रहा है?

(मेरे वास्तविक उपयोग के मामले कि onNext में मैं वास्तव में उत्पादन धारा लिए लिख रहा हूँ है, और एक IOException तब होता है जब ज्यादा कुछ नहीं उत्पादन के लिए लिखा जा सकता है तो मैं किसी भी तरह आगे की प्रक्रिया को रोकने के लिए की जरूरत है।)

उत्तर

3

कैश() पहले ग्राहक आने के बाद सबकुछ कैश करता है और यह डाउनस्ट्रीम से इसे रोकने के लिए कोई विकल्प नहीं देता है। आप) कैश (इससे पहले कि धारा को रोकने के लिए बहुत ज्यादा प्रतिधारण से बचने की जरूरत:

Observable<Integer> source = Observable.range(1, 100); 

PublishSubject<Integer> stop = PublishSubject.create(); 

source 
.doOnNext(v -> System.out.println("Generating " + v)) 
.takeUntil(stop) 
.cache() 
.doOnNext(new Action1<Integer>() { 
    int calls; 
    @Override 
    public void call(Integer t) { 
     System.out.println("Saving " + t); 
     if (++calls == 3) { 
      stop.onNext(1); 
     } 
    } 
}) 
.subscribe(); 

संपादित करें: उपरोक्त उदाहरण 1.0.13 इसलिए यहाँ नीचे काम नहीं करता है एक संस्करण है जो करना चाहिए:

SerialSubscription ssub = new SerialSubscription(); 

ConnectableObservable<Integer> co = source 
     .doOnNext(v -> System.out.println("Generating " + v)) 
     .replay(); 

co.doOnNext(v -> { 
    System.out.println("Saving " + v); 
    if (v == 3) { 
     ssub.unsubscribe(); 
    } 
}) 
.subscribe(); 

co.connect(v -> ssub.set(v)); 
+0

यह कोड अपेक्षित काम नहीं करता है, यह सौ बार "जेनरेटिंग" प्रिंट करता है जिसके बाद सौ गुना "सेविंग" होता है। मैं केवल चार बार "सेविंग" की उम्मीद करता हूं। –

+0

मैंने इसे पोस्ट करने से पहले अपना उदाहरण चलाया है, इसलिए यह प्रत्येक 3 लाइनों को प्रिंट करता है। क्या आपने इसे क्रियान्वित करने की कोशिश की है या आपने इसे अपने प्रवाह में डालने का प्रयास किया है? – akarnokd

+0

मैंने आपके उदाहरण वर्बैटिम का उपयोग किया, बस मुख्य और आयात जोड़ा। मैंने इसे दो अलग-अलग कंप्यूटरों पर जावरक्स 1.0.12 पर चलाया था, और यह हमेशा "बचत" को सौ बार प्रिंट करता है। –