मैं आरएक्सजेवा के लिए नया हूं, लेकिन किसी और ने आपके प्रश्न का उत्तर नहीं दिया है, इसलिए यहां पर एक स्टैब है।
मेरा सुझाव आपके निर्माता वर्ग के लिए इंटरफ़ेस Observable.OnSubscribeFunc
(Javadoc) लागू करने के लिए है। विधि public Subscription onSubscribe(Observer<T> observer)
में, आप क्या डेटा उपलब्ध नहीं है मिलता है और इस तरह onNext()
विधि में है कि डेटा पारित करने के लिए अपने Producer#readData()
विधि कॉल कर सकते हैं:
// Make sure to change T as appropriate
public class Producer implements OnSubscribeFunc<T> {
. . .
@Override
public Subscription onSubscribe(Observer<T> observer) {
while (this.hasData()) {
observer.onNext(this.readData());
observer.onCompleted();
}
}
(मतलब यह है कि मात्रा में अपने Producer#readData()
विधि रिटर्न डेटा के बजाय ।। सभी को एक साथ ट्वीक रूप में की जरूरत)
फिर आप Observable#create(OnSubscribeFunc<T>)
पद्धति का उपयोग करके अपने निर्माता वस्तु की सदस्यता ले सकते हैं, और भी एक टाइमर पर अपने प्रत्यक्ष वापसी तत्वों की व्यवस्था:
// In your observer class
Observable<T> myProducer = Observable.create(new Producer(...));
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....);
Observable<T> myProducerTimed = myProducer.subscribeOn(Schedulers.executor(scheduler));
// Now use myProducerTimed
मैं अभी उन अंतिम दो पंक्तियों का परीक्षण नहीं कर सकता। क्षमा करें, मैं उसमें देख लूंगा और अगर यह चीजों को बदलता है तो मेरा जवाब अपडेट करें।
अस्वीकरण: मैं एक आरएक्सजेवा एन 00 बी हूं, इसलिए यह समाधान बेकार हो सकता है या शायद गन्दा हो सकता है। क्या गलत लगता है ठीक करें।
अद्यतन: मुझे RxJava के साथ एक समस्या (RxJava -- Terminating Infinite Streams) भी थी; ऐसा लगता है कि मेरा समाधान आपके जैसा दिखता है (शेड्यूलर का उपयोग समय-समय पर अवलोकन करने योग्य तत्वों को बनाने के लिए)।
मेरा सुझाव 'सब्सक्राइब() 'में सभी भारी भारोत्तोलन करता है। यह गलत है (एसओ प्रश्न जो मैंने अपने उत्तर से जुड़ा है) देखें, लेकिन इस समय मैं अपना जवाब दोबारा नहीं कर सकता। अगर कोई मेरे सामने आता है तो मैं संपादन का स्वागत करता हूं। – MonkeyWithDarts