2016-03-11 7 views
20

मेरे पास Observable<<List<Foo>> getFoo() है जो रेट्रोफिट सेवा से बनाया गया है और .getFoo() विधि को कॉल करने के बाद, मुझे इसे एकाधिक सब्सक्राइबर्स के साथ साझा करने की आवश्यकता है। हालांकि .share() विधि को कॉल करना, यह नेटवर्क कॉल को फिर से निष्पादित करने का कारण बनता है। रीप्ले ऑपरेटर या तो काम नहीं करता है। मुझे पता है कि एक संभावित समाधान .cache() हो सकता है, लेकिन मुझे नहीं पता कि यह व्यवहार क्यों होता है।एकाधिक सब्सक्राइबरों के साथ एकल पर्यवेक्षक

Retrofit retrofit = new Retrofit.Builder() 
      .baseUrl(API_URL) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build(); 

    // Create an instance of our GitHub API interface. 

    // Create a call instance for looking up Retrofit contributors. 

    Observable<List<Contributor>> testObservable = retrofit 
      .create(GitHub.class) 
      .contributors("square", "retrofit") 
      .share(); 


    Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable throwable) { 

     } 

     @Override 
     public void onNext(List<Contributor> contributors) { 
      System.out.println(contributors); 
     } 
    }); 

    Subscription subscription2 = testObservable 
      .subscribe(new Subscriber<List<Contributor>>() { 
       @Override 
       public void onCompleted() { 

       } 

       @Override 
       public void onError(Throwable throwable) { 

       } 

       @Override 
       public void onNext(List<Contributor> contributors) { 
        System.out.println(contributors + " -> 2"); 
       } 
      }); 

    subscription1.unsubscribe(); 
    subscription2.unsubscribe(); 

ऊपर दिया गया कोड उपर्युक्त व्यवहार को पुन: उत्पन्न कर सकता है। आप इसे डीबग कर सकते हैं और देख सकते हैं कि प्राप्त सूची एक अलग मेमोरी एड्रेस से संबंधित है।

मैंने कनेक्टेबल ऑब्सर्बल्स को एक संभावित समाधान के रूप में भी देखा है, लेकिन इसके लिए मुझे मूल अवलोकन करने की आवश्यकता है, और प्रत्येक बार जब मैं एक नया सब्सक्राइबर जोड़ना चाहता हूं तो .connect() पर कॉल करना होगा।

.share() के साथ इस प्रकार का व्यवहार रेट्रोफिट 1.9 तक ठीक काम कर रहा था। यह रेट्रोफिट 2 - बीटा पर काम करना बंद कर दिया। मैंने अभी तक रेट्रोफिट 2 रिलीज संस्करण के साथ इसका परीक्षण नहीं किया है, जिसे कुछ घंटे पहले रिलीज़ किया गया था।

संपादित करें: 01/02/2017

भविष्य पाठकों के लिए, मैं एक लेख here मामले के बारे में अधिक समझा लिखा है!

उत्तर

23

आप ConnectedObservable को .share() द्वारा वापस Observable में वापस लौटने के लिए प्रतीत होते हैं (निहित)। आप गर्म और ठंडे अवलोकनों के बीच के अंतर को पढ़ना चाहेंगे।

प्रयास करें

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share(); 

Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable throwable) { 

    } 

    @Override 
    public void onNext(List<Contributor> contributors) { 
     System.out.println(contributors); 
    } 
}); 

Subscription subscription2 = testObservable 
     .subscribe(new Subscriber<List<Contributor>>() { 
      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(Throwable throwable) { 

      } 

      @Override 
      public void onNext(List<Contributor> contributors) { 
       System.out.println(contributors + " -> 2"); 
      } 
     }); 

testObservable.connect(); 
subscription1.unsubscribe(); 
subscription2.unsubscribe(); 

संपादित करें: आप connect() हर बार जब आप एक नई सदस्यता आप केवल नमूदार शुरू करने की जरूरत है चाहता हूँ कॉल करने के लिए की जरूरत नहीं है। मैं तुम्हें replay() इस्तेमाल कर सकते हैं यकीन है कि बाद के सभी ग्राहकों के सभी आइटम मिल बनाने के लिए लगता है

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share() 
     .replay() 
+0

आपके उत्तर के लिए धन्यवाद। बात यह है कि मैं वास्तव में हर बार कॉलिंग से बचना चाहता हूं। क्या आप सुनिश्चित हैं कि रीप्ले ऑपरेटर इस उपयोग के मामले के साथ ठीक काम करेगा? – Pavlos

+0

असल में मैंने इसका परीक्षण किया और यह काम किया। आपके समय और आपके उत्तर के लिए धन्यवाद। सिर्फ इस मुद्दे के लिए मैंने गर्म और ठंडे अवलोकनों के बीच अंतर पढ़ा है लेकिन रेट्रोफिट के साथ नेटवर्क कॉल के साथ इसे पुन: उत्पन्न नहीं कर सका। अगर मैंने एक Observable.just() का उपयोग किया था तो शेयर ऑपरेटर बहुत बढ़िया काम कर रहा था। – Pavlos

30

उत्पादित RxJava डेवलपर डेविड Karnok वापस जांच करते मैं क्या यहाँ पर जा रहा था की पूरी जानकारी के प्रस्ताव करना चाहते हैं के बाद।

