2015-10-18 10 views
7

हम कैश() ऑपरेटर का उपयोग एक लंबे कार्य (http अनुरोध) कई बार क्रियान्वित करने से बचने के लिए कर सकते हैं, और उसके परिणाम का पुन: उपयोग:नमूदार, त्रुटि और कैश पर पुन: प्रयास केवल तभी पूरा

Observable apiCall = createApiCallObservable().cache(); // notice the .cache() 

--------------------------------------------- 
// the first time we need it 
apiCall.andSomeOtherStuff() 
       .subscribe(subscriberA); 

--------------------------------------------- 
//in the future when we need it again 
apiCall.andSomeDifferentStuff() 
       .subscribe(subscriberB); 

पहली बार, http अनुरोध निष्पादित किया गया है, लेकिन दूसरी बार, क्योंकि हमने कैश() ऑपरेटर का उपयोग किया था, अनुरोध निष्पादित नहीं किया जाएगा, लेकिन हम पहले परिणाम का पुन: उपयोग करने में सक्षम होंगे।

यह पहला काम करता है जब पहला अनुरोध सफलतापूर्वक पूरा हो जाता है। लेकिन अगर पहले प्रयास में ऑनर को बुलाया जाता है, तो अगली बार जब कोई नया ग्राहक एक ही अवलोकन करने के लिए सदस्यता लेता है, तो ऑनर ​​को दोबारा http अनुरोध का प्रयास किए बिना फिर से बुलाया जाएगा।

हम जो करने की कोशिश कर रहे हैं वह यह है कि अगर अगली बार पहली बार कॉल किया जाता है, तो अगली बार जब कोई एक ही देखने योग्य की सदस्यता लेता है, तो http अनुरोध को स्क्रैच से प्रयास किया जाएगा। यानी अवलोकन योग्य केवल सफल एपीआई कॉल को कैश करेगा, यानी जिनके लिए पूर्ण पर कॉल किया गया था।

आगे बढ़ने के बारे में कोई विचार? हमने पुनः प्रयास() और कैश() ऑपरेटर का उपयोग करके बहुत भाग्यशाली नहीं किया है।

उत्तर

4

यह समाधान हम साथ समाप्त हो गया, akarnokd के समाधान का विस्तार करने के बाद है:

public class OnErrorRetryCache<T> { 

    public static <T> Observable<T> from(Observable<T> source) { 
     return new OnErrorRetryCache<>(source).deferred; 
    } 

    private final Observable<T> deferred; 
    private final Semaphore singlePermit = new Semaphore(1); 

    private Observable<T> cache = null; 
    private Observable<T> inProgress = null; 

    private OnErrorRetryCache(Observable<T> source) { 
     deferred = Observable.defer(() -> createWhenObserverSubscribes(source)); 
    } 

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) 
    { 
     singlePermit.acquireUninterruptibly(); 

     Observable<T> cached = cache; 
     if (cached != null) { 
      singlePermit.release(); 
      return cached; 
     } 

     inProgress = source 
       .doOnCompleted(this::onSuccess) 
       .doOnTerminate(this::onTermination) 
       .replay() 
       .autoConnect(); 

     return inProgress; 
    } 

    private void onSuccess() { 
     cache = inProgress; 
    } 

    private void onTermination() { 
     inProgress = null; 
     singlePermit.release(); 
    } 
} 

हम रेट्रोफिट से एक http अनुरोध का परिणाम कैश करने के लिए की जरूरत है। तो यह एक अवलोकन के साथ बनाया गया था जो एक ही आइटम को दिमाग में छोड़ देता है।

यदि http अनुरोध को निष्पादित करते समय एक पर्यवेक्षक सदस्यता लेता है, तो हम चाहते थे कि यह प्रतीक्षा करें और दो बार अनुरोध निष्पादित न करें, जब तक कि प्रगति में कोई असफल न हो। ऐसा करने के लिए सेमफोर ब्लॉक को एकल पहुंच की अनुमति देता है जो कैश किए गए अवलोकन योग्य बनाता है या देता है, और यदि कोई नया अवलोकन योग्य बनाया जाता है, तो हम उस तक प्रतीक्षा करते हैं जब तक कि वह समाप्त नहीं हो जाता। उपरोक्त के लिए टेस्ट here

3

आपको कुछ राज्य-संचालन करना है। यहां बताया गया है कि मैं यह कैसे करूं:

public class CachedRetry { 

    public static final class OnErrorRetryCache<T> { 
     final AtomicReference<Observable<T>> cached = 
       new AtomicReference<>(); 

     final Observable<T> result; 

     public OnErrorRetryCache(Observable<T> source) { 
      result = Observable.defer(() -> { 
       for (;;) { 
        Observable<T> conn = cached.get(); 
        if (conn != null) { 
         return conn; 
        } 
        Observable<T> next = source 
          .doOnError(e -> cached.set(null)) 
          .replay() 
          .autoConnect(); 

        if (cached.compareAndSet(null, next)) { 
         return next; 
        } 
       } 
      }); 
     } 

     public Observable<T> get() { 
      return result; 
     } 
    } 

