2017-03-31 10 views
7

उपयोग का मामला यह है: मैं अस्थायी रूप से नवीनतम उत्सर्जित महंगी अवलोकन प्रतिक्रिया को कैश करना चाहता हूं, लेकिन इसकी समाप्ति के बाद, महंगे स्रोत पर लौटें और इसे फिर से कैश करें।एक अस्थायी कैश के साथ रिमोट ऑब्जर्जेबल का अनुरोध करने के लिए आरएक्सजेवा पैटर्न

एक सुंदर मूल नेटवर्क कैश परिदृश्य, लेकिन मैं इसे काम करने के लिए वास्तव में संघर्ष कर रहा हूं।

private Observable<String> getContentObservable() { 

    // expensive upstream source (API, etc.) 
    Observable<String> sourceObservable = getSourceObservable(); 

    // cache 1 result for 30 seconds, then return to the source 
    return sourceObservable 
      .replay(1, 30, TimeUnit.SECONDS) 
      .autoConnect() 
      .switchIfEmpty(sourceObservable); 
} 

प्रारंभिक अनुरोध: कैश समाप्ति विंडो के बाहर कैश तीसरा अनुरोध से दिया: कुछ भी नहीं स्रोतों में उत्सर्जन के 30 सेकंड के भीतर दूसरा अनुरोध चला जाता है। मैं इसकी सदस्यता लेता हूं और मुझे कोई डेटा नहीं मिलता है, लेकिन यह अपस्ट्रीम स्रोत ऑब्जर्जेबल पर स्विच नहीं कर रहा है।

ऐसा लगता है कि मैं सिर्फ autoConnect() से अपने कनेक्ट करने योग्य ऑब्सर्जेबल से कनेक्ट कर रहा हूं और यह कभी खाली नहीं हो रहा है, इसलिए यह कभी भी मेरे switchIfEmpty() को ट्रिगर नहीं कर रहा है।

मैं replay(1,x,x) और switchIfEmpty() के इस संयोजन का उपयोग कैसे कर सकता हूं?

या क्या मैं शुरुआत से ही इस गलत से संपर्क कर रहा हूं?

+0

नील, यह महत्वपूर्ण नहीं हो सकता है लेकिन यह मेरे लिए ऐसा लगता है: क्या आपका मूल 'स्रोत ऑब्सर्वेबल' "गर्म" या "ठंडा" है? (मैं दूसरा उम्मीद करता हूं लेकिन सत्यापित करना चाहता हूं)। क्या आप वास्तव में "अनुरोध करने" के लिए उपयोग किए जाने वाले कोड को दिखा सकते हैं यानी आप 'getContentObservable' – SergGr

+0

@SergGr के परिणाम का उपयोग कैसे करते हैं, स्रोत कम हो जाएगा, मेरी कम अमूर्त स्थिति में यह केवल एक रेट्रोफिट सेवा API कॉल है। तो वास्तविक कोड सिर्फ 'getContentObservable() 'के अंदर रेट्रोफिट से एक अवलोकन योग्य हो रहा है –

उत्तर

1

तो यह पता चला है कि आप जेक व्हार्टन के रीप्लेइंग शेयर का उपयोग निपटाने के बाद भी अंतिम मूल्य को कैश करने के लिए कर सकते हैं। https://github.com/JakeWharton/RxReplayingShare

1

आपको कई कॉलर्स के बीच साझा एक राज्य बनाए रखना होगा। यही कारण है कि आप हर बार अवलोकन करने योग्य नहीं बना सकते हैं CONTentObservable() कहा जाता है।

ऐसा करने का एक तरीका है अवलोकन करने योग्य (बाहरी बफर का उपयोग करके) में आंतरिक स्थिति को पकड़ना, लेकिन राज्यव्यापी व्यवहार को लागू करना अक्सर पर्यवेक्षकों के बिना आसान होता है।

private Optional<String> cached = Optional.empty(); 

private Observable<String> getContentObservable() { 
    //use defer to delay cache evaluation to the point when someone subscribes 
    return Observable.defer(
     () -> 
      cached.isPresent() 
       ? Observable.just(cached) 
       : fetchAndCache() 
    ) 
    //use the same scheduler for every cached field access 
    .subscribeOn(scheduler); 
} 

private Observable<String> fetchAndCache() { 
    Observable<String> cachedSource = getSourceObservable() 
     //I assume you only need one, make sure it is 1 
     .take(1) 
     .cache(); 

    cachedSource 
     .observeOn(scheduler) 
     //side-effect stores the state 
     .doOnNext(str -> cached = Optional.of(str)) 
     .flatMap(str -> Observable.timer(30, TimeUnit.SECONDS, scheduler)) 
     //another side-effect clears the cache 
     .subscribe(l -> cached = Optional.empty()); 

    return cachedSource;   
} 
3
return sourceObservable 
      .replay(1, 30, TimeUnit.SECONDS) 
      .autoConnect() 
      .switchIfEmpty(sourceObservable); 