share() को publish().refCount() के रूप में परिभाषित किया गया है, i। ई। स्रोत Observable को पहले ConnectableObservable पर publish() द्वारा परिवर्तित किया गया है, लेकिन connect() "मैन्युअल रूप से" कॉल करने के बजाय उस भाग को refCount() द्वारा प्रबंधित किया जाता है। विशेष रूप से, refCountConnectableObservable पर connect() पर कॉल करेगा जब इसे स्वयं पहली सदस्यता प्राप्त होगी; फिर, जब तक कम से कम एक ग्राहक है, तब तक यह सब्सक्राइब रहेगा; और, आखिरकार, जब ग्राहकों की संख्या 0 तक गिर जाती है तो यह ऊपर की सदस्यता रद्द कर देगी। ठंडObservables के साथ, रेट्रोफिट द्वारा लौटाए गए लोगों की तरह, यह किसी भी चल रहे कंप्यूटेशंस को रोक देगा।

यदि इन चक्रों में से एक के बाद दूसरा ग्राहक आता है, तो refCount फिर से connect पर कॉल करेगा और इस प्रकार स्रोत पर्यवेक्षण के लिए एक नई सदस्यता को ट्रिगर करेगा। इस मामले में, यह एक और नेटवर्क अनुरोध ट्रिगर करेगा।

अब, यह आमतौर पर रेट्रोफिट 1 (और वास्तव में this commit से पहले कोई संस्करण) के साथ स्पष्ट नहीं हुआ है, क्योंकि डिफ़ॉल्ट रूप से रेट्रोफिट के इन पुराने संस्करणों ने सभी नेटवर्क अनुरोधों को किसी अन्य थ्रेड में स्थानांतरित कर दिया है।इसका आमतौर पर मतलब था कि आपके सभी subscribe() कॉल तब होंगे जब पहला अनुरोध/Observable अभी भी चल रहा था और इसलिए नया Subscriber एस बस refCount में जोड़ा जाएगा और इसलिए अतिरिक्त अनुरोध/Observables को ट्रिगर नहीं किया जाएगा।

रेट्रोफिट के नए संस्करण, हालांकि, डिफ़ॉल्ट रूप से काम को किसी अन्य थ्रेड पर स्थानांतरित नहीं करते हैं - आपको इसे स्पष्ट रूप से कॉल करके करना है, उदाहरण के लिए, subscribeOn(Schedulers.io())। यदि आप नहीं करते हैं, तो सब कुछ वर्तमान धागे पर ही रहेगा, जिसका अर्थ है कि दूसरा subscribe() केवल Observable के बाद होता है, onCompleted कहलाता है और इसलिए Subscribers सदस्यता समाप्त हो जाने के बाद और सबकुछ बंद हो जाता है। अब, जैसा कि हमने पहले पैराग्राफ में देखा था, जब दूसरा subscribe() कहा जाता है, share() में स्रोत के लिए Subscription और अन्य नेटवर्क अनुरोध को ट्रिगर करने के अलावा कोई विकल्प नहीं है।

तो, रेट्रोफिट 1 से उपयोग किए जाने वाले व्यवहार पर वापस जाने के लिए, बस subscribeOn(Schedulers.io()) जोड़ें।

इसका परिणाम केवल नेटवर्क अनुरोध निष्पादित किया जाना चाहिए - अधिकांश समय। सिद्धांत रूप में, आप अभी भी कई अनुरोध प्राप्त कर सकते हैं (और आप हमेशा रेट्रोफिट 1 के साथ हो सकते हैं), लेकिन केवल तभी जब आपके नेटवर्क अनुरोध बेहद तेज़ होते हैं और/या subscribe() कॉल काफी देरी के साथ होते हैं, तो फिर, पहला अनुरोध है समाप्त हुआ जब दूसरा subscribe() होता है।

इसलिए, डेविड ने या तो cache() का उपयोग करने का सुझाव दिया है (लेकिन इसमें आपके द्वारा उल्लिखित दोष हैं) या replay().autoConnect()। इन release notes के अनुसार, autoConnect केवल refCount की पहली छमाही की तरह काम करता है, या अधिक सटीक, यह refcount (करने के लिए व्यवहार में

समान), है सिवाय इसके कि यह डिस्कनेक्ट नहीं है जब ग्राहकों खो जाते हैं।

इसका मतलब यह अनुरोध केवल ट्रिगर हो जाएगा पहले subscribe() होता है, लेकिन फिर सब बाद में Subscriber सब उत्सर्जित आइटम प्राप्त होगा, चाहे वहाँ थे की परवाह किए बिना, के बीच, 0 सदस्यों की संख्या में किसी भी समय।

+1

आपके विस्तारित स्पष्टीकरण के लिए धन्यवाद। यकीन है कि यह ध्यान रखें, हालांकि मुझे यकीन है कि इस मुद्दे के लिए मेरे पास कुछ आरामदायक समाधान हैं :) – Pavlos

+2

बढ़िया! मैं सिर्फ यह स्पष्टीकरण जोड़ना चाहता था क्योंकि 'रीप्ले', 'शेयर',' प्रकाशित 'इत्यादि काफी जटिल हैं और इससे बाहर के मामलों के विस्तृत स्पष्टीकरणों को लेकर कोई दिक्कत नहीं होती है। –

+2

वास्तव में महान जवाब :) –

संबंधित मुद्दे