return sourceObservable
.replay(1, 30, TimeUnit.SECONDS)
.autoConnect()
.switchIfEmpty(sourceObservable);
प्रारंभिक अनुरोध:
यहाँ एक क्षेत्र में साझा राज्य के साथ एक उदाहरण है स्रोत के 30 सेकंड के भीतर स्रोत के लिए दूसरा अनुरोध चला जाता है उत्सर्जन: कैश के बाहर कैश तीसरा अनुरोध से वितरित समाप्ति खिड़की: कुछ भी नहीं। मैं इसकी सदस्यता लेता हूं और मुझे कोई डेटा नहीं मिलता है, लेकिन यह अपस्ट्रीम स्रोत ऑब्जर्जेबल पर स्विच नहीं कर रहा है।
समस्या यहाँ है, कि पुनरावृत्ति अभी पिछले 30 सेकंड में sourceObservable
द्वारा उत्सर्जित उसी क्रम दोहरा, लेकिन जब आप 30 सेकंड के बाद सदस्यता लेते हैं, अनुक्रम कोई इवेंट, यहां तक कि कोई onCompleted()
है, तो आप कर सकते हैं ' टी switchIfEmpty()
, यह काम नहीं करेगा क्योंकि यह 'onCompleted()'
सिग्नल पर निर्भर करता है और बिना किसी उत्सर्जन के, यह जानने के लिए कि यह 'खाली' है।
सामान्य रूप से, कैप्चर परिदृश्य में रीप्ले का उपयोग पर्याप्त नहीं है, क्योंकि आपको कैश की अवधि समाप्त होने के बाद फिर से सदस्यता लेने का एक तरीका है, और अतिरिक्त रूप से मांग के अनुसार ऐसा करना है, जिसका अर्थ है कि कुछ क्लाइंट इसकी सदस्यता लेते हैं। (आप कैश है जो अपने आप हर 30 सेकंड से ताज़ा कर सकते हैं, लेकिन यह है कि वांछित व्यवहार मुझे लगता है कि नहीं है)
तो, के रूप में @Yurly Kulikov सुझाव दिया, आप एक राज्य बनाए रखने के लिए, और के लिए सदस्यता आपरेशन नियंत्रित करने के लिए की जरूरत है राज्य को बनाए रखना लेकिन मुझे लगता है कि समाधान में एक बड़ा प्रवाह है, क्योंकि यह वास्तव में थ्रेड-सुरक्षित नहीं है, जिसका अर्थ है कि यदि 2 दूसरे के बाद 1 की सदस्यता लेता है, तो ए और बी कहें, जबकि ए अनुरोध निष्पादित करता है और सहेजने के लिए प्रतीक्षा करता है कैश में नया परिणाम, बी सब्सक्राइब भी कर सकता है, और एक अन्य अनुरोध निष्पादित किया जाएगा क्योंकि कैश किए गए मान को अभी तक सेट नहीं किया गया है (यह अभी तक पहला नेटवर्क अनुरोध समाप्त नहीं हुआ है।
मैं एक अलग कार्यान्वयन के साथ समान दृष्टिकोण का उपयोग करने का सुझाव देते हैं, कि मैं सुझाव here:
public class CachedRequest<T> {
private final AtomicBoolean expired = new AtomicBoolean(true);
private final Observable<T> source;
private final long cacheExpirationInterval;
private final TimeUnit cacheExpirationUnit;
private Observable<T> current;
public CachedRequest(Observable<T> o, long cacheExpirationInterval,
TimeUnit cacheExpirationUnit) {
source = o;
current = o;
this.cacheExpirationInterval = cacheExpirationInterval;
this.cacheExpirationUnit = cacheExpirationUnit;
}
private Observable<T> getCachedObservable() {
return Observable.defer(() -> {
if (expired.compareAndSet(true, false)) {
current = source.cache();
Observable.timer(cacheExpirationInterval, cacheExpirationUnit)
.subscribe(aLong -> expired.set(true));
}
return current;
});
}
}
आस्थगित करें साथ आप कैश समाप्ति स्थिति के अनुसार सही प्रत्यक्ष लौट सकते हैं, इसलिए हर कैश समाप्ति के अंदर हुए सदस्यता होगा कैश अवलोकन (कैश() का उपयोग करके) - जिसका अर्थ केवल एक बार किया जाएगा। कैश की समाप्ति के बाद, अतिरिक्त सदस्यता नए अनुरोध को ट्रिगर करेगी और कैश की समाप्ति को रीसेट करने के लिए एक नया टाइमर सेट करेगा।
नील, यह महत्वपूर्ण नहीं हो सकता है लेकिन यह मेरे लिए ऐसा लगता है: क्या आपका मूल 'स्रोत ऑब्सर्वेबल' "गर्म" या "ठंडा" है? (मैं दूसरा उम्मीद करता हूं लेकिन सत्यापित करना चाहता हूं)। क्या आप वास्तव में "अनुरोध करने" के लिए उपयोग किए जाने वाले कोड को दिखा सकते हैं यानी आप 'getContentObservable' – SergGr
@SergGr के परिणाम का उपयोग कैसे करते हैं, स्रोत कम हो जाएगा, मेरी कम अमूर्त स्थिति में यह केवल एक रेट्रोफिट सेवा API कॉल है। तो वास्तविक कोड सिर्फ 'getContentObservable() 'के अंदर रेट्रोफिट से एक अवलोकन योग्य हो रहा है –