प्रारंभिक अनुरोध:

यहाँ एक क्षेत्र में साझा राज्य के साथ एक उदाहरण है स्रोत के 30 सेकंड के भीतर स्रोत के लिए दूसरा अनुरोध चला जाता है उत्सर्जन: कैश के बाहर कैश तीसरा अनुरोध से वितरित समाप्ति खिड़की: कुछ भी नहीं। मैं इसकी सदस्यता लेता हूं और मुझे कोई डेटा नहीं मिलता है, लेकिन यह अपस्ट्रीम स्रोत ऑब्जर्जेबल पर स्विच नहीं कर रहा है।

समस्या यहाँ है, कि पुनरावृत्ति अभी पिछले 30 सेकंड में sourceObservable द्वारा उत्सर्जित उसी क्रम दोहरा, लेकिन जब आप 30 सेकंड के बाद सदस्यता लेते हैं, अनुक्रम कोई इवेंट, यहां तक ​​कि कोई onCompleted() है, तो आप कर सकते हैं ' टी switchIfEmpty(), यह काम नहीं करेगा क्योंकि यह 'onCompleted()' सिग्नल पर निर्भर करता है और बिना किसी उत्सर्जन के, यह जानने के लिए कि यह 'खाली' है।

सामान्य रूप से, कैप्चर परिदृश्य में रीप्ले का उपयोग पर्याप्त नहीं है, क्योंकि आपको कैश की अवधि समाप्त होने के बाद फिर से सदस्यता लेने का एक तरीका है, और अतिरिक्त रूप से मांग के अनुसार ऐसा करना है, जिसका अर्थ है कि कुछ क्लाइंट इसकी सदस्यता लेते हैं। (आप कैश है जो अपने आप हर 30 सेकंड से ताज़ा कर सकते हैं, लेकिन यह है कि वांछित व्यवहार मुझे लगता है कि नहीं है)


तो, के रूप में @Yurly Kulikov सुझाव दिया, आप एक राज्य बनाए रखने के लिए, और के लिए सदस्यता आपरेशन नियंत्रित करने के लिए की जरूरत है राज्य को बनाए रखना लेकिन मुझे लगता है कि समाधान में एक बड़ा प्रवाह है, क्योंकि यह वास्तव में थ्रेड-सुरक्षित नहीं है, जिसका अर्थ है कि यदि 2 दूसरे के बाद 1 की सदस्यता लेता है, तो ए और बी कहें, जबकि ए अनुरोध निष्पादित करता है और सहेजने के लिए प्रतीक्षा करता है कैश में नया परिणाम, बी सब्सक्राइब भी कर सकता है, और एक अन्य अनुरोध निष्पादित किया जाएगा क्योंकि कैश किए गए मान को अभी तक सेट नहीं किया गया है (यह अभी तक पहला नेटवर्क अनुरोध समाप्त नहीं हुआ है।

मैं एक अलग कार्यान्वयन के साथ समान दृष्टिकोण का उपयोग करने का सुझाव देते हैं, कि मैं सुझाव here:

public class CachedRequest<T> { 

private final AtomicBoolean expired = new AtomicBoolean(true); 
private final Observable<T> source; 
private final long cacheExpirationInterval; 
private final TimeUnit cacheExpirationUnit; 
private Observable<T> current; 

    public CachedRequest(Observable<T> o, long cacheExpirationInterval, 
         TimeUnit cacheExpirationUnit) { 
     source = o; 
     current = o; 
     this.cacheExpirationInterval = cacheExpirationInterval; 
     this.cacheExpirationUnit = cacheExpirationUnit; 
    } 

    private Observable<T> getCachedObservable() { 
     return Observable.defer(() -> { 
      if (expired.compareAndSet(true, false)) { 
       current = source.cache(); 
       Observable.timer(cacheExpirationInterval, cacheExpirationUnit)       
         .subscribe(aLong -> expired.set(true)); 
      } 
      return current; 
     }); 
    } 
} 

आस्थगित करें साथ आप कैश समाप्ति स्थिति के अनुसार सही प्रत्यक्ष लौट सकते हैं, इसलिए हर कैश समाप्ति के अंदर हुए सदस्यता होगा कैश अवलोकन (कैश() का उपयोग करके) - जिसका अर्थ केवल एक बार किया जाएगा। कैश की समाप्ति के बाद, अतिरिक्त सदस्यता नए अनुरोध को ट्रिगर करेगी और कैश की समाप्ति को रीसेट करने के लिए एक नया टाइमर सेट करेगा।

संबंधित मुद्दे