    public static void main(String[] args) { 
     AtomicInteger calls = new AtomicInteger(); 
     Observable<Integer> source = Observable 
       .just(1) 
       .doOnSubscribe(() -> 
        System.out.println("Subscriptions: " + (1 + calls.get()))) 
       .flatMap(v -> { 
        if (calls.getAndIncrement() == 0) { 
         return Observable.error(new RuntimeException()); 
        } 
        return Observable.just(42); 
       }); 

     Observable<Integer> o = new OnErrorRetryCache<>(source).get(); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 
    } 
} 

यह पूरी तरह से सफल स्रोत को कैशिंग करके काम करता है और इसे सभी को देता है। अन्यथा, एक (आंशिक रूप से) असफल स्रोत कैश को क्रिएट करेगा और अगली कॉल पर्यवेक्षक एक पुनर्वितरण को ट्रिगर करेगा।

+0

धन्यवाद akarnokd, यह अच्छा लग रहा है। मुझे केवल कुछ समस्याएं हैं जब स्रोत एक लंबे समय तक चलने वाला http अनुरोध (कुछ सेकेंड) है, और दूसरा, तीसरा ग्राहक सब्सक्राइब करता है जबकि पहले भी प्रगति पर है। उस स्थिति में वे सभी असफल हो जाते हैं, और अनुरोध एक से अधिक बार प्रयास नहीं किया जाता है। कैश() ऑपरेटर अलग-अलग व्यवहार करता है। मैं इसके लिए और अधिक देखूंगा, और उस मुद्दे को दोहराने का प्रयास करूंगा जिसका उल्लेख मैं आपके उदाहरण का उपयोग कर रहा हूं, और जल्द ही आपसे संपर्क करूँगा। – Plato

0

क्या आपने नेटवर्क अनुरोध के लिए कैश को लागू करने के लिए AsyncSubject का उपयोग करने पर विचार किया है? मैंने यह जांचने के लिए एक उदाहरण एप्लिकेशन RxApp बनाया है कि यह कैसे काम कर सकता है। मैं नेटवर्क से प्रतिक्रिया प्राप्त करने के लिए एक सिंगलटन मॉडल का उपयोग करता हूं। इससे प्रतिक्रियाओं को कैश करना, एकाधिक टुकड़ों से डेटा तक पहुंच बनाना, लंबित अनुरोध की सदस्यता लेना और स्वचालित यूआई परीक्षणों के लिए नकली डेटा प्रदान करना संभव बनाता है।

+0

अगर हमने एक AsyncSubject का उपयोग किया, और पहला http अनुरोध विफल हुआ, तो सभी भविष्य के ग्राहकों को, फिर से कोशिश की जा रही http कॉल की बजाय त्रुटि के साथ अधिसूचित किया जाएगा, है ना? – Plato

+0

यह सही है लेकिन मॉडल को प्रदान करना चाहिए उदा। रीसेट() विधि है कि यूआई उपयोगकर्ता को त्रुटि दिखाकर त्रुटि को संभालने के बाद कॉल कर सकता है। – pmellaaho

5

ठीक है, किसी के भी अभी भी रुचि रखने के लिए, मुझे लगता है कि मेरे पास आरएक्स के साथ इसे हासिल करने का एक अच्छा तरीका है।

कुंजी नोट एरर रेस्यूमनेक्स्ट पर उपयोग करना है, जो आपको त्रुटि के मामले में ऑब्जर्जेबल को प्रतिस्थापित करने देगा। तो यह कुछ इस तरह दिखना चाहिए:

Observable<Object> apiCall = createApiCallObservable().cache(1); 
//future call 
apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() { 
    public Observable<? extends Object> call(Throwable throwable) { 
     return createApiCallObservable(); 
     } 
    }); 

इस तरह, यदि पहली कॉल में नाकाम रही है भविष्य कॉल बस इसे (केवल एक बार) को याद होगा।

लेकिन प्रत्येक अन्य कॉलर जो पहले अवलोकन करने का प्रयास करेगा, असफल हो जाएगा और एक अलग अनुरोध करेगा।

आपने मूल अवलोकन करने का संदर्भ दिया है, चलिए बस इसे अपडेट करें।

हां, तो एक आलसी गेटर:

Observable<Object> apiCall; 
private Observable<Object> getCachedApiCall() { 
    if (apiCall == null){ 
     apiCall = createApiCallObservable().cache(1); 
    } 
    return apiCall; 
} 
अब

, एक गेटर कि फिर से प्रयास करेगा अगर पिछले असफ़ल रहा था:

private Observable<Object> getRetryableCachedApiCall() { 
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() { 
     public Observable<? extends Object> call(Throwable throwable) { 
      apiCall = null; 
      return getCachedApiCall(); 
     } 
    }); 
} 

कृपया ध्यान दें कि यह केवल हर बार यह है के लिए एक बार फिर से प्रयास करेगा बुलाया।

तो अब अपने कोड कुछ इस तरह दिखेगा:

--------------------------------------------- 
// the first time we need it - this will be without a retry if you want.. 
getCachedApiCall().andSomeOtherStuff() 
       .subscribe(subscriberA); 

--------------------------------------------- 
//in the future when we need it again - for any other call so we will have a retry 
getRetryableCachedApiCall().andSomeDifferentStuff() 
       .subscribe(subscriberB); 
संबंधित मुद्